目錄
  redis通訊協定
  回覆(服務端可客戶端復原的協定)
Bulk Strings(批次回覆)
  二進位
  redis集群
  超簡單搭建redis集群
  Redis 集群的資料分片
  Redis 叢集協定中的客戶端和伺服器端
  -MOVED
  C語言實作redis客戶端
  python實作redis客戶端
首頁 資料庫 Redis 怎麼用python快速搭建redis集群

怎麼用python快速搭建redis集群

May 26, 2023 pm 03:56 PM
python redis

  redis通訊協定

  列出主要的點,以便於對於下面程式的理解。

  Redis在TCP端口6379(預設端口,在配置可以修改)上監聽到來的連接,在客戶端與伺服器端之間傳輸的每個Redis命令或者資料都以rn結尾。

  回覆(服務端可客戶端復原的協定)

  Redis用不同的回覆類型回覆指令。它可能從伺服器發送的第一個位元組開始校驗回覆類型:

  * 用單行回覆(狀態回覆),回覆的第一個字節將是“ ”

  * 錯誤訊息,回覆的第一個位元組將是“-”

  * 整型數字,回覆的第一個字節將是“:”

* 批次回复,回复的第一個位元組將是“$”

  * 多個批次回复,回复的第一個字節將是“*”

Bulk Strings(批次回覆)

  批次回覆被伺服器用來傳回單一二進位安全字串。

  C: GET mykey

  S: $6rnfoobarrn

  伺服器發送第一行回复,該行以“$”開始後面跟隨實際要發送的字節數,隨後是CRLF,然後發送實際數據,隨後是2個位元組的額外數據用於最後的CRLF。伺服器發送的準確序列如下:

  ”$6rnfoobarrn”

  如果請求的值不存在,批量回復將使用特殊的值-1來作為資料長度,例如:

  C: GET nonexistingkey

  S: $-1

  當請求的物件不存在時,客戶端程式庫API不會傳回空字串,而會傳回空物件。例如:Ruby庫回傳‘nil’,而C庫則回傳NULL(或在回覆的物件裡設定指定的標誌)等等。

  二進位

  簡單說下二進制,就是會包含,所以C語言在處理的時候,就不能用str函數,像strlen、strcpy等,因為它們都是以來判斷字串結尾的。

  redis集群

  超簡單搭建redis集群

  官網也介紹瞭怎麼搭建redis集群,試過比較麻煩,因為用的centos6.5,如果用較新的centos,可能會好一點。

  Redis 集群的資料分片

  Redis 集群沒有使用一致性hash, 而是引入了哈希槽的概念.

  Redis 集群有16384個哈希槽,每個key透過CRC16校驗後對16384取模來決定放置哪個槽.集群的每個節點負責一部分hash槽,舉個例子,比如當前集群有3個節點,那麼:

  *節點A 包含0 到5500號哈希槽.

  * 節點B 包含5501 到11000 號哈希槽.

  * 節點C 包含11001 到16384號哈希槽.

#  這種結構很容易添加或刪除節點. 例如如果我想新新增個節點D, 我需要從節點A, B, C中得部分槽到D上. 如果我想移除節點A,需要將A中的槽移到B和C節點上,然後將沒有任何槽的A節點從集群中移除即可.   由於從一個節點將哈希槽移動到另一個節點並不會停止服務,所以無論添加刪除或改變某個節點的雜湊槽的數量都不會造成叢集不可用的狀態.

  Redis 叢集協定中的客戶端和伺服器端

  在Redis 叢集中,節點負責儲存資料、記錄叢集的狀態(包括鍵值到正確節點的對應)。叢集節點同樣能自動發現其他節點,偵測出沒正常運作的節點, 並且在需要的時候在從節點中推選出主節點。

  為了執行這些任務,所有的叢集節點都透過TCP連接(TCP bus?)和一個二進位協定(叢集連接,cluster bus)建立通訊。每一個節點都透過叢集連接(cluster bus)與叢集上的其餘每個節點連接起來。  節點們使用一個 gossip 協定來傳播叢集的訊息,這樣可以:發現新的節點、 發送ping包(用來確保所有節點都在正常工作中)、在特定情況發生時發送叢集訊息。集群連接也用於在集群中發布或訂閱訊息。

  由於叢集節點不能代理(proxy)請求,所以客戶端在接收到重定向錯誤(redirections errors) -MOVED 和 -ASK 的時候, 將指令重定向到其他節點。理論上來說,客戶端是可以自由地向叢集中的所有節點發送請求,在需要的時候把請求重定向到其他節點,所以客戶端是不需要保存叢集狀態。不過客戶端可以快取鍵值和節點之間的映射關係,這樣能明顯提高指令執行的效率。

  -MOVED

  簡單說下返回-MOVED的情況,就是客戶端連節點A請求處理key,但其實key其實在節點B,就返回-MOVED,協議如下:- MOVED 3999 127.0.0.1:6381

