目錄
基于Hiredis异步API的聊天系统实现
首頁 資料庫 mysql教程 基于Hiredis异步API的聊天系统实现

基于Hiredis异步API的聊天系统实现

Jun 07, 2016 pm 02:51 PM
api 基於 實現 非同步 系統 聊天

基于Hiredis异步API的聊天系统实现 上一篇文章http://blog.csdn.net/qq_34788352/article/details/51313027使用Hiredis的同步API实现了发送消息的客户端,当我使用同步API实现订阅频道客户端时,一旦订阅频道,就会出现无法操作的情况,这是就是同步和异步的

基于Hiredis异步API的聊天系统实现

上一篇文章http://blog.csdn.net/qq_34788352/article/details/51313027使用Hiredis的同步API实现了发送消息的客户端,当我使用同步API实现订阅频道客户端时,一旦订阅频道,就会出现无法操作的情况,这是就是同步和异步的问题。使用同步API,订阅频道后,客户端会进入阻塞状态,等待订阅频道发布的消息,不能实现既订阅频道,又能发布消息的功能。为了实现一个客户端既能订阅频道,又能发布消息的功能,就需要使用Hiredis的异步API。

首先特别感谢黄天霸、tickTick、vah101,当我遇到各种各样奇怪问题的时候,你们的帖子给与了我解答,谢谢。

开始正题,hiredis异步的实现主要是依靠redis自带的ae事件库或者libev事件库或者libevent的事件库或者libuv事件库。网上一些人是通过libevent事件库来实现,本系统则使用Redis自带ae事件库来实现,Redis不用libevent事件库而选择重写一个ae事件库,必定有其道理。

首先介绍使用到的异步API,位于async.h中:

<code class=" hljs perl">redisAsyncContext <span class="hljs-variable">*redisAsyncConnect</span>(const char <span class="hljs-variable">*ip</span>, <span class="hljs-keyword">int</span> port); <span class="hljs-regexp">//</span>用于建立异步连接

<span class="hljs-keyword">int</span> redisAsyncSetConnectCallback(redisAsyncContext <span class="hljs-variable">*ac</span>, redisConnectCallback <span class="hljs-variable">*fn</span>);  <span class="hljs-regexp">//</span>设置连接回调函数,回调函数形式:void callback(const redisAsyncContext <span class="hljs-variable">*c</span>, <span class="hljs-keyword">int</span> status);

<span class="hljs-keyword">int</span> redisAsyncSetDisconnectCallback(redisAsyncContext <span class="hljs-variable">*ac</span>, redisDisconnectCallback <span class="hljs-variable">*fn</span>);  <span class="hljs-regexp">//</span>设置断开连接回调函数,回调函数形式:void callback(const redisAsyncContext <span class="hljs-variable">*c</span>, <span class="hljs-keyword">int</span> status);

void redisAsyncDisconnect(redisAsyncContext <span class="hljs-variable">*ac</span>); <span class="hljs-regexp">//</span>断开异步连接

void redisAsyncFree(redisAsyncContext <span class="hljs-variable">*ac</span>); <span class="hljs-regexp">//</span>释放建立连接时,创建的redisAsyncContext结构

<span class="hljs-keyword">int</span> redisAsyncCommand(redisAsyncContext <span class="hljs-variable">*ac</span>, redisCallbackFn <span class="hljs-variable">*fn</span>, void <span class="hljs-variable">*privdata</span>, const char <span class="hljs-variable">*format</span>, ...); <span class="hljs-regexp">//</span>发送Redis命令,需要实现一个回调函数来出来命令的返回,fn是回调函数的地址,回调函数形式:void callback(redisAsyncContext <span class="hljs-variable">*c</span>, void <span class="hljs-variable">*reply</span>, void <span class="hljs-variable">*pridata</span>);
</code>
登入後複製

有了上面的异步API,就可以开始客户端的撰写。
首先封装订阅端

<code class=" hljs cpp"><span class="hljs-comment">//sub_client.h</span>

