List the main points to facilitate the understanding of the following program.
Redis listens for incoming connections on TCP port 6379 (the default port, which can be changed in the configuration). Every Redis command or piece of data transmitted between the client and the server ends with \r\n (CRLF).
Redis replies to commands with different reply types. The client can determine the reply type from the first byte sent by the server:
* Single-line reply (status reply): the first byte of the reply is "+"
* Error message: the first byte of the reply is "-"
* Integer: the first byte of the reply is ":"
* Bulk reply: the first byte of the reply is "$"
* Multi-bulk reply: the first byte of the reply is "*"
Bulk replies are used by servers to return a single binary-safe string.
C: GET mykey
S: $6\r\nfoobar\r\n
The server sends the first line of the reply, which starts with "$" followed by the number of data bytes that will be sent and then CRLF. The actual data follows, and finally 2 more bytes are sent for the closing CRLF. The exact sequence sent by the server is: "$6\r\nfoobar\r\n"
If the requested value does not exist, the bulk reply uses the special value -1 as the data length, for example:
C: GET nonexistingkey
S: $-1
When the requested object does not exist, the client library API will not return an empty string, but an empty object. For example: the Ruby library returns "nil", while the C library returns NULL (or sets the specified flag in the reply object), etc.
Binary
redis cluster
Data sharding of Redis cluster
A Redis cluster has 16384 hash slots. The CRC16 checksum of each key is taken modulo 16384 to determine which slot the key belongs to. Each node in the cluster is responsible for a subset of the hash slots. For example, if the current cluster has 3 nodes, then:
* Node A contains hash slots 0 to 5500.
* Node B contains hash slots 5501 to 11000.
* Node C contains hash slots 11001 to 16383.
This structure makes it easy to add or remove nodes. For example, to add a new node D, some slots from nodes A, B, and C must be moved to D. To remove node A, the slots in A must be moved to nodes B and C, after which node A, now holding no slots, can be removed from the cluster. Since moving a hash slot from one node to another does not stop the service, adding nodes, removing nodes, or changing the number of hash slots held by a node does not make the cluster unavailable.
The client and server side in the Redis cluster protocol
To perform these tasks, all cluster nodes communicate through a TCP connection (TCP bus?) and a binary protocol (cluster connection, cluster bus). Each node is connected to every other node in the cluster through a cluster bus. Nodes use a gossip protocol to spread cluster information, which can: discover new nodes, send ping packets (used to ensure that all nodes are working properly), and send cluster messages when certain situations occur. Cluster connections are also used to publish or subscribe to messages in the cluster.
Since cluster nodes cannot proxy requests, the client must resend the command to another node when it receives one of the redirection errors -MOVED or -ASK. In theory, the client is free to send requests to any node in the cluster and follow redirections when needed, so the client does not need to store the cluster state. However, the client can cache the mapping between keys (hash slots) and nodes, which can significantly improve the efficiency of command execution.
-MOVED
The -ASK case does not need to be considered here.
The code is as follows:
/*
 * c_redis_cli.c - minimal Redis cluster client that checks whether an IP
 * address is allowed for a domain.
 *
 * The program sends inline Redis commands over a plain TCP socket, follows a
 * single -MOVED redirection, and matches the supplied IP address against a
 * comma-separated list of addresses / CIDR ranges stored in Redis.
 */
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/poll.h>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <limits.h>

enum {
    IO_TIMEOUT_SEC = 5,        /* per-operation socket timeout, in seconds   */
    MAX_ATTEMPTS = 2,          /* initial request plus one -MOVED redirect   */
    TARGET_BUF_SIZE = 128,     /* "host:port" of the node being queried      */
    REPLY_BUF_SIZE = 256,      /* first chunk of a reply (header + some data) */
    MAX_BULK_LEN = 1 << 20     /* upper bound accepted for a bulk reply      */
};

