How to quickly build a redis cluster using python
Redis Communication Protocol
List the main points to facilitate the understanding of the following program.
Redis monitors incoming connections on TCP port 6379 (default port, which can be modified in the configuration). Every Redis command or data transmitted between the client and the server ends with rn.
Reply (a protocol that the server can recover from the client)
Redis replies to commands with different reply types. It may check the reply type starting from the first byte sent by the server:
* With a single line reply (status reply), the first byte of the reply will be “ ”
* Error message, the first byte of the reply will be “-”
* Integer number, the first byte of the reply will be “:”
* Batch reply, the first byte of the reply will be “$”
* Multiple batch reply, the first byte of the reply will be “*”
Bulk Strings
Bulk replies are used by servers to return a single binary secure string.
C: GET mykey
S: $6rnfoobarrn
The server sends the first line of reply, which starts with "$" followed by the actual bytes to be sent. number, followed by the CRLF, then the actual data is sent, followed by 2 bytes of extra data for the final CRLF. The exact sequence sent by the server is as follows: "$6rnfoobarrn"
If the requested value does not exist, the batch reply will use the special value -1 as the data length, for example:
C: GET nonexistingkey
S: $-1
When the requested object does not exist, the client library API will not return an empty string, but an empty object. For example: the Ruby library returns "nil", while the C library returns NULL (or sets the specified flag in the reply object), etc.
Binary
To put it simply, binary means it contains. Therefore, when processing in C language, you cannot use str functions, such as strlen, strcpy, etc., because they are used to judge strings. The ending.
redis cluster
Super simple to build redis cluster
The official website also introduces how to build a redis cluster. It is troublesome to try it because centos6.5 is used. If you use a newer centos may be better.
Data sharding of Redis cluster
Redis cluster does not use consistent hashing, but introduces the concept of hash slots.
Redis cluster has 16384 hash slots , each key is checked modulo 16384 after passing the CRC16 check to determine which slot to place it. Each node in the cluster is responsible for a part of the hash slot. For example, if the current cluster has 3 nodes, then:
* Node A contains hash slots 0 to 5500.
* Node B contains hash slots 5501 to 11000.
* Node C contains hash slots 11001 to 16384.
This structure makes it easy to add or delete nodes. For example, if I want to add a new node D, I need to get some slots from nodes A, B, and C to D. If I want to remove node A, I need to The slots in A are moved to nodes B and C, and then the A node without any slots is removed from the cluster. Since moving the hash slot from one node to another does not stop the service, no matter what is added Deleting or changing the number of hash slots of a node will not cause the cluster to become unavailable.
The client and server side in the Redis cluster protocol
In the Redis cluster, nodes Responsible for storing data and recording the status of the cluster (including the mapping of key values to the correct nodes). Cluster nodes can also automatically discover other nodes, detect nodes that are not working properly, and elect master nodes from slave nodes when needed.
To perform these tasks, all cluster nodes communicate through a TCP connection (TCP bus?) and a binary protocol (cluster connection, cluster bus). Each node is connected to every other node in the cluster through a cluster bus. Nodes use a gossip protocol to spread cluster information, which can: discover new nodes, send ping packets (used to ensure that all nodes are working properly), and send cluster messages when certain situations occur. Cluster connections are also used to publish or subscribe to messages in the cluster.
Since cluster nodes cannot proxy requests, the client will redirect the command to other nodes when receiving redirections errors -MOVED and -ASK. Theoretically, the client is free to send requests to all nodes in the cluster and redirect requests to other nodes when needed, so the client does not need to save the cluster state. However, the client can cache the mapping relationship between key values and nodes, which can significantly improve the efficiency of command execution.
-MOVED
Simply put, when -MOVED is returned, the client connects to node A and requests to process the key, but in fact the key is actually on node B, so -MOVED is returned. The protocol is as follows:- MOVED 3999 127.0.0.1:6381
No need to consider the situation of -ASK.
Implementation of redis client in C language
The code is as follows:
#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <errno.h>#include <fcntl.h>#include <netdb.h>#include <sys/poll.h>#include <unistd.h>#include <sys/types.h>#include <stdlib.h>#include <stdio.h>ssize_t sock_write_loop( int fd, const void *vptr, size_t n ) { size_t nleft = 0; ssize_t nwritten = 0;const char *ptr; ptr = (char *) vptr; nleft = n;while( nleft > 0 ) {if( (nwritten = write(fd, ptr, nleft) ) <= 0 ) {if( errno == EINTR ) { nwritten = 0; //再次调用write }else{return -5; } } nleft = nleft - nwritten; ptr = ptr + nwritten; }return(n); }int sock_read_wait( int fd, int timeout ) {struct pollfd pfd; pfd.fd = fd; pfd.events = POLLIN; pfd.revents = 0; timeout *= 1000;for (;;) {switch( poll(&pfd, 1, timeout) ) {case -1:if( errno != EINTR ) {return (-2); }continue;case 0: errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLIN )return (0);elsereturn (-3); } } } ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout ) { if( timeout > 0 && sock_read_wait(fd, timeout) < 0 )return (-1);elsereturn (read(fd, vptr, len)); }int sock_connect_nore(const char *IPaddr , int port , int timeout) { // char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL ) {return -1; }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 ) {return -1; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 ) {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0; memset(&host, 0, sizeof(host)); memset(sBuf, 0, sizeof(sBuf)); memset(sHostIp, 0 , sizeof(sHostIp)); pHost = &host; #ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || #else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || #endif(pHost == NULL) ) { close(sock_fd);return -1; }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 ) { close(sock_fd);return -1; }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL ) { close(sock_fd);return -1; } if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 ) { close(sock_fd); return -1; }//end added by navy 2003.3.31 }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 ) { close(sock_fd); return -1; }return sock_fd; }int sock_connect(const char *IPaddr , int port , int timeout) {char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL ) {return -1; }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 ) {return -1; } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 ) {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0; memset(&host, 0, sizeof(host)); memset(sBuf, 0, sizeof(sBuf)); memset(sHostIp, 0 , sizeof(sHostIp)); pHost = &host; #ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || #else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || #endif(pHost == NULL) ) { close(sock_fd);return -1; }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 ) { close(sock_fd);return -1; }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL ) { close(sock_fd);return -1; } if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 ) { close(sock_fd); return -1; }//end added by navy 2003.3.31 }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 ) { close(sock_fd); return -1; } n = sock_read_tmo(sock_fd, temp, 4096, timeout);//一般错误if( n <= 0 ) { close(sock_fd); sock_fd = -1; }return sock_fd; }int sock_non_blocking(int fd, int on) {int flags;if ((flags = fcntl(fd, F_GETFL, 0)) < 0){return -10; }if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags & ~O_NONBLOCK) < 0){return -10; }return 0; }int sock_write_wait(int fd, int timeout) {struct pollfd pfd; pfd.fd = fd; pfd.events = POLLOUT; pfd.revents = 0; timeout *= 1000;for (;;) {switch( poll(&pfd, 1, timeout) ) {case -1:if( errno != EINTR ) {return (-2); }continue;case 0: errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLOUT )return (0);elsereturn (-3); } } }int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout) {int error = 0; socklen_t error_len; sock_non_blocking(sock, 1);if( connect(sock, sa, len) == 0 ) { sock_non_blocking(sock, 0);return (0); }if( errno != EINPROGRESS ) { sock_non_blocking(sock, 0);return (-1); }/* * A connection is in progress. Wait for a limited amount of time for * something to happen. If nothing happens, report an error. */if( sock_write_wait(sock, timeout) != 0) { sock_non_blocking(sock, 0);return (-2); }/* * Something happened. Some Solaris 2 versions have getsockopt() itself * return the error, instead of returning it via the parameter list. */error = 0; error_len = sizeof(error);if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) != 0 ) { sock_non_blocking(sock, 0);return (-3); }if( error ) { errno = error; sock_non_blocking(sock, 0);return (-4); } sock_non_blocking(sock, 0);/* * No problems. */return (0); }static int check_ip_in_list(const char *ip, char *iplist) { char *token = NULL;char *saveptr = NULL; token = strtok_r(iplist, ",", &saveptr);while(token != NULL) { char *ptmp = NULL; char *ip_mask = strtok_r(token, "/", &ptmp);if(!ip_mask) return -1; char *ip_bit = strtok_r(NULL, "/", &ptmp); if(ip_bit) {int mask_bit = atoi(ip_bit);if(mask_bit < 0 || mask_bit >32)continue; unsigned long addr[4] = { 0 }; sscanf( ip_mask, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 ); unsigned long vl1 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3]; sscanf( ip, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 ); unsigned long vl2 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3]; vl1 = ( vl1 >> ( 32 - mask_bit ) ); vl2 = ( vl2 >> ( 32 - mask_bit ) );if( vl1 == vl2 ) return 1; }else{if(strcmp(ip,ip_mask) == 0) return 1; } token = strtok_r(NULL, ",", &saveptr); } return 0; }static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro) {char buf[128];int loops = 0; strcpy(buf, redis_host); do{ loops ++;char *ptmp = NULL;char *host = strtok_r(buf, ":", &ptmp);if(!host) return -1;char *s_port = strtok_r(NULL, ":", &ptmp);if(!s_port) return -1;int port = atoi(s_port);char respone[40] = {0};int sock_fd = -1;if((sock_fd = sock_connect_nore(host, port, 5))<0)return -1;if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro)) { close(sock_fd);return -1; }if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<=0) { close(sock_fd);return -1; } if(strncmp(":0", respone, 2) == 0) { close(sock_fd);return 0; } else if(strncmp(":1", respone, 2) == 0) { close(sock_fd);return 1; } else if(strncmp("$", respone, 1) == 0) { int data_size = 0; int ret = 0;char *data_line = strstr(respone,"rn");if(!data_line) { close(sock_fd);return -1; } data_line = data_line+2; data_size = atoi(respone+1);if(data_size == -1) { close(sock_fd);return 0; }if(strlen(data_line) == data_size+2) { printf("line = %d, data_line = %sn",__LINE__,data_line); ret=check_ip_in_list(ip, data_line); close(sock_fd);return ret; }char *data = calloc(data_size+3,1);if(!data) { close(sock_fd);return -1; } strcpy(data,data_line);int read_size = strlen(data);int left_size = data_size + 2 - read_size;while(left_size > 0) {int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5);if(nread<=0) {free(data); close(sock_fd); return -1; } read_size += nread; left_size -= nread; } close(sock_fd); printf("line = %d, data = %sn",__LINE__,data); ret=check_ip_in_list(ip, data);free(data);return ret; } else if(strncmp("-MOVED", respone, 6) == 0) { close(sock_fd);char *p = strchr(respone, ' ');if(p == NULL)return -1; p = strchr(p+1, ' ');if(p == NULL)return -1; strcpy(buf, p+1); }else{ close(sock_fd);return -1; } }while(loops < 2);return -1; }int main(int argc,char *argv[]) {if(argc != 2) { printf("please input ipn");return -1; } const char *redis_ip = "127.0.0.1:7002";const char *domain = "test.com";char exist_pro[128] = {0};char get_pro[128] = {0}; snprintf(exist_pro,sizeof(exist_pro),"EXISTS test|%s|%srn",domain,"127.0.0.1"); snprintf(get_pro,sizeof(get_pro),"GET test_%srn",domain);int loops = 0;int ret = 0;do{ loops ++; ret = check_ip_in_redis(redis_ip, argv[1],exist_pro);if(ret == 0) ret = check_ip_in_redis(redis_ip, argv[1],get_pro); }while(loops < 3 && ret < 0); printf("line = %d, ret = %dn",__LINE__,ret);return ret; }
c_redis_cli.c
Mainly look at the check_ip_in_redis function, the others are all sockets Encapsulation.
Python implements redis client
#!/usr/bin/pythonimport sys import socketdef main(argv):if(len(argv) != 3):print "please input domain ip!"returnhost = "192.168.188.47" port = 7002while 1: s = socket.socket() s.connect((host, port)) cmd = 'set %s_white_ip %srn' % (argv[1],argv[2]) s.send(cmd) res = s.recv(32) s.close() if res[0] == "+":print "set domain white ip suc!"return elif res[0:6] == "-MOVED": list = res.split(" ") ip_list = list[2].split(":") host = ip_list[0] port = int(ip_list[1]) else:print "set domain white ip error!"return if __name__ == "__main__": main(sys.argv)
The above is the detailed content of How to quickly build a redis cluster using python. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

Solution to permission issues when viewing Python version in Linux terminal When you try to view Python version in Linux terminal, enter python...

When using Python's pandas library, how to copy whole columns between two DataFrames with different structures is a common problem. Suppose we have two Dats...

Getting started with Python: Hourglass Graphic Drawing and Input Verification This article will solve the variable definition problem encountered by a Python novice in the hourglass Graphic Drawing Program. Code...

Choice of Python Cross-platform desktop application development library Many Python developers want to develop desktop applications that can run on both Windows and Linux systems...

Many developers rely on PyPI (PythonPackageIndex)...

Data Conversion and Statistics: Efficient Processing of Large Data Sets This article will introduce in detail how to convert a data list containing product information to another containing...

Alternative usage of Python parameter annotations In Python programming, parameter annotations are a very useful function that can help developers better understand and use functions...

How to handle high resolution images in Python to find white areas? Processing a high-resolution picture of 9000x7000 pixels, how to accurately find two of the picture...