<span class="hljs-preprocessor">#ifndef REDIS_SUB_CLIENT_H  </span>
<span class="hljs-preprocessor">#define REDIS_SUB_CLIENT_H  </span>

<span class="hljs-keyword">extern</span> <span class="hljs-string">"C"</span>
{  
<span class="hljs-preprocessor">#include <stdlib.h>  </span>
<span class="hljs-preprocessor">#include <async.h>  </span>
<span class="hljs-preprocessor">#include <adapters/ae.h> </span>
<span class="hljs-preprocessor">#include <unistd.h>  </span>
<span class="hljs-preprocessor">#include <pthread.h>  </span>
<span class="hljs-preprocessor">#include <semaphore.h> </span>
} 

<span class="hljs-preprocessor">#include <string>  </span>
<span class="hljs-preprocessor">#include <vector>  </span>

<span class="hljs-keyword">using</span> <span class="hljs-keyword">namespace</span> <span class="hljs-built_in">std</span>;

<span class="hljs-keyword">class</span> CRedisSubClient  
{  
<span class="hljs-keyword">public</span>:   

    CRedisSubClient();  
    ~CRedisSubClient();  

    <span class="hljs-keyword">bool</span> init();   <span class="hljs-comment">//初始化,事件对象,信号量  </span>
    <span class="hljs-keyword">bool</span> uninit();  <span class="hljs-comment">//释放对象</span>
    <span class="hljs-keyword">bool</span> connect();  <span class="hljs-comment">//连接服务器</span>
    <span class="hljs-keyword">bool</span> disconnect(); <span class="hljs-comment">//断开服务器</span>

    <span class="hljs-comment">//订阅频道 </span>
    <span class="hljs-keyword">bool</span> subscribe(<span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &channel_name);  

<span class="hljs-keyword">private</span>:  
    <span class="hljs-comment">// 下面三个回调函数供redis服务调用  </span>
    <span class="hljs-comment">// 连接回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> connect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
        <span class="hljs-keyword">int</span> status);  

    <span class="hljs-comment">// 断开连接的回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> disconnect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
        <span class="hljs-keyword">int</span> status);  

    <span class="hljs-comment">// 执行命令回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> command_callback(redisAsyncContext *redis_context,  
        <span class="hljs-keyword">void</span> *reply, <span class="hljs-keyword">void</span> *privdata);  

    <span class="hljs-comment">// 事件分发线程函数  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> *event_thread(<span class="hljs-keyword">void</span> *data);  
    <span class="hljs-keyword">void</span> *event_proc();  

<span class="hljs-keyword">private</span>:  
    <span class="hljs-comment">// ae事件对象  </span>
    aeEventLoop *loop;  
    <span class="hljs-comment">// 事件线程ID  </span>
    pthread_t _event_thread;  
    <span class="hljs-comment">// 事件线程的信号量  </span>
    sem_t _event_sem;  
    <span class="hljs-comment">// hiredis异步对象  </span>
    redisAsyncContext *_redis_context;  

};  

<span class="hljs-preprocessor">#endif  </span>
</code>
登入後複製

使用extern “C”是因为redis和hiredis都是c写的,当使用c++链接c代码生成的库中的函数时,会出现undefined reference的问题。

订阅端的实现

<code class=" hljs cpp"><span class="hljs-comment">//sub_client.cpp</span>

<span class="hljs-preprocessor">#include <stddef.h>  </span>
<span class="hljs-preprocessor">#include <assert.h>  </span>
<span class="hljs-preprocessor">#include <string.h>  </span>
<span class="hljs-preprocessor">#include "sub_client.h" </span>

<span class="hljs-keyword">using</span> <span class="hljs-keyword">namespace</span> <span class="hljs-built_in">std</span>; 

CRedisSubClient::CRedisSubClient():loop(<span class="hljs-number">0</span>), _event_thread(<span class="hljs-number">0</span>),  
_redis_context(<span class="hljs-number">0</span>)  
{  
}  

