Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reply offload #1457

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,27 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
static int canAddNetworkBytesOut(int slot) {
return clusterSlotStatsEnabled() && slot != -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming it to canAddSlotStats and using it in both canAddCpuDuration and canAddNetworkBytesIn

Copy link
Author

@alexander-shabanov alexander-shabanov Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applied suggestion

}

/* Adds `net_bytes_out` to the network-bytes-out counter of `slot`.
 * Nothing is recorded when canAddNetworkBytesOut() rejects the slot; otherwise
 * the slot index is asserted to lie in [0, CLUSTER_SLOTS) before the counter
 * is incremented. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) {
    if (canAddNetworkBytesOut(slot)) {
        serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
        server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out;
    }
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
clusterSlotStatsAddNetworkBytesOutForSlot(c->slot, c->net_output_bytes_curr_cmd);
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;
if (c == NULL || !canAddNetworkBytesOut(c->slot)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
serverAssert(nodeIsPrimary(server.cluster->myself));
Expand All @@ -174,24 +179,14 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand why it was required before this PR.

* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should not change as a side effect of this function,
* regardless of the function's early return condition. */
c->slot = _slot;
return;
}
if (!canAddNetworkBytesOut(slot)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd;

/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}

/* Adds reply for the ORDERBY variant.
Expand Down Expand Up @@ -219,8 +214,7 @@ void clusterSlotStatResetAll(void) {
* would equate to repeating the same calculation twice.
*/
static int canAddCpuDuration(client *c) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled && /* Cluster mode should be enabled. */
return clusterSlotStatsEnabled() &&
c->slot != -1 && /* Command should be slot specific. */
(!server.execution_nesting || /* Either; */
(server.execution_nesting && /* 1) Command should not be nested, or */
Expand Down Expand Up @@ -248,7 +242,7 @@ static int canAddNetworkBytesIn(client *c) {
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
return clusterSlotStatsEnabled() && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
}

Expand Down Expand Up @@ -343,3 +337,7 @@ void clusterSlotStatsCommand(client *c) {
addReplySubcommandSyntaxError(c);
}
}

/* Returns 1 when both the cluster-slot-stats config flag and cluster mode
 * are enabled on this server, and 0 otherwise. */
int clusterSlotStatsEnabled(void) {
    if (!server.cluster_slot_stats_enabled) return 0;
    return server.cluster_enabled ? 1 : 0;
}
2 changes: 2 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);
int clusterSlotStatsEnabled(void);

/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
Expand All @@ -17,6 +18,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);

/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out);
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3206,6 +3206,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("reply-offload", NULL, MODIFIABLE_CONFIG, server.reply_offload_enabled, 0, NULL, NULL),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess another question is: why is the default off? IO threading is off by default, so it seems safe to allow this to be on by default.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see the comments above regarding avoiding the reply-offload config altogether. The answer to the question 'why is reply-offload off by default' is that it does not benefit performance in 100% of use cases.


/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
6 changes: 5 additions & 1 deletion src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,13 @@ int trySendWriteToIOThreads(client *c) {
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = block->used;
/* If reply offload is enabled, force a new header. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should indeed check if reply offload is enabled before writing NULL to the header? Checking a global is cheaper than write access to the block

block->last_header = NULL;
alexander-shabanov marked this conversation as resolved.
Show resolved Hide resolved
} else {
c->io_last_bufpos = (size_t)c->bufpos;
c->last_header = NULL;
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

Expand Down
3 changes: 3 additions & 0 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
/* Not done yet */
moveToNextKey();
/* If reply offload is enabled, there is no need to prefetch the value because the main thread will not access it. */
} else if (server.reply_offload_enabled) {
markKeyAsdone(info);
} else {
info->state = PREFETCH_VALUE;
}
Expand Down
Loading