/* Prototypes, so every function is declared before its first use. */
ssize_t sock_write_loop(int fd, const void *vptr, size_t n);
int sock_read_wait(int fd, int timeout);
ssize_t sock_read_tmo(int fd, void *vptr, size_t len, int timeout);
int sock_connect_nore(const char *IPaddr, int port, int timeout);
int sock_connect(const char *IPaddr, int port, int timeout);
int sock_non_blocking(int fd, int on);
int sock_write_wait(int fd, int timeout);
int sock_timed_connect(int sock, struct sockaddr *sa, int len, int timeout);

/*
 * Write exactly n bytes from vptr to fd, retrying after EINTR and after
 * short writes.
 *
 * Returns n on success, -5 on any other write failure.
 */
ssize_t sock_write_loop(int fd, const void *vptr, size_t n)
{
    const char *ptr = vptr;
    size_t nleft = n;

    while (nleft > 0) {
        ssize_t nwritten = write(fd, ptr, nleft);

        if (nwritten < 0) {
            if (errno == EINTR) {
                continue;   /* interrupted: call write again */
            }
            return -5;
        }
        if (nwritten == 0) {
            /* No progress and no error: errno is not meaningful here, so
             * do not consult it (a stale EINTR would spin forever). */
            return -5;
        }
        nleft -= (size_t)nwritten;
        ptr += nwritten;
    }
    return (ssize_t)n;
}

/*
 * Wait until one of `events` is reported for fd or `timeout` seconds elapse.
 * A negative timeout waits indefinitely.
 *
 * Returns 0 when the requested event is set, -1 on timeout (errno set to
 * ETIMEDOUT), -2 on a poll() error other than EINTR, -3 when poll() woke up
 * for some other condition.
 */
static int sock_poll_wait(int fd, short events, int timeout)
{
    struct pollfd pfd;
    int timeout_ms;

    /* Convert seconds to milliseconds without overflowing int. */
    if (timeout < 0) {
        timeout_ms = -1;
    } else if (timeout > INT_MAX / 1000) {
        timeout_ms = INT_MAX;
    } else {
        timeout_ms = timeout * 1000;
    }

    pfd.fd = fd;
    pfd.events = events;
    pfd.revents = 0;

    for (;;) {
        int rc = poll(&pfd, 1, timeout_ms);

        if (rc < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -2;
        }
        if (rc == 0) {
            errno = ETIMEDOUT;
            return -1;
        }
        return (pfd.revents & events) ? 0 : -3;
    }
}

/* Wait up to `timeout` seconds for fd to become readable; see sock_poll_wait. */
int sock_read_wait(int fd, int timeout)
{
    return sock_poll_wait(fd, POLLIN, timeout);
}

/* Wait up to `timeout` seconds for fd to become writable; see sock_poll_wait. */
int sock_write_wait(int fd, int timeout)
{
    return sock_poll_wait(fd, POLLOUT, timeout);
}

/*
 * read() with an optional timeout. When timeout > 0 the call first waits for
 * readability and returns -1 if that wait fails; otherwise it returns the
 * result of read().
 */
ssize_t sock_read_tmo(int fd, void *vptr, size_t len, int timeout)
{
    if (timeout > 0 && sock_read_wait(fd, timeout) < 0) {
        return -1;
    }
    return read(fd, vptr, len);
}

/*
 * Switch O_NONBLOCK on (on != 0) or off (on == 0) for fd.
 * Returns 0 on success, -10 if fcntl() fails.
 */
int sock_non_blocking(int fd, int on)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags < 0) {
        return -10;
    }
    flags = on ? (flags | O_NONBLOCK) : (flags & ~O_NONBLOCK);
    if (fcntl(fd, F_SETFL, flags) < 0) {
        return -10;
    }
    return 0;
}

/*
 * connect() with a timeout in seconds. The socket is put in non-blocking
 * mode for the attempt and switched back to blocking mode before returning.
 *
 * Returns 0 on success, -1 if connect() fails immediately, -2 if the socket
 * does not become writable in time, -3 if getsockopt() fails, -4 if the
 * connection attempt completed with an error (errno holds that error).
 */