CRedisSubClient::~CRedisSubClient()  
{  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::init()  
{  

    loop = aeCreateEventLoop(<span class="hljs-number">64</span>);    <span class="hljs-comment">// 创建ae对象</span>

    <span class="hljs-keyword">if</span> (NULL == loop)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Create redis event failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-built_in">memset</span>(&_event_sem, <span class="hljs-number">0</span>, <span class="hljs-keyword">sizeof</span>(_event_sem)); 

    <span class="hljs-keyword">int</span> ret = sem_init(&_event_sem, <span class="hljs-number">0</span>, <span class="hljs-number">0</span>);  <span class="hljs-comment">//初始化线程信号量</span>

    <span class="hljs-keyword">if</span> (ret != <span class="hljs-number">0</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Init sem failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::uninit()  
{  
    loop = NULL;  

    sem_destroy(&_event_sem);  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::connect()  
{    
    _redis_context = redisAsyncConnect(<span class="hljs-string">"127.0.0.1"</span>, <span class="hljs-number">6379</span>);    <span class="hljs-comment">// 异步连接到redis服务器上,使用6380端口</span>

    <span class="hljs-keyword">if</span> (NULL == _redis_context)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Connect redis failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">if</span> (_redis_context->err)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Connect redis error: %d, %s\n"</span>,   
            _redis_context->err, _redis_context->errstr);    <span class="hljs-comment">// 输出错误信息  </span>
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    redisAeAttach(loop,_redis_context);   <span class="hljs-comment">// 将事件绑定到redis context上,使redis的回调跟事件关联  </span>

    <span class="hljs-comment">// 创建事件处理线程  </span>
    <span class="hljs-keyword">int</span> ret = pthread_create(&_event_thread, <span class="hljs-number">0</span>, &CRedisSubscriber::event_thread, <span class="hljs-keyword">this</span>);  

    <span class="hljs-keyword">if</span> (ret != <span class="hljs-number">0</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Create event thread failed.\n"</span>);  
        disconnect();  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-comment">// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态  </span>
    redisAsyncSetConnectCallback(_redis_context,   
        &CRedisSubClient::connect_callback);  

    <span class="hljs-comment">// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连  </span>
    redisAsyncSetDisconnectCallback(_redis_context,  
        &CRedisSubClient::disconnect_callback);  

    <span class="hljs-comment">// 启动事件线程  </span>
    sem_post(&_event_sem);  
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::disconnect()  
{  
    <span class="hljs-keyword">if</span> (_redis_context)  
    {  
        redisAsyncDisconnect(_redis_context);  
        redisAsyncFree(_redis_context);  
        _redis_context = NULL;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::subscribe(<span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &channel_name)  
{  
    <span class="hljs-keyword">int</span> ret = redisAsyncCommand(_redis_context,   
        &CRedisSubscriber::command_callback, <span class="hljs-keyword">this</span>, <span class="hljs-string">"SUBSCRIBE %s"</span>,   
        channel_name.c_str()); <span class="hljs-comment">//订阅一个频道</span>

    <span class="hljs-keyword">if</span> (REDIS_ERR == ret)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Subscribe command failed: %d\n"</span>, ret);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Subscribe success: %s\n"</span>, channel_name.c_str());  
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">void</span> CRedisSubClient::connect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
    <span class="hljs-keyword">int</span> status)  
{  
    <span class="hljs-keyword">if</span> (status != REDIS_OK)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Error: %s\n"</span>, redis_context->errstr);  
    }  
    <span class="hljs-keyword">else</span>  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Redis connected!"</span>);  
    }  
}  

<span class="hljs-keyword">void</span> CRedisSubClient::disconnect_callback(  
    <span class="hljs-keyword">const</span> redisAsyncContext *redis_context, <span class="hljs-keyword">int</span> status)  
{  
    <span class="hljs-keyword">if</span> (status != REDIS_OK)  
    {    
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Error: %s\n"</span>, redis_context->errstr);  
    }  
}  

<span class="hljs-comment">// 消息接收回调函数  </span>
<span class="hljs-keyword">void</span> CRedisSubClient::command_callback(redisAsyncContext *redis_context,  
    <span class="hljs-keyword">void</span> *reply, <span class="hljs-keyword">void</span> *privdata)  
{  
    <span class="hljs-keyword">if</span> (NULL == reply || NULL == privdata) {  
        <span class="hljs-keyword">return</span> ;  
    }  

    redisReply *redis_reply = <span class="hljs-keyword">reinterpret_cast</span><redisReply *>(reply);  

    <span class="hljs-comment">// 订阅接收到的消息是一个带三元素的数组  </span>
    <span class="hljs-keyword">if</span> (redis_reply->type == REDIS_REPLY_ARRAY &&  
    redis_reply->elements == <span class="hljs-number">3</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Recieve message:%s %s %s\n"</span>,  
        redis_reply->element[<span class="hljs-number">0</span>]->str  
        redis_reply->element[<span class="hljs-number">1</span>]->str  
        redis_reply->element[<span class="hljs-number">2</span>]->str);    
    }  
}  

<span class="hljs-keyword">void</span> *CRedisSubClient::event_thread(<span class="hljs-keyword">void</span> *data)  
{  
    <span class="hljs-keyword">if</span> (NULL == data)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Error!\n"</span>);  
        assert(<span class="hljs-keyword">false</span>);  
        <span class="hljs-keyword">return</span> NULL;  
    }  

    CRedisSubClient *self_this = <span class="hljs-keyword">reinterpret_cast</span><CRedisSubClient *>(data);  
    <span class="hljs-keyword">return</span> self_this->event_proc();  
}  

<span class="hljs-keyword">void</span> *CRedisSubClient::event_proc()  
{  
    sem_wait(&_event_sem);  

    <span class="hljs-comment">//进行事件处理循环  </span>
    aeMain(loop);  

    <span class="hljs-keyword">return</span> NULL;  
}  </code>
登入後複製

发布端的封装:

<code class=" hljs cpp"><span class="hljs-comment">//pub_client.h</span>

<span class="hljs-preprocessor">#ifndef REDIS_PUB_CLIENT_H  </span>
<span class="hljs-preprocessor">#define REDIS_PUB_CLIENT_H  </span>

<span class="hljs-keyword">extern</span> <span class="hljs-string">"C"</span>
{  
<span class="hljs-preprocessor">#include <stdlib.h>  </span>
<span class="hljs-preprocessor">#include <async.h>  </span>
<span class="hljs-preprocessor">#include <adapters/ae.h> </span>
<span class="hljs-preprocessor">#include <unistd.h>  </span>
<span class="hljs-preprocessor">#include <pthread.h>  </span>
<span class="hljs-preprocessor">#include <semaphore.h> </span>
} 

<span class="hljs-preprocessor">#include <string>  </span>
<span class="hljs-preprocessor">#include <vector>  </span>

<span class="hljs-keyword">using</span> <span class="hljs-keyword">namespace</span> <span class="hljs-built_in">std</span>;

<span class="hljs-keyword">class</span> CRedisPubClient  
{  
<span class="hljs-keyword">public</span>:   

    CRedisPubClient();  
    ~CRedisPubClient();  

    <span class="hljs-keyword">bool</span> init();   <span class="hljs-comment">//初始化,事件对象,信号量  </span>
    <span class="hljs-keyword">bool</span> uninit();  <span class="hljs-comment">//释放对象</span>
    <span class="hljs-keyword">bool</span> connect();  <span class="hljs-comment">//连接服务器</span>
    <span class="hljs-keyword">bool</span> disconnect();  <span class="hljs-comment">//断开服务器</span>

    <span class="hljs-comment">//订阅频道 </span>
    <span class="hljs-keyword">bool</span> publish(<span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &channel_name, <span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &message);  

<span class="hljs-keyword">private</span>:  
    <span class="hljs-comment">// 下面三个回调函数供redis服务调用  </span>
    <span class="hljs-comment">// 连接回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> connect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
        <span class="hljs-keyword">int</span> status);  

    <span class="hljs-comment">// 断开连接的回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> disconnect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
        <span class="hljs-keyword">int</span> status);  

    <span class="hljs-comment">// 执行命令回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> command_callback(redisAsyncContext *redis_context,  
        <span class="hljs-keyword">void</span> *reply, <span class="hljs-keyword">void</span> *privdata);  

    <span class="hljs-comment">// 事件分发线程函数  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> *event_thread(<span class="hljs-keyword">void</span> *data);  
    <span class="hljs-keyword">void</span> *event_proc();  