不用考慮-ASK的情況。

  C語言實作redis客戶端

  程式碼如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <errno.h>#include <fcntl.h>#include <netdb.h>#include <sys/poll.h>#include <unistd.h>#include <sys/types.h>#include <stdlib.h>#include <stdio.h>ssize_t sock_write_loop( int fd, const void *vptr, size_t n )

{

    size_t nleft = 0;

    ssize_t nwritten = 0;const char *ptr;

 

    ptr = (char *) vptr;

    nleft = n;while( nleft > 0 )

    {if( (nwritten = write(fd, ptr, nleft) ) <= 0 )

        {if( errno == EINTR )

            {

                nwritten = 0;  //再次调用write            }else{return -5;

            }

        }

        nleft = nleft - nwritten;

        ptr = ptr + nwritten;

    }return(n);

}int sock_read_wait( int fd, int timeout )

{struct pollfd pfd;

 

    pfd.fd = fd;

    pfd.events = POLLIN;

    pfd.revents = 0;

 

    timeout *= 1000;for (;;)

    {switch( poll(&pfd, 1, timeout) )

        {case -1:if( errno != EINTR )

                {return (-2);

                }continue;case 0:

                errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLIN )return (0);elsereturn (-3);

        }

    }

 

}

 

ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout )

{   if( timeout > 0 && sock_read_wait(fd, timeout) < 0 )return (-1);elsereturn (read(fd, vptr, len));

 

}int sock_connect_nore(const char *IPaddr , int port , int timeout)

{   // char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL )

    {return -1;

    }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )

    {return -1;

    }

 

    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;

    servaddr.sin_port   = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )

    {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0;

 

        memset(&host, 0, sizeof(host));

        memset(sBuf, 0, sizeof(sBuf));

        memset(sHostIp, 0 , sizeof(sHostIp));

        pHost = &host;

 

#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) ||

#else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) ||

#endif(pHost == NULL) )

                {

                close(sock_fd);return -1;

                }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )

                {

                close(sock_fd);return -1;

                }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )

                {

                    close(sock_fd);return -1;

                }                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )

                {

                    close(sock_fd);                    return -1;

                }//end added by navy 2003.3.31    }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 )

    {

        close(sock_fd);        return -1;

    }return sock_fd;

}int sock_connect(const char *IPaddr , int port , int timeout)

{char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL )

    {return -1;

    }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )

    {return -1;

    }

 

    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;

    servaddr.sin_port   = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )

    {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0;

 

        memset(&host, 0, sizeof(host));

        memset(sBuf, 0, sizeof(sBuf));

        memset(sHostIp, 0 , sizeof(sHostIp));

        pHost = &host;

 

#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) ||

#else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) ||

#endif(pHost == NULL) )

                {

                close(sock_fd);return -1;

                }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )

                {

                close(sock_fd);return -1;

                }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )

                {

                    close(sock_fd);return -1;

                }                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )

                {

                    close(sock_fd);                    return -1;

                }//end added by navy 2003.3.31    }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 )

    {

        close(sock_fd);        return -1;

    }

 

    n = sock_read_tmo(sock_fd, temp, 4096, timeout);//一般错误if( n <= 0 )

    {

        close(sock_fd);

         

        sock_fd = -1;

    }return sock_fd;

}int sock_non_blocking(int fd, int on)

{int     flags;if ((flags = fcntl(fd, F_GETFL, 0)) < 0){return -10;

    }if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags & ~O_NONBLOCK) < 0){return -10;

    }return 0;

}int sock_write_wait(int fd, int timeout)