int sock_timed_connect(int sock, struct sockaddr *sa, int len, int timeout)
{
    int rc = 0;
    int saved_errno;

    sock_non_blocking(sock, 1);

    if (connect(sock, sa, (socklen_t)len) != 0) {
        if (errno != EINPROGRESS) {
            rc = -1;
        } else if (sock_write_wait(sock, timeout) != 0) {
            /* The connection is in progress but nothing happened in time. */
            rc = -2;
        } else {
            /* Something happened: fetch the outcome of the attempt. */
            int error = 0;
            socklen_t error_len = sizeof(error);

            if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &error_len) != 0) {
                rc = -3;
            } else if (error != 0) {
                errno = error;
                rc = -4;
            }
        }
    }

    /* Restore blocking mode without losing the errno of the failure. */
    saved_errno = errno;
    sock_non_blocking(sock, 0);
    errno = saved_errno;
    return rc;
}

/*
 * Turn a dotted-quad string or a host name into an IPv4 address.
 * Only the first address returned by the resolver is used.
 *
 * Returns 0 on success, -1 if the name cannot be resolved to IPv4.
 */
static int resolve_ipv4(const char *host, struct in_addr *addr)
{
    struct hostent entry;
    struct hostent *result = NULL;
    char scratch[2048];
    int resolver_errno = 0;

    if (inet_pton(AF_INET, host, addr) == 1) {
        return 0;
    }

    /* Not a literal address: fall back to a host name lookup. */
    memset(&entry, 0, sizeof(entry));
    memset(scratch, 0, sizeof(scratch));
#ifdef _SOLARIS_PLAT
    /* Solaris: returns the result pointer directly. */
    result = gethostbyname_r(host, &entry, scratch, sizeof(scratch),
                             &resolver_errno);
#else
    /* Linux: returns 0 on success and stores the result pointer. */
    if (gethostbyname_r(host, &entry, scratch, sizeof(scratch),
                        &result, &resolver_errno) != 0) {
        result = NULL;
    }
#endif
    if (result == NULL ||
        result->h_addrtype != AF_INET ||
        result->h_length != (int)sizeof(*addr) ||
        result->h_addr_list == NULL ||
        result->h_addr_list[0] == NULL) {
        return -1;
    }
    memcpy(addr, result->h_addr_list[0], sizeof(*addr));
    return 0;
}

/*
 * Create a TCP socket and connect it to IPaddr:port within `timeout` seconds.
 * Shared implementation of sock_connect_nore() and sock_connect().
 *
 * Returns the connected descriptor, or -1 on any failure.
 */
static int sock_open_connected(const char *IPaddr, int port, int timeout)
{
    struct sockaddr_in servaddr;
    int sock_fd;

    if (IPaddr == NULL || port <= 0 || port > 65535) {
        return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons((uint16_t)port);
    if (resolve_ipv4(IPaddr, &servaddr.sin_addr) != 0) {
        return -1;
    }

    sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (sock_fd < 0) {
        return -1;
    }
    if (sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr,
                           (int)sizeof(servaddr), timeout) < 0) {
        close(sock_fd);
        return -1;
    }
    return sock_fd;
}

/*
 * Connect to IPaddr:port (literal IPv4 address or host name) without reading
 * anything from the peer.
 *
 * Returns the connected descriptor, or -1 on failure.
 */
int sock_connect_nore(const char *IPaddr, int port, int timeout)
{
    return sock_open_connected(IPaddr, port, timeout);
}

/*
 * Connect to IPaddr:port and then read (and discard) up to 4096 bytes that
 * the peer sends first. The connection is closed and -1 is returned when
 * nothing can be read within `timeout` seconds.
 *
 * Returns the connected descriptor, or -1 on failure.
 */