<span class="hljs-keyword">private</span>:  
    <span class="hljs-comment">// ae事件对象  </span>
    aeEventLoop *loop;  
    <span class="hljs-comment">// 事件线程ID  </span>
    pthread_t _event_thread;  
    <span class="hljs-comment">// 事件线程的信号量  </span>
    sem_t _event_sem;  
    <span class="hljs-comment">// hiredis异步对象  </span>
    redisAsyncContext *_redis_context;  

};  

<span class="hljs-preprocessor">#endif  </span></code>
登入後複製

发布端的实现:

<code class=" hljs cpp"><span class="hljs-comment">//pub_client.cpp</span>

<span class="hljs-preprocessor">#include <stddef.h>  </span>
<span class="hljs-preprocessor">#include <assert.h>  </span>
<span class="hljs-preprocessor">#include <string.h>  </span>
<span class="hljs-preprocessor">#include "pub_client.h" </span>

<span class="hljs-keyword">using</span> <span class="hljs-keyword">namespace</span> <span class="hljs-built_in">std</span>; 

CRedisPubClient::CRedisPubClient():loop(<span class="hljs-number">0</span>), _event_thread(<span class="hljs-number">0</span>),  
_redis_context(<span class="hljs-number">0</span>)  
{  
}  

CRedisPubClient::~CRedisPubClient()  
{  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::init()  
{  

    loop = aeCreateEventLoop(<span class="hljs-number">64</span>);    <span class="hljs-comment">// 创建ae对象</span>

    <span class="hljs-keyword">if</span> (NULL == loop)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Create redis event failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-built_in">memset</span>(&_event_sem, <span class="hljs-number">0</span>, <span class="hljs-keyword">sizeof</span>(_event_sem)); 

    <span class="hljs-keyword">int</span> ret = sem_init(&_event_sem, <span class="hljs-number">0</span>, <span class="hljs-number">0</span>);  <span class="hljs-comment">//初始化线程信号量</span>

    <span class="hljs-keyword">if</span> (ret != <span class="hljs-number">0</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Init sem failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::uninit()  
{  
    loop = NULL;  

    sem_destroy(&_event_sem);  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::connect()  
{    
    _redis_context = redisAsyncConnect(<span class="hljs-string">"127.0.0.1"</span>, <span class="hljs-number">6379</span>);    <span class="hljs-comment">// 异步连接到redis服务器上,使用6380端口</span>

    <span class="hljs-keyword">if</span> (NULL == _redis_context)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Connect redis failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">if</span> (_redis_context->err)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Connect redis error: %d, %s\n"</span>,   
            _redis_context->err, _redis_context->errstr);    <span class="hljs-comment">// 输出错误信息  </span>
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    redisAeAttach(loop,_redis_context);   <span class="hljs-comment">// 将事件绑定到redis context上,使redis的回调跟事件关联  </span>

    <span class="hljs-comment">// 创建事件处理线程  </span>
    <span class="hljs-keyword">int</span> ret = pthread_create(&_event_thread, <span class="hljs-number">0</span>, &CRedisSubscriber::event_thread, <span class="hljs-keyword">this</span>);  

    <span class="hljs-keyword">if</span> (ret != <span class="hljs-number">0</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Create event thread failed.\n"</span>);  
        disconnect();  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-comment">// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态  </span>
    redisAsyncSetConnectCallback(_redis_context,   
        &CRedisSubClient::connect_callback);  

    <span class="hljs-comment">// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连  </span>
    redisAsyncSetDisconnectCallback(_redis_context,  
        &CRedisSubClient::disconnect_callback);  

    <span class="hljs-comment">// 启动事件线程  </span>
    sem_post(&_event_sem);  
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::disconnect()  
{  
    <span class="hljs-keyword">if</span> (_redis_context)  
    {  
        redisAsyncDisconnect(_redis_context);  
        redisAsyncFree(_redis_context);  
        _redis_context = NULL;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::publish(<span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &channel_name, <span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &message)  
{  
    <span class="hljs-keyword">int</span> ret = redisAsyncCommand(_redis_context,   
        &CRedisPublisher::command_callback, <span class="hljs-keyword">this</span>, <span class="hljs-string">"PUBLISH %s %s"</span>,   
        channel_name.c_str(), message.c_str());  <span class="hljs-comment">//发布消息  </span>

    <span class="hljs-keyword">if</span> (REDIS_ERR == ret)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Publish command failed: %d\n"</span>, ret);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;    
}  

<span class="hljs-keyword">void</span> CRedisPubClient::connect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
    <span class="hljs-keyword">int</span> status)  
{  
    <span class="hljs-keyword">if</span> (status != REDIS_OK)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Error: %s\n"</span>, redis_context->errstr);  
    }  
    <span class="hljs-keyword">else</span>  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Redis connected!"</span>);  
    }  
}  

<span class="hljs-keyword">void</span> CRedisPubClient::disconnect_callback(  
    <span class="hljs-keyword">const</span> redisAsyncContext *redis_context, <span class="hljs-keyword">int</span> status)  
{  
    <span class="hljs-keyword">if</span> (status != REDIS_OK)  
    {    
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Error: %s\n"</span>, redis_context->errstr);  
    }  
}  

<span class="hljs-comment">// 消息接收回调函数  </span>
<span class="hljs-keyword">void</span> CRedisPubClient::command_callback(redisAsyncContext *redis_context,  
    <span class="hljs-keyword">void</span> *reply, <span class="hljs-keyword">void</span> *privdata)  
{  
    <span class="hljs-keyword">if</span> (NULL == reply || NULL == privdata) 
    {  
        <span class="hljs-keyword">return</span> ;  
    }  

    redisReply *redis_reply = <span class="hljs-keyword">reinterpret_cast</span><redisReply *>(reply); 

    <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Publish: %s"</span>,redis_reply.str);
}  

<span class="hljs-keyword">void</span> *CRedisPubClient::event_thread(<span class="hljs-keyword">void</span> *data)  
{  
    <span class="hljs-keyword">if</span> (NULL == data)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Error!\n"</span>);  
        assert(<span class="hljs-keyword">false</span>);  
        <span class="hljs-keyword">return</span> NULL;  
    }  

    CRedisPubClient *self_this = <span class="hljs-keyword">reinterpret_cast</span><CRedisPubClient *>(data);  
    <span class="hljs-keyword">return</span> self_this->event_proc();  
}  

<span class="hljs-keyword">void</span> *CRedisPubClient::event_proc()  
{  
    sem_wait(&_event_sem);  

    <span class="hljs-comment">//进行事件处理循环  </span>
    aeMain(loop);  

    <span class="hljs-keyword">return</span> NULL;  
}  </code>
登入後複製

测试封装的sub_client和pub_client类:

<code class=" hljs cpp"><span class="hljs-comment">//test_subpub.cpp</span>

<span class="hljs-preprocessor">#include "pub_client.h"</span>
<span class="hljs-preprocessor">#include "sub_client.h"  </span>

<span class="hljs-keyword">int</span> main(<span class="hljs-keyword">int</span> argc, <span class="hljs-keyword">char</span> *argv[])  
{  
    CRedisPubClient publisher;  
    CRedisSubClient subcriber;

    <span class="hljs-keyword">bool</span> ret_pub = publisher.init();  
    <span class="hljs-keyword">bool</span> ret_sub = subcriber.init();

    <span class="hljs-keyword">if</span> (!ret_sub&&!ret_pub)   
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Init failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-number">0</span>;  
    }  

    ret_pub = publisher.connect(); 
    ret_sub = subcriber.connect(); 

    <span class="hljs-keyword">if</span> (!ret_sub&&!ret_pub)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"connect failed."</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-number">0</span>;  
    }  

    subscriber.subcribe(<span class="hljs-string">"sports"</span>);

    <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>)  
    {  
        publisher.publish(<span class="hljs-string">"sports"</span>, <span class="hljs-string">"ball"</span>);  
        sleep(<span class="hljs-number">1</span>);  
    }  

    publisher.disconnect();  
    publisher.uninit();  
    subscriber.disconnect();
    subscriber.disconnect();
    <span class="hljs-keyword">return</span> <span class="hljs-number">0</span>;  

} </code>
登入後複製