{struct pollfd pfd;

 

    pfd.fd = fd;

    pfd.events = POLLOUT;

    pfd.revents = 0;

 

    timeout *= 1000;for (;;)

    {switch( poll(&pfd, 1, timeout) )

        {case -1:if( errno != EINTR )

            {return (-2);

            }continue;case 0:

            errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLOUT )return (0);elsereturn (-3);

        }

    }

 

}int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout)

{int error = 0;

    socklen_t error_len;

 

    sock_non_blocking(sock, 1);if( connect(sock, sa, len) == 0 )

    {

        sock_non_blocking(sock, 0);return (0);

    }if( errno != EINPROGRESS )

    {

        sock_non_blocking(sock, 0);return (-1);

    }/* * A connection is in progress. Wait for a limited amount of time for

     * something to happen. If nothing happens, report an error.     */if( sock_write_wait(sock, timeout) != 0)

    {

        sock_non_blocking(sock, 0);return (-2);

    }/* * Something happened. Some Solaris 2 versions have getsockopt() itself

     * return the error, instead of returning it via the parameter list.     */error = 0;

    error_len = sizeof(error);if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) != 0 )

    {

        sock_non_blocking(sock, 0);return (-3);

    }if( error )

    {

        errno = error;

        sock_non_blocking(sock, 0);return (-4);

    }

 

    sock_non_blocking(sock, 0);/* * No problems.     */return (0);

 

}static int check_ip_in_list(const char *ip, char *iplist)

{        char *token = NULL;char *saveptr = NULL;

    token = strtok_r(iplist, ",", &saveptr);while(token != NULL)

    {        char *ptmp = NULL;                        char *ip_mask = strtok_r(token, "/", &ptmp);if(!ip_mask)                    return -1;                     char *ip_bit = strtok_r(NULL, "/", &ptmp);        if(ip_bit)

        {int mask_bit = atoi(ip_bit);if(mask_bit < 0 || mask_bit >32)continue;

 

            unsigned long addr[4] = { 0 };

            sscanf( ip_mask, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );

            unsigned long vl1 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];

 

            sscanf( ip, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );

            unsigned long vl2 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];

 

            vl1 = ( vl1 >> ( 32 - mask_bit ) );

            vl2 = ( vl2 >> ( 32 - mask_bit ) );if( vl1 == vl2 )                        return 1;                         

        }else{if(strcmp(ip,ip_mask) == 0)            return 1;                           

        }                   

 

        token = strtok_r(NULL, ",", &saveptr);               

    }        return 0;

}static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro)

{char buf[128];int loops = 0;   

 

    strcpy(buf, redis_host);    do{

        loops ++;char *ptmp = NULL;char *host = strtok_r(buf, ":", &ptmp);if(!host) return -1;char *s_port = strtok_r(NULL, ":", &ptmp);if(!s_port) return -1;int port = atoi(s_port);char respone[40] = {0};int sock_fd = -1;if((sock_fd = sock_connect_nore(host, port, 5))<0)return -1;if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro))

        {

            close(sock_fd);return -1;

        }if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<=0)

        {

            close(sock_fd);return -1;

        }        if(strncmp(":0", respone, 2) == 0)

        {

            close(sock_fd);return 0;

        }            else if(strncmp(":1", respone, 2) == 0)

        {

            close(sock_fd);return 1;

        }            else if(strncmp("$", respone, 1) == 0)

        {                                    int data_size = 0;   int ret = 0;char *data_line = strstr(respone,"rn");if(!data_line)

            {

                close(sock_fd);return -1;

            }

            data_line = data_line+2;

 

            data_size = atoi(respone+1);if(data_size == -1)

            {

                close(sock_fd);return 0;

            }if(strlen(data_line) == data_size+2)

            {

                printf("line = %d, data_line = %sn",__LINE__,data_line);

                ret=check_ip_in_list(ip, data_line);

                close(sock_fd);return ret;

            }char *data = calloc(data_size+3,1);if(!data)

            {

                close(sock_fd);return -1;

            }

            strcpy(data,data_line);int read_size = strlen(data);int left_size = data_size + 2 - read_size;while(left_size > 0)

            {int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5);if(nread<=0)

                {free(data);

                    close(sock_fd);            return -1;

                }

                read_size += nread;

                left_size -= nread;

            }

            close(sock_fd);

            printf("line = %d, data = %sn",__LINE__,data);

            ret=check_ip_in_list(ip, data);free(data);return ret;

        }            else if(strncmp("-MOVED", respone, 6) == 0)

        {

            close(sock_fd);char *p = strchr(respone, ' ');if(p == NULL)return -1;

 

            p = strchr(p+1, ' ');if(p == NULL)return -1;

 

            strcpy(buf, p+1);

        }else{

            close(sock_fd);return -1;

        }           

         

    }while(loops < 2);return -1;

}int main(int argc,char *argv[])

