typedefstructclient { uint64_t id; /* Client incremental unique ID. */ uint64_t flags; /* Client flags: CLIENT_* macros. */ connection *conn; int resp; /* RESP protocol version. Can be 2 or 3. */ redisDb *db; /* Pointer to currently SELECTed DB. */ robj *name; /* As set by CLIENT SETNAME. */ sds querybuf; /* Buffer we use to accumulate client queries. */ size_t qb_pos; /* The position we have read in querybuf. */ size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */ int argc; /* Num of arguments of current command. */ robj **argv; /* Arguments of current command. */ int argv_len; /* Size of argv array (may be more than argc) */ int original_argc; /* Num of arguments of original command if arguments were rewritten. */ robj **original_argv; /* Arguments of original command if arguments were rewritten. */ size_t argv_len_sum; /* Sum of lengths of objects in argv list. */ structredisCommand *cmd, *lastcmd; /* Last command executed. */ structredisCommand *realcmd; /* The original command that was executed by the client, Used to update error stats in case the c->cmd was modified during the command invocation (like on GEOADD for example). */ user *user; /* User associated with this connection. If the user is set to NULL the connection can do anything (admin). */ int reqtype; /* Request protocol type: PROTO_REQ_* */ int multibulklen; /* Number of multi bulk arguments left to read. */ long bulklen; /* Length of bulk argument in multi bulk request. */ list *reply; /* List of reply objects to send to the client. */ unsignedlonglong reply_bytes; /* Tot bytes of objects in reply list. */ list *deferred_reply_errors; /* Used for module thread safe contexts. */ size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */ time_t ctime; /* Client creation time. */ long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */ int slot; /* The slot the client is executing against. Set to -1 if no slot is being used */ dictEntry *cur_script; /* Cached pointer to the dictEntry of the script being executed. */ time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. */ int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */ int repldbfd; /* Replication DB file descriptor. */ off_t repldboff; /* Replication DB file offset. */ off_t repldbsize; /* Replication DB file size. */ sds replpreamble; /* Replication DB preamble. */ longlong read_reploff; /* Read replication offset if this is a master. */ longlong reploff; /* Applied replication offset if this is a master. */ longlong repl_applied; /* Applied replication data count in querybuf, if this is a replica. */ longlong repl_ack_off; /* Replication ack offset, if this is a slave. */ longlong repl_ack_time;/* Replication ack time, if this is a slave. */ longlong repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */ longlong psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this slave output buffer should use. */ char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */ int slave_listening_port; /* As configured with: REPLCONF listening-port */ char *slave_addr; /* Optionally given by REPLCONF ip-address */ int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ int slave_req; /* Slave requirements: SLAVE_REQ_* */ multiState mstate; /* MULTI/EXEC state */ int btype; /* Type of blocking op if CLIENT_BLOCKED. */ blockingState bpop; /* blocking state */ longlong woff; /* Last write global replication offset. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */ sds peerid; /* Cached peer ID. */ sds sockname; /* Cached connection target address. */ listNode *client_list_node; /* list node in client list */ listNode *postponed_list_node; /* list node within the postponed list */ listNode *pending_read_list_node; /* list node in clients pending read list */ RedisModuleUserChangedFunc auth_callback; /* Module callback to execute * when the authenticated user * changes. */ void *auth_callback_privdata; /* Private data that is passed when the auth * changed callback is executed. Opaque for * Redis Core. */ void *auth_module; /* The module that owns the callback, which is used * to disconnect the client if the module is * unloaded for cleanup. Opaque for Redis Core.*/
/* If this client is in tracking mode and this field is non zero, * invalidation messages for keys fetched by this client will be send to * the specified client ID. */ uint64_t client_tracking_redirection; rax *client_tracking_prefixes; /* A dictionary of prefixes we are already subscribed to in BCAST mode, in the context of client side caching. */ /* In updateClientMemoryUsage() we track the memory usage of * each client and add it to the sum of all the clients of a given type, * however we need to remember what was the old contribution of each * client, and in which category the client was, in order to remove it * before adding it the new value. */ size_t last_memory_usage; int last_memory_type;
listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks, * see the definition of replBufBlock. */ size_t ref_block_pos; /* Access position of referenced buffer block, * i.e. the next offset to send. */
/* Response buffer */ size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */ mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */ int bufpos; size_t buf_usable_size; /* Usable size of buffer. */ char *buf; } client
initServer 初始化
针对每个监听socket fd,注册读事件,处理器为acceptTcpHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/* Create an event handler for accepting new connections in TCP or TLS domain sockets. * This works atomically for all socket fds */ intcreateSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) { int j;
voidacceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata);
/* Register a read handler, to be called when the connection is readable. * If NULL, the existing handler is removed. */ staticintconnSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) { if (func == conn->read_handler) return C_OK;
conn->read_handler = func; if (!conn->read_handler) aeDeleteFileEvent(server.el,conn->fd,AE_READABLE); else if (aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR; return C_OK; }
/* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ if (postponeClientRead(c)) return;
/* Update total number of reads on server */ atomicIncr(server.stat_total_reads_processed, 1);
readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query * buffer contains exactly the SDS string representing the object, even * at the risk of requiring more read(2) calls. This way the function * processMultiBulkBuffer() can avoid copying buffers to create the * Redis Object representing the argument. */ if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG) { ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); big_arg = 1;
/* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */ if (remaining > 0) readlen = remaining;
/* Master client needs expand the readlen when meet BIG_ARG(see #9100), * but doesn't need align to the next arg, we can read more data. */ if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) readlen = PROTO_IOBUF_LEN; }
qblen = sdslen(c->querybuf); if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy. (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) { /* When reading a BIG_ARG we won't be reading more than that one arg * into the query buffer, so we don't need to pre-allocate more than we * need, so using the non-greedy growing. For an initial allocation of * the query buffer, we also don't wanna use the greedy growth, in order * to avoid collision with the RESIZE_THRESHOLD mechanism. */ c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen); } else { c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
/* Read as much as possible from the socket to save read(2) system calls. */ readlen = sdsavail(c->querybuf); } nread = connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); goto done; } } elseif (nread == 0) { if (server.verbosity <= LL_VERBOSE) { sds info = catClientInfoString(sdsempty(), c); serverLog(LL_VERBOSE, "Client closed connection %s", info); sdsfree(info); } freeClientAsync(c); goto done; }
/* There is more data in the client input buffer, continue parsing it * and check if there is a full command to execute. */ if (processInputBuffer(c) == C_ERR) c = NULL;