终于到了编译链接运行的阶段,我在这个地方卡了快一天,期间各种编译、链接的错误。直接编译链接会出现找到不aeCreateFileEvent,aeDeleteFileEvent,aeMain等等错误。

  1. 先将redis/src文件夹中的ae.c,ae.h,ae_epoll.c,config.h,zmalloc.c,zmalloc.h拷贝至hiredis目录下
  2. 用gcc -c ae.c gcc -c zmalloc.c生成ae.o和zmalloc.o,利用这两个文件生成静态库 ar -r libar.a ae.o zmalloc.o
  3. 然后编译g++ -o test_subpub test_subpub.cpp pub_client.cpp sub_client.cpp -pthread -I ../ -I./ -I ../adapters ../libhiredis.a ../libae.a
  4. 运行./test_subpub

可以看到,客户端中消息在不断地滚动,即同时实现了订阅频道和发送消息的功能。

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

CUDA之通用矩陣乘法:從入門到熟練! CUDA之通用矩陣乘法:從入門到熟練! Mar 25, 2024 pm 12:30 PM

通用矩陣乘法(GeneralMatrixMultiplication,GEMM)是許多應用程式和演算法中至關重要的一部分,也是評估電腦硬體效能的重要指標之一。透過深入研究和優化GEMM的實現,可以幫助我們更好地理解高效能運算以及軟硬體系統之間的關係。在電腦科學中,對GEMM進行有效的最佳化可以提高運算速度並節省資源,這對於提高電腦系統的整體效能至關重要。深入了解GEMM的工作原理和最佳化方法,有助於我們更好地利用現代計算硬體的潛力,並為各種複雜計算任務提供更有效率的解決方案。透過對GEMM性能的優