int sock_connect(const char *IPaddr, int port, int timeout)
{
    char greeting[4096];
    int sock_fd = sock_open_connected(IPaddr, port, timeout);

    if (sock_fd < 0) {
        return -1;
    }
    if (sock_read_tmo(sock_fd, greeting, sizeof(greeting), timeout) <= 0) {
        close(sock_fd);
        return -1;
    }
    return sock_fd;
}

/* Parse a dotted-quad IPv4 string into a host-order integer; 0 on success. */
static int parse_ipv4(const char *text, uint32_t *out)
{
    struct in_addr parsed;

    if (inet_pton(AF_INET, text, &parsed) != 1) {
        return -1;
    }
    *out = ntohl(parsed.s_addr);
    return 0;
}

/*
 * Check whether `ip` matches an entry of `iplist`.
 *
 * `iplist` is a list of entries separated by commas (surrounding spaces and
 * CR/LF are ignored); each entry is either a plain address, compared as a
 * string, or "network/prefix" with prefix in 0..32. Malformed entries are
 * skipped. The list is tokenised in place, so `iplist` is modified.
 *
 * Returns 1 on a match, 0 when nothing matches, -1 on NULL arguments.
 */
static int check_ip_in_list(const char *ip, char *iplist)
{
    static const char entry_sep[] = ", \t\r\n";
    uint32_t ip_value = 0;
    int ip_valid;
    char *saveptr = NULL;
    char *entry;

    if (ip == NULL || iplist == NULL) {
        return -1;
    }
    /* Parsed once; an unparsable ip can never match a CIDR entry. */
    ip_valid = (parse_ipv4(ip, &ip_value) == 0);

    for (entry = strtok_r(iplist, entry_sep, &saveptr);
         entry != NULL;
         entry = strtok_r(NULL, entry_sep, &saveptr)) {
        char *slash = strchr(entry, '/');
        char *end = NULL;
        uint32_t net_value = 0;
        uint32_t mask;
        long prefix;

        if (slash == NULL) {
            if (strcmp(ip, entry) == 0) {
                return 1;
            }
            continue;
        }

        *slash = '\0';
        errno = 0;
        prefix = strtol(slash + 1, &end, 10);
        if (end == slash + 1 || *end != '\0' || errno == ERANGE ||
            prefix < 0 || prefix > 32) {
            continue;   /* bad prefix length: ignore this entry */
        }
        if (!ip_valid || parse_ipv4(entry, &net_value) != 0) {
            continue;
        }

        /* A /0 prefix matches everything; shifting by 32 would be undefined. */
        mask = (prefix == 0) ? 0u : (uint32_t)(UINT32_MAX << (32 - prefix));
        if (((ip_value ^ net_value) & mask) == 0) {
            return 1;
        }
    }
    return 0;
}

/*
 * Split "host:port" in place: the ':' is replaced by NUL so that `target`
 * becomes the host, and the port is stored in *port.
 * Returns 0 on success, -1 when the string is not a valid host:port pair.
 */
static int split_host_port(char *target, int *port)
{
    char *colon = strchr(target, ':');
    char *end = NULL;
    long value;

    if (colon == NULL || colon == target) {
        return -1;
    }
    *colon = '\0';
    errno = 0;
    value = strtol(colon + 1, &end, 10);
    if (end == colon + 1 || *end != '\0' || errno == ERANGE ||
        value <= 0 || value > 65535) {
        return -1;
    }
    *port = (int)value;
    return 0;
}

/*
 * Extract the "host:port" field of a "-MOVED <slot> <host:port>" reply into
 * `out`, without the trailing CRLF.
 * Returns 0 on success, -1 if the reply is malformed or does not fit.
 */
