0%

Redis-client

Redis 是怎么接受客户端请求并且进行响应的呢?并且Redis是怎么维护客户端连接的呢?

Client

Redis 基于TCP + 自定义协议实现通信和请求,因为tcp是流式的,所以对于每一个client会先将网络数据保存在querybuf当中,攒够了再处理,另外写出数据放在buf当中

Client数据结构比较复杂,有一些字段还和slave和replication有关,这里暂不详细看

一般情况下,使用TCP作为通信协议,在应用层还需要增加一层协议用于区分请求,比如定长包,固定分割符,header+content等形式,在Redis里面用了Redis serialization protocol (RESP) specification

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/* Per-client state kept by the server: identity, the connection, the input
 * (query) buffer and parsed arguments, the output (reply) buffers, and the
 * replication / blocking / pubsub / tracking bookkeeping attached to it. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    uint64_t flags;         /* Client flags: CLIENT_* macros. */
    connection *conn;       /* Underlying connection (createClient() accepts NULL
                               for a non connected client). */
    int resp;               /* RESP protocol version. Can be 2 or 3. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    int argv_len;           /* Size of argv array (may be more than argc) */
    int original_argc;      /* Num of arguments of original command if arguments were rewritten. */
    robj **original_argv;   /* Arguments of original command if arguments were rewritten. */
    size_t argv_len_sum;    /* Sum of lengths of objects in argv list. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    struct redisCommand *realcmd; /* The original command that was executed by the client,
                                     Used to update error stats in case the c->cmd was modified
                                     during the command invocation (like on GEOADD for example). */
    user *user;             /* User associated with this connection. If the
                               user is set to NULL the connection can do
                               anything (admin). */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    list *deferred_reply_errors;    /* Used for module thread safe contexts. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    long duration;          /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
    int slot;               /* The slot the client is executing against. Set to -1 if no slot is being used */
    dictEntry *cur_script;  /* Cached pointer to the dictEntry of the script being executed. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int authenticated;      /* Needed when the default user requires auth. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: REPLCONF listening-port */
    char *slave_addr;       /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    int slave_req;          /* Slave requirements: SLAVE_REQ_* */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (PSUBSCRIBE) */
    dict *pubsubshard_channels;  /* shard level channels a client is interested in (SSUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    sds sockname;           /* Cached connection target address. */
    listNode *client_list_node; /* list node in client list */
    listNode *postponed_list_node; /* list node within the postponed list */
    listNode *pending_read_list_node; /* list node in clients pending read list */
    RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
                                               * when the authenticated user
                                               * changes. */
    void *auth_callback_privdata; /* Private data that is passed when the auth
                                   * changed callback is executed. Opaque for
                                   * Redis Core. */
    void *auth_module;      /* The module that owns the callback, which is used
                             * to disconnect the client if the module is
                             * unloaded for cleanup. Opaque for Redis Core.*/

    /* If this client is in tracking mode and this field is non zero,
     * invalidation messages for keys fetched by this client will be sent to
     * the specified client ID. */
    uint64_t client_tracking_redirection;
    rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
                                      subscribed to in BCAST mode, in the
                                      context of client side caching. */
    /* In updateClientMemoryUsage() we track the memory usage of
     * each client and add it to the sum of all the clients of a given type,
     * however we need to remember what was the old contribution of each
     * client, and in which category the client was, in order to remove it
     * before adding it the new value. */
    size_t last_memory_usage;
    int last_memory_type;

    listNode *mem_usage_bucket_node;
    clientMemUsageBucket *mem_usage_bucket;

    listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
                                  * see the definition of replBufBlock. */
    size_t ref_block_pos;        /* Access position of referenced buffer block,
                                  * i.e. the next offset to send. */

    /* Response buffer */
    size_t buf_peak;        /* Peak used size of buffer in last 5 sec interval. */
    mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */
    int bufpos;
    size_t buf_usable_size; /* Usable size of buffer. */
    char *buf;
} client;

initServer 初始化

针对每个监听socket fd,注册读事件,处理器为acceptTcpHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Install `accept_handler` as the AE_READABLE callback for every listening
 * fd in `sfd`. The operation is all-or-nothing: if any registration fails,
 * the ones already made by this call are removed again.
 *
 * Returns C_OK when every fd was registered, C_ERR otherwise. */
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
    int registered = 0;

    while (registered < sfd->count) {
        int rc = aeCreateFileEvent(server.el, sfd->fd[registered], AE_READABLE,
                                   accept_handler, NULL);
        if (rc == AE_ERR) {
            /* Rollback: drop the handlers installed so far, newest first. */
            while (registered-- > 0) {
                aeDeleteFileEvent(server.el, sfd->fd[registered], AE_READABLE);
            }
            return C_ERR;
        }
        registered++;
    }
    return C_OK;
}

acceptTcpHandler

对于server端的fd读事件处理,每次最多处理MAX_ACCEPTS_PER_CALL个请求,默认值是1000
处理过程是通过anetTcpAccept得到客户端fd(cfd),调用connCreateAcceptedSocket封装connection
然后调用 acceptCommonHandler 创建client对象~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* Readable-event callback for a listening TCP socket `fd`.
 *
 * Accepts pending connections one at a time, up to MAX_ACCEPTS_PER_CALL per
 * invocation, wraps each accepted fd in a connection object and hands it to
 * acceptCommonHandler(). Stops at the first failed accept; the failure is
 * logged unless errno is EWOULDBLOCK. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    int cport;
    int cfd;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int remaining = MAX_ACCEPTS_PER_CALL; remaining != 0; remaining--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK) {
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            }
            return;
        }

        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

创建client对象的过程中会调用connSetReadHandler,注册对该client fd的读事件监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/* Allocate a client structure and initialize every field to its default.
 *
 * conn: the connection the client is bound to, or NULL to create a
 *       non connected client (see the comment below).
 *
 * When conn is non-NULL the connection is configured (TCP no-delay,
 * optional keepalive), readQueryFromClient is installed as its read
 * handler, the client is stored as the connection's private data, and the
 * client is passed to linkClient() once initialization is complete.
 *
 * Returns the newly created client. */
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));

/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
/* Static reply buffer; its usable size is recorded below. */
c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);
selectDb(c,0);
/* The client ID comes from server.next_client_id via atomicGetIncr. */
uint64_t client_id;
atomicGetIncr(server.next_client_id, client_id, 1);
c->id = client_id;
c->resp = 2;
c->conn = conn;
c->name = NULL;
c->bufpos = 0;
c->buf_usable_size = zmalloc_usable_size(c->buf);
c->buf_peak = c->buf_usable_size;
c->buf_peak_last_reset_time = server.unixtime;
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
/* Query buffer and request parsing state. */
c->qb_pos = 0;
c->querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
c->argv = NULL;
c->argv_len = 0;
c->argv_len_sum = 0;
c->original_argc = 0;
c->original_argv = NULL;
c->cmd = c->lastcmd = c->realcmd = NULL;
c->cur_script = NULL;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
c->slot = -1;
c->ctime = c->lastinteraction = server.unixtime;
clientSetDefaultAuth(c);
/* Replication related state. */
c->replstate = REPL_STATE_NONE;
c->repl_start_cmd_stream_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
c->repl_applied = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->repl_last_partial_write = 0;
c->slave_listening_port = 0;
c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
/* Reply list with its free/dup callbacks. */
c->reply = listCreate();
c->deferred_reply_errors = NULL;
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
/* Blocking operation state. */
c->btype = BLOCKED_NONE;
c->bpop.timeout = 0;
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType);
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
c->bpop.xread_group_noack = 0;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
/* WATCH and Pub/Sub containers. */
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
c->pubsub_patterns = listCreate();
c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType);
c->peerid = NULL;
c->sockname = NULL;
c->client_list_node = NULL;
c->postponed_list_node = NULL;
c->pending_read_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
c->last_memory_usage = 0;
c->last_memory_type = CLIENT_TYPE_NORMAL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
c->mem_usage_bucket = NULL;
c->mem_usage_bucket_node = NULL;
/* Only connected clients are linked; a NULL-conn client is skipped. */
if (conn) linkClient(c);
initClientMultiState(c);
return c;
}

实际注册监听读事件是在 connSocketSetReadHandler 中完成的,回调函数绑定的是 ae_handler,对应的实现是 connSocketEventHandler,它是一个通用的事件处理器

该处理器会判断具体的读写事件分发调用 read_handler write_handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* Set (or clear) the callback invoked when `conn` becomes readable.
 *
 * Passing the handler that is already installed is a no-op. Passing NULL
 * removes the AE_READABLE file event for the connection's fd; any other
 * value registers the connection type's ae_handler for AE_READABLE.
 *
 * Returns C_OK on success, C_ERR if the file event could not be created. */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (conn->read_handler == func) {
        return C_OK;
    }

    conn->read_handler = func;

    if (!func) {
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
        return C_OK;
    }

    int rc = aeCreateFileEvent(server.el,conn->fd,
                               AE_READABLE,conn->type->ae_handler,conn);
    return (rc == AE_ERR) ? C_ERR : C_OK;
}

readHandler - readQueryFromClient

该函数是Redis中用于读取客户端发来的请求并进行处理的函数,函数的主要流程如下:

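  1. 先判断是否需要推迟从客户端读取数据的操作(开启多线程I/O时,postponeClientRead会把读操作延后处理)。如果该客户端的请求是multi bulk请求,并且正在读取的bulk参数足够大(不小于PROTO_MBULK_BIG_ARG),则把本次读取长度调整为该参数剩余的字节数,使querybuf尽量恰好只包含这一个参数,这样创建参数对象时可以避免复制缓冲区,提高性能。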

  2. 因为querybuf是需要动态扩展的,所以读取前要先保证querybuf有足够的空间。对于非master客户端,如果正在读取大参数(big_arg),或者querybuf当前分配的空间小于PROTO_IOBUF_LEN,则采用非贪心方式扩展,避免与RESIZE_THRESHOLD机制产生干扰;其他情况(包括master客户端)采用贪心方式扩展,并尽可能多地从socket读取数据以减少read(2)系统调用。所谓贪心与非贪心则是,非贪心仅扩展到刚好够用的内存,贪心则是类似扩充到目前内存的两倍,可能会有一定额外的内存

  3. 从socket中读取请求数据并追加到querybuf缓冲区中。如果读取出错、对端关闭了连接,或者非master客户端的querybuf长度超过了client_max_querybuf_len,则关闭该客户端连接并释放该客户端所占用的资源。

  4. 维护相关计数器并更新客户端的最后操作时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/* Read handler for a client connection (installed in createClient()).
 *
 * Reads bytes from the connection into c->querybuf, growing the buffer
 * first, updates the read statistics and the client's last interaction
 * time, closes clients whose query buffer exceeds
 * server.client_max_querybuf_len, and finally passes the accumulated
 * data to processInputBuffer().
 *
 * conn: the connection that became readable; its private data is the
 *       client. */
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;

/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;

/* Update total number of reads on server */
atomicIncr(server.stat_total_reads_processed, 1);

readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
/* Bytes still missing for the current argument: bulklen+2 minus what
* is already buffered past qb_pos (the +2 presumably covers the
* trailing CRLF -- confirm against processMultiBulkBuffer()). */
ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
big_arg = 1;

/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0) readlen = remaining;

/* Master client needs expand the readlen when meet BIG_ARG(see #9100),
* but doesn't need align to the next arg, we can read more data. */
if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
readlen = PROTO_IOBUF_LEN;
}

qblen = sdslen(c->querybuf);
if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
* into the query buffer, so we don't need to pre-allocate more than we
* need, so using the non-greedy growing. For an initial allocation of
* the query buffer, we also don't wanna use the greedy growth, in order
* to avoid collision with the RESIZE_THRESHOLD mechanism. */
c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
} else {
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

/* Read as much as possible from the socket to save read(2) system calls. */
readlen = sdsavail(c->querybuf);
}
/* Append the new bytes right after the current content of querybuf. */
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
/* Read failed. If the connection is still in the connected state we
* simply return (note: this path does not go through 'done');
* otherwise the client is scheduled to be freed. */
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
goto done;
}
} else if (nread == 0) {
/* Zero bytes read: logged as the client having closed the connection. */
if (server.verbosity <= LL_VERBOSE) {
sds info = catClientInfoString(sdsempty(), c);
serverLog(LL_VERBOSE, "Client closed connection %s", info);
sdsfree(info);
}
freeClientAsync(c);
goto done;
}

/* Account for the bytes just read and track the buffer's peak size. */
sdsIncrLen(c->querybuf,nread);
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) {
c->read_reploff += nread;
atomicIncr(server.stat_net_repl_input_bytes, nread);
} else {
atomicIncr(server.stat_net_input_bytes, nread);
}

/* Non-master clients are closed once the query buffer grows past the
* configured maximum; the first 64 bytes of the buffer are logged. */
if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
goto done;
}

/* There is more data in the client input buffer, continue parsing it
* and check if there is a full command to execute. */
/* NOTE(review): on C_ERR the client pointer is dropped before calling
* beforeNextClient(); presumably the client is no longer valid in that
* case -- confirm against processInputBuffer(). */
if (processInputBuffer(c) == C_ERR)
c = NULL;

done:
beforeNextClient(c);
}

processInputBuffer

redis从网络中读到数据后,就会按照RESP协议得到当前客户端的命令和值,解析详情请看RESP协议解读

以 set key value 命令举例,经过RESP协议编码后将会得到: *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n (即一个包含3个bulk string的数组)

通过协议解析后,调用processCommandAndResetClient执行实际命令:先根据 c->argv、c->argc,从server.commands中找到对应的命令。

server.commands是一个redis dict结构,保存着命令相关的执行方法,复杂度,提示信息等各项内容,如图所示

// Sketch of the server command table: redisServer.commands is a dict that
// maps a command name to its redisCommand entry (name, summary, proc, ...).
digraph g {
    fontname = "Helvetica,Arial,sans-serif";
    node [fontname = "Helvetica,Arial,sans-serif";];
    edge [fontname = "Helvetica,Arial,sans-serif";];
    graph [rankdir = "LR";];
    node [shape = "ellipse";];

    "redisServer" [label = "redis.server|commands|....";shape = "record";];

    "commands" [label = " <f1> hset|<f2> hget |<f3> hdel |<f4> otherCommand";shape = "record";];

    "hset" [label = "name:hset | summary:Set the string value of a hash field | .proc : hsetCommand | ...";shape = "record";];

    "hget" [label = "name:hget | summary:Get the value of a hash field | .proc: hgetCommand | ...";shape = "record";];
    "otherCommand" [label = "...";shape = "record";];

    "redisServer" -> "commands";
    "commands":f1 -> "hset";
    "commands":f2 -> "hget";
    "commands":f4 -> otherCommand;
}

在dict中找到相关命令后,执行对应命令的函数

执行前还有不少其他逻辑,比如

  1. 校验参数个数是否正确
  2. 校验调用者权限
  3. 判断是否开启了cluster配置,发起调用转向
  4. 最大内存使用量判断
  5. ….
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* Look up the command named by argv[0] (and argv[1] for commands with
 * subcommands) in the server-wide command table, in non-strict mode.
 * Returns whatever lookupCommandLogic() returns. */
struct redisCommand *lookupCommand(robj **argv, int argc) {
    const int strict = 0;

    return lookupCommandLogic(server.commands, argv, argc, strict);
}

/* Resolve a command from the `commands` dict.
 *
 * argv[0] selects the base command. When more than the command name is
 * given and the base command has a subcommands dict, argv[1] is resolved
 * through lookupSubcommand(); otherwise the base command is returned.
 *
 * strict: when non-zero the argument count must match exactly (1 for a
 *         plain command, 2 for a subcommand), otherwise NULL is returned.
 *
 * Returns the dict's value for the command, or NULL in the strict-mismatch
 * case above. */
struct redisCommand *lookupCommandLogic(dict *commands, robj **argv, int argc, int strict) {
    struct redisCommand *base_cmd = dictFetchValue(commands, argv[0]->ptr);

    if (argc != 1 && base_cmd && base_cmd->subcommands_dict) {
        /* Note: Currently we support just one level of subcommands */
        if (strict && argc != 2) {
            return NULL;
        }
        return lookupSubcommand(base_cmd, argv[1]->ptr);
    }

    if (strict && argc != 1) {
        return NULL;
    }
    /* Note: It is possible that base_cmd->proc==NULL (e.g. CONFIG) */
    return base_cmd;
}

执行对应的command