华为乾崑 ADS3.0 智驾系统 8 月上市 享界 S9 首发搭载 华为乾崑 ADS3.0 智驾系统 8 月上市 享界 S9 首发搭载 Jul 30, 2024 pm 02:17 PM

7月29日,在AITO问界第四十万台新车下线仪式上,华为常务董事、终端BG董事长、智能汽车解决方案BU董事长余承东出席发表演讲并宣布,问界系列车型将于今年8月迎来华为乾崑ADS3.0版本的上市,并计划在8月至9月间陆续推送升级。8月6日即将发布的享界S9将首发华为ADS3.0智能驾驶系统。华为乾崑ADS3.0版本在激光雷达的辅助下,将大幅提升智驾能力,具备融合端到端的能力,并采用GOD(通用障碍物识别)/PDP(预测决策规控)全新端到端架构,提供车位到车位智驾领航NCA功能,并升级CAS3.0全

華為手機如何實現雙微信登入? 華為手機如何實現雙微信登入? Mar 24, 2024 am 11:27 AM

華為手機如何實現雙微信登入?隨著社群媒體的興起,微信已成為人們日常生活中不可或缺的溝通工具之一。然而,許多人可能會遇到一個問題:在同一部手機上同時登入多個微信帳號。對於華為手機用戶來說,實現雙微信登入並不困難,本文將介紹華為手機如何實現雙微信登入的方法。首先,華為手機自帶的EMUI系統提供了一個很方便的功能-應用程式雙開。透過應用程式雙開功能,用戶可以在手機上同