static int parse_moved_target(const char *reply, char *out, size_t out_size)
{
    const char *field = strchr(reply, ' ');
    size_t len;

    if (field == NULL) {
        return -1;
    }
    field = strchr(field + 1, ' ');
    if (field == NULL) {
        return -1;
    }
    field++;
    len = strcspn(field, " \r\n");
    if (len == 0 || len >= out_size) {
        return -1;
    }
    memcpy(out, field, len);
    out[len] = '\0';
    return 0;
}

/*
 * Handle a bulk reply ("$<len>\r\n<data>\r\n") whose first `reply_len` bytes
 * are already in `reply` (NUL-terminated). Reads the remainder of the data
 * from sock_fd if necessary and matches `ip` against it.
 *
 * Returns 1 if ip is in the list, 0 if it is not or the value does not exist
 * (length -1), -1 on protocol, allocation or read errors.
 */
static int check_bulk_reply(int sock_fd, const char *reply, size_t reply_len,
                            const char *ip)
{
    const char *header_end = strstr(reply, "\r\n");
    char *end = NULL;
    char *data;
    long bulk_len;
    size_t want;
    size_t have;
    int rc;

    if (header_end == NULL) {
        return -1;
    }
    errno = 0;
    bulk_len = strtol(reply + 1, &end, 10);
    if (end == reply + 1 || end != header_end || errno == ERANGE) {
        return -1;
    }
    if (bulk_len == -1) {
        return 0;   /* nil reply: the key does not exist */
    }
    if (bulk_len < 0 || bulk_len > MAX_BULK_LEN) {
        return -1;
    }

    want = (size_t)bulk_len;
    have = reply_len - (size_t)(header_end + 2 - reply);
    if (have > want) {
        have = want;    /* drop the trailing CRLF (and anything after it) */
    }

    data = calloc(want + 1, 1);
    if (data == NULL) {
        return -1;
    }
    memcpy(data, header_end + 2, have);
    while (have < want) {
        ssize_t nread = sock_read_tmo(sock_fd, data + have, want - have,
                                      IO_TIMEOUT_SEC);
        if (nread <= 0) {
            free(data);
            return -1;
        }
        have += (size_t)nread;
    }

    printf("line = %d, data = %s\n", __LINE__, data);
    rc = check_ip_in_list(ip, data);
    free(data);
    return rc;
}

/*
 * Send the inline command `rq_pro` to the Redis node `redis_host`
 * ("host:port") and interpret the reply, following at most one -MOVED
 * redirection:
 *   ":0..." -> 0, ":1..." -> 1,
 *   "$..."  -> the bulk value is treated as an IP list and `ip` is matched
 *              against it (see check_ip_in_list),
 *   "-MOVED <slot> <host:port>" -> retry against the indicated node.
 *
 * Returns 1 (yes / in list), 0 (no / not in list / value missing) or -1 on
 * any error or unexpected reply.
 */
static int check_ip_in_redis(const char *redis_host, const char *ip,
                             const char *rq_pro)
{
    char target[TARGET_BUF_SIZE];
    size_t rq_len;
    int copied;
    int attempt;

    if (redis_host == NULL || ip == NULL || rq_pro == NULL) {
        return -1;
    }
    copied = snprintf(target, sizeof(target), "%s", redis_host);
    if (copied < 0 || (size_t)copied >= sizeof(target)) {
        return -1;
    }
    rq_len = strlen(rq_pro);

    for (attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
        char reply[REPLY_BUF_SIZE] = {0};
        ssize_t nread;
        int port = 0;
        int sock_fd;
        int rc;

        if (split_host_port(target, &port) != 0) {
            return -1;
        }
        sock_fd = sock_connect_nore(target, port, IO_TIMEOUT_SEC);
        if (sock_fd < 0) {
            return -1;
        }
        if (sock_write_loop(sock_fd, rq_pro, rq_len) != (ssize_t)rq_len) {
            close(sock_fd);
            return -1;
        }
        nread = sock_read_tmo(sock_fd, reply, sizeof(reply) - 1, IO_TIMEOUT_SEC);
        if (nread <= 0) {
            close(sock_fd);
            return -1;
        }
        reply[nread] = '\0';

        if (strncmp(reply, ":0", 2) == 0) {
            close(sock_fd);
            return 0;
        }
        if (strncmp(reply, ":1", 2) == 0) {
            close(sock_fd);
            return 1;
        }
        if (reply[0] == '$') {
            rc = check_bulk_reply(sock_fd, reply, (size_t)nread, ip);
            close(sock_fd);
            return rc;
        }

        close(sock_fd);
        if (strncmp(reply, "-MOVED", 6) != 0) {
            return -1;
        }
        /* The key lives on another node: retry there. */
        if (parse_moved_target(reply, target, sizeof(target)) != 0) {
            return -1;
        }
    }
    return -1;
}