{if(argc != 2)

    {

        printf("please input ipn");return -1;

    }     const char *redis_ip = "127.0.0.1:7002";const char *domain = "test.com";char exist_pro[128] = {0};char get_pro[128] = {0};   

    snprintf(exist_pro,sizeof(exist_pro),"EXISTS test|%s|%srn",domain,"127.0.0.1");       

    snprintf(get_pro,sizeof(get_pro),"GET test_%srn",domain);int loops = 0;int ret = 0;do{

        loops ++;

        ret = check_ip_in_redis(redis_ip, argv[1],exist_pro);if(ret == 0)

            ret = check_ip_in_redis(redis_ip, argv[1],get_pro);

    }while(loops < 3 && ret < 0);

 

    printf("line = %d, ret = %dn",__LINE__,ret);return ret;

}

登入後複製

c_redis_cli.c

  主要看這個check_ip_in_redis就行了,其它都是一些socket的它都是一些socket的函數就行了,其它都是一些socket的函數都是一些socket封裝。

  python實作redis客戶端

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

#!/usr/bin/pythonimport sys 

import socketdef main(argv):if(len(argv) != 3):print "please input domain ip!"returnhost = "192.168.188.47"  

    port = 7002while 1:

        s = socket.socket()               

        s.connect((host, port))

         

        cmd = 'set %s_white_ip %srn' % (argv[1],argv[2])       

        s.send(cmd)

        res = s.recv(32)

        s.close()       

    if res[0] == "+":print "set domain white  ip suc!"return    elif res[0:6] == "-MOVED":

            list = res.split(" ")

            ip_list = list[2].split(":")           

            host = ip_list[0]   

            port = int(ip_list[1])                            else:print "set domain white  ip error!"return                               if __name__ == "__main__":

    main(sys.argv)

登入後複製

以上是怎麼用python快速搭建redis集群的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱門文章

倉庫:如何復興隊友
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 週前 By 尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island冒險:如何獲得巨型種子
3 週前 By 尊渡假赌尊渡假赌尊渡假赌

熱門文章

倉庫:如何復興隊友
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 週前 By 尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island冒險:如何獲得巨型種子
3 週前 By 尊渡假赌尊渡假赌尊渡假赌

熱門文章標籤

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

Windows11安裝10.0.22000.100跳出0x80242008錯誤解決方法 Windows11安裝10.0.22000.100跳出0x80242008錯誤解決方法 May 08, 2024 pm 03:50 PM

Windows11安裝10.0.22000.100跳出0x80242008錯誤解決方法

怎麼下載deepseek 小米 怎麼下載deepseek 小米 Feb 19, 2025 pm 05:27 PM

怎麼下載deepseek 小米

模板化的優點和缺點有哪些? 模板化的優點和缺點有哪些? May 08, 2024 pm 03:51 PM

模板化的優點和缺點有哪些?

Google AI 為開發者發佈 Gemini 1.5 Pro 和 Gemma 2 Google AI 為開發者發佈 Gemini 1.5 Pro 和 Gemma 2 Jul 01, 2024 am 07:22 AM

Google AI 為開發者發佈 Gemini 1.5 Pro 和 Gemma 2

deepseek怎麼問他 deepseek怎麼問他 Feb 19, 2025 pm 04:42 PM

deepseek怎麼問他

PHP開發中的快取機制與應用實戰 PHP開發中的快取機制與應用實戰 May 09, 2024 pm 01:30 PM

PHP開發中的快取機制與應用實戰

NET40是什麼軟體 NET40是什麼軟體 May 10, 2024 am 01:12 AM

NET40是什麼軟體

deepseek該怎麼搜索 deepseek該怎麼搜索 Feb 19, 2025 pm 05:18 PM

deepseek該怎麼搜索

See all articles