微信不顯示該聊天怎麼恢復?微信不顯示該聊天恢復方法 微信不顯示該聊天怎麼恢復?微信不顯示該聊天恢復方法 Mar 13, 2024 pm 01:50 PM

微信,作為當下最受歡迎的社交軟體之一,提供了豐富的聊天功能。但有時,我們可能會遇到「不顯示該聊天」的情況,導致某些重要對話被隱藏。要恢復這些聊天,其實很簡單。只要依照以下這些步驟操作,就能輕鬆恢復被隱藏的聊天,繼續享受微信帶來的便利交流體驗。微信不顯示該聊天怎麼恢復?微信不顯示此聊天恢復方法方法一,嘗試直接在微信的訊息清單中搜尋該聊天物件的名稱或關鍵字。如果搜尋到了,點擊進入聊天介面,這樣就可以恢復顯示該聊天了。方法二,透過好友聊天恢復:開啟微信,點選通訊錄,找到被隱藏聊天顯示的好友,點選發送訊息

PHP程式設計指南:實作斐波那契數列的方法 PHP程式設計指南:實作斐波那契數列的方法 Mar 20, 2024 pm 04:54 PM

程式語言PHP是一種用於Web開發的強大工具,能夠支援多種不同的程式設計邏輯和演算法。其中,實作斐波那契數列是一個常見且經典的程式設計問題。在這篇文章中,將介紹如何使用PHP程式語言來實作斐波那契數列的方法,並附上具體的程式碼範例。斐波那契數列是一個數學上的序列,其定義如下:數列的第一個和第二個元素為1,從第三個元素開始,每個元素的值等於前兩個元素的和。數列的前幾元