/*
 * Usage: <program> <ip>
 *
 * First asks Redis whether the per-domain key exists (EXISTS); if it does
 * not, fetches the domain's IP list (GET) and matches <ip> against it.
 * The whole check is retried up to 3 times while it fails with an error.
 * The result (1, 0 or -1) is printed and returned as the exit status.
 */
int main(int argc, char *argv[])
{
    const char *redis_ip = "127.0.0.1:7002";
    const char *domain = "test.com";
    char exist_pro[128] = {0};
    char get_pro[128] = {0};
    int loops = 0;
    int ret = 0;

    if (argc != 2) {
        printf("please input ip\n");
        return -1;
    }

    /* NOTE(review): the EXISTS key embeds the literal "127.0.0.1" rather than
     * argv[1] - confirm whether that is intended. */
    snprintf(exist_pro, sizeof(exist_pro), "EXISTS test|%s|%s\r\n",
             domain, "127.0.0.1");
    snprintf(get_pro, sizeof(get_pro), "GET test_%s\r\n", domain);

    do {
        loops++;
        ret = check_ip_in_redis(redis_ip, argv[1], exist_pro);
        if (ret == 0) {
            ret = check_ip_in_redis(redis_ip, argv[1], get_pro);
        }
    } while (loops < 3 && ret < 0);

    printf("line = %d, ret = %d\n", __LINE__, ret);
    return ret;
}
c_redis_cli.c
Focus mainly on the check_ip_in_redis function; the other functions are all socket wrappers.
#!/usr/bin/python
import sys
import socket


def main(argv):
    """Set the white-list IP of a domain in a Redis cluster.

    argv is expected to be [program, domain, ip]. The inline command
    ``set <domain>_white_ip <ip>`` is sent to a starting node; when the node
    answers with ``-MOVED <slot> <host:port>`` the command is resent to the
    node named in the reply. The outcome is reported on stdout.
    """
    if len(argv) != 3:
        print("please input domain ip!")
        return

    host = "192.168.188.47"
    port = 7002
    # Redis commands must be terminated by CRLF.
    cmd = ("set %s_white_ip %s\r\n" % (argv[1], argv[2])).encode()

    # Bound the number of redirections so a misbehaving cluster cannot make
    # the script loop forever.
    max_redirects = 5
    for _ in range(max_redirects + 1):
        s = socket.socket()
        try:
            s.connect((host, port))
            s.sendall(cmd)
            # Large enough for a full "-MOVED <slot> <host:port>\r\n" reply.
            res = s.recv(128).decode("utf-8", "replace")
        finally:
            s.close()

        if res[:1] == "+":
            print("set domain white ip suc!")
            return
        elif res[0:6] == "-MOVED":
            fields = res.split()
            if len(fields) < 3 or ":" not in fields[2]:
                break
            # The key belongs to another node: retry there.
            host, port_text = fields[2].rsplit(":", 1)
            try:
                port = int(port_text)
            except ValueError:
                break
        else:
            break

    print("set domain white ip error!")


if __name__ == "__main__":
    main(sys.argv)
The above is the detailed content of How to quickly build a redis cluster using python. For more information, please follow other related articles on the PHP Chinese website!