Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload TLS negotiation to I/O threads #1338

Merged
merged 7 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef enum {

#define CONN_FLAG_CLOSE_SCHEDULED (1 << 0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1 << 1) /* Write barrier requested */
#define CONN_FLAG_CLIENT (1 << 2) /* Connection is of a client - not a cluster link. */
uriyage marked this conversation as resolved.
Show resolved Hide resolved

#define CONN_TYPE_SOCKET "tcp"
#define CONN_TYPE_UNIX "unix"
Expand Down
54 changes: 54 additions & 0 deletions src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
static __thread int thread_id = 0; /* Thread local var */
static pthread_t io_threads[IO_THREADS_MAX_NUM] = {0};
static pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
void (*tls_negotiation_cb)(void *);
uriyage marked this conversation as resolved.
Show resolved Hide resolved

/* IO jobs queue functions - Used to send jobs from the main-thread to the IO thread. */
typedef void (*job_handler)(void *);
Expand Down Expand Up @@ -554,3 +555,56 @@ void trySendPollJobToIOThreads(void) {
aeSetPollProtect(server.el, 1);
IOJobQueue_push(jq, IOThreadPoll, server.el);
}

void setTLSNegotiationCallback(void (*cb)(void *)) {
tls_negotiation_cb = cb;
}

static void ioThreadTLSNegotiation(void *data) {
client *c = (client *)data;
tls_negotiation_cb(c->conn);
c->io_read_state = CLIENT_COMPLETED_IO;
}

/*
* This function attempts to offload TLS negotiation for a client connection to an I/O thread.
* Returns C_OK if the TLS negotiation was successfully queued for processing by an I/O thread,
* or C_ERR if the client is not eligible for offloading.
* Parameters:
* conn: The connection object for which TLS negotiation should be performed
*/
int trySendTLSNegotiationToIOThreads(connection *conn) {
if (server.io_threads_num <= 1) {
return C_ERR;
}

if (!(conn->flags & CONN_FLAG_CLIENT)) {
return C_ERR;
}

client *c = connGetPrivateData(conn);
if (c->io_read_state != CLIENT_IDLE) {
return C_OK;
}

if (server.active_io_threads_num <= 1) {
return C_ERR;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (server.io_threads_num <= 1) {
return C_ERR;
}
if (!(conn->flags & CONN_FLAG_CLIENT)) {
return C_ERR;
}
client *c = connGetPrivateData(conn);
if (c->io_read_state != CLIENT_IDLE) {
return C_OK;
}
if (server.active_io_threads_num <= 1) {
return C_ERR;
}
if (server.active_io_threads_num <= 1) {
return C_ERR;
}
if (!(conn->flags & CONN_FLAG_CLIENT)) {
return C_ERR;
}
client *c = connGetPrivateData(conn);
if (c->io_read_state != CLIENT_IDLE) {
return C_OK;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems everywhere else we just check for active threads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider a scenario where the main thread sends a job to the IO thread. After the IO thread completes the job, the main thread deactivates all threads. In this case, we want to ensure the main thread processes the returned job from the IO thread before performing an accept operation even if the io threads are not active.

To achieve this:

First, check if IO threads are enabled at all (less expensive check)
Then, verify the read_state is not idle
Finally, check the active state

Unlike read/write where we anyway check for the read/write states, the accept flow operates at the connection layer rather than the client layer, so we can't check the read state there.


size_t thread_id = (c->id % (server.active_io_threads_num - 1)) + 1;
IOJobQueue *job_queue = &io_jobs[thread_id];

if (IOJobQueue_isFull(job_queue)) {
return C_ERR;
}

c->read_flags = READ_FLAGS_TLS_NEGOTIATION;
c->io_read_state = CLIENT_PENDING_IO;
c->flag.pending_read = 1;
listLinkNodeTail(server.clients_pending_io_read, &c->pending_read_list_node);
connSetPostponeUpdateState(c->conn, 1);
server.stat_io_tls_negotiation_offloaded++;
IOJobQueue_push(job_queue, ioThreadTLSNegotiation, c);

return C_OK;
}
3 changes: 2 additions & 1 deletion src/io_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ int tryOffloadFreeArgvToIOThreads(client *c);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);

int trySendTLSNegotiationToIOThreads(connection *conn);
void setTLSNegotiationCallback(void (*cb)(void *));
#endif /* IO_THREADS_H */
4 changes: 4 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ client *createClient(connection *conn) {
if (server.tcpkeepalive) connKeepAlive(conn, server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
conn->flags |= CONN_FLAG_CLIENT;
}
c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
selectDb(c, 0);
Expand Down Expand Up @@ -4725,6 +4726,9 @@ int processIOThreadsReadDone(void) {
connSetPostponeUpdateState(c->conn, 0);
connUpdateState(c->conn);

/* No client's data was read only TLS handshake. */
if (c->read_flags & READ_FLAGS_TLS_NEGOTIATION) continue;

uriyage marked this conversation as resolved.
Show resolved Hide resolved
/* On read error - stop here. */
if (handleReadResult(c) == C_ERR) {
continue;
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2604,6 +2604,7 @@ void resetServerStats(void) {
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
server.stat_io_freed_objects = 0;
server.stat_io_tls_negotiation_offloaded = 0;
server.stat_poll_processed_by_io_threads = 0;
server.stat_total_writes_processed = 0;
server.stat_client_qbuf_limit_disconnections = 0;
Expand Down Expand Up @@ -5862,6 +5863,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"io_threaded_freed_objects:%lld\r\n", server.stat_io_freed_objects,
"io_threaded_tls_negotiations:%lld\r\n", server.stat_io_tls_negotiation_offloaded,
"io_threaded_poll_processed:%lld\r\n", server.stat_poll_processed_by_io_threads,
"io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches,
"io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries,
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,7 @@ struct valkeyServer {
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_io_freed_objects; /* Number of objects freed by IO threads */
long long stat_io_tls_negotiation_offloaded; /* Number of TLS negotiation offloads */
long long stat_poll_processed_by_io_threads; /* Total number of poll jobs processed by IO */
long long stat_total_reads_processed; /* Total number of read events processed */
long long stat_total_writes_processed; /* Total number of write events processed */
Expand Down Expand Up @@ -2767,6 +2768,7 @@ void dictVanillaFree(void *val);
#define READ_FLAGS_PRIMARY (1 << 14)
#define READ_FLAGS_DONT_PARSE (1 << 15)
#define READ_FLAGS_AUTH_REQUIRED (1 << 16)
#define READ_FLAGS_TLS_NEGOTIATION (1 << 17)
uriyage marked this conversation as resolved.
Show resolved Hide resolved

/* Write flags for various write errors and states */
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)
Expand Down
Loading
Loading