常用常新!華為Mate60系列升級HarmonyOS 4.2:AI雲端增強、小藝方言太好用了 常用常新!華為Mate60系列升級HarmonyOS 4.2:AI雲端增強、小藝方言太好用了 Jun 02, 2024 pm 02:58 PM

4月11日,華為官方首次宣布HarmonyOS4.2百機升級計劃,此次共有180餘款設備參與升級,品類覆蓋手機、平板、手錶、耳機、智慧螢幕等設備。過去一個月,隨著HarmonyOS4.2百機升級計畫的穩定推進,包括華為Pocket2、華為MateX5系列、nova12系列、華為Pura系列等多款熱門機型也已紛紛展開升級適配,這意味著會有更多華為機型用戶享受到HarmonyOS帶來的常用常新體驗。從使用者回饋來看,華為Mate60系列機種在升級HarmonyOS4.2之後,體驗全方位躍升。尤其是華為M

蘋果16系統哪個版本最好 蘋果16系統哪個版本最好 Mar 08, 2024 pm 05:16 PM

蘋果16系統中版本最好的是iOS16.1.4,iOS16系統的最佳版本可能因人而異添加和日常使用體驗的提升也受到了很多用戶的好評。蘋果16系統哪個版本最好答:iOS16.1.4iOS16系統的最佳版本可能因人而異。根據公開的消息,2022年推出的iOS16被認為是一個非常穩定且性能優越的版本,用戶對其整體體驗也相當滿意。此外,iOS16中新功能的新增和日常使用體驗的提升也受到了許多用戶的好評。特別是在更新後的電池續航力、訊號表現和發熱控制方面,使用者的回饋都比較正面。然而,考慮到iPhone14

如何在華為手機上實現微信分身功能 如何在華為手機上實現微信分身功能 Mar 24, 2024 pm 06:03 PM

如何在華為手機上實現微信分身功能隨著社群軟體的普及和人們對隱私安全的日益重視,微信分身功能逐漸成為人們關注的焦點。微信分身功能可以幫助使用者在同一台手機上同時登入多個微信帳號,方便管理和使用。在華為手機上實現微信分身功能並不困難,只需要按照以下步驟操作即可。第一步:確保手機系統版本和微信版本符合要求首先,確保你的華為手機系統版本已更新至最新版本,以及微信App

See all articles