사용자는 관련 주제를 구독하여 비슷한 기능을 수행할 수도 있지만, 엔터프라이즈 버전에 내장된 지속성 지원이 더 효율적이고 안정적이어서 개발자의 작업량을 크게 줄이고 시스템 안정성을 향상시킵니다.
데이터 지속성은 EMQ X의 중요한 기능이며 엔터프라이즈 버전에서만 지원됩니다.지속성 설계
일대일 메시지 저장
redis-server
명령을 통해 Redis 서버를 시작할 수 있습니다. ## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379 backend.redis.pool1.server = 127.0.0.1:6379
emqx_ctl plugins load emqx_backend_redis
redis-server
命令来启动 Redis 服务器。
通过 RPM 方式安装的 EMQ X,Redis 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_redis.conf
,如果只是测试 Redis 持久化的功能,大部分配置不需要做更改。唯一需要更改的地方可能是 Redis 服务器的地址:如果读者安装的 Redis 不与 EMQ X 在同一服务器上,请指定正确的 Redis 服务器的地址与端口。如下所示:
## 上线 backend.redis.hook.client.connected.1 = { "action": { "function": "on_client_connected" }, "pool": "pool1"} ## 下线 backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
保持剩下部分的配置文件不变,然后启动该插件:
127.0.0.1:6379> keys * 1) "mqtt:node:emqx@127.0.0.1" 2) "mqtt:client:sub_client"
客户端上下线时,更新在线状态、上下线时间、节点客户端列表至 Redis 数据库。
尽管 EMQ X 本身提供了设备在线状态 API,但在需要频繁获取客户端在线状态、上下线时间的场景下,直接从数据库获取该记录比调用 EMQ X API 更高效。
打开配置文件,配置 Backend 规则:
## redis key 为 mqtt:node:{node_name} HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836
浏览器打开 http://127.0.0.1:18083
EMQ X 管理控制台,在 工具 -> Websocket 中新建一个客户端连接,指定 clientid 为 sub_client:
打开 redis-cli
命令行窗口,执行命令 keys *
EMQ이긴 하지만
## 节点下在线设备信息 127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1 1) "sub_client1" # clientid 2) "1542272836" # 上线时间时间戳 3) "sub_client" 4) "1542272836"
http://127.0.0.1:18083
EMQ X 관리 콘솔에서 브라우저를 엽니다. 도구 -> Websocket 새 클라이언트 연결을 만들고 클라이언트 ID를 sub_client로 지정합니다.🎜🎜🎜🎜redis-cli
명령줄 창을 열고 keys * 결과는 다음과 같습니다. 독자는 Redis에 두 개의 키가 저장되어 있음을 알 수 있습니다. 🎜<div class="code" style="position:relative; padding:0px; margin:0px;"><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"</pre><div class="contentsignin">로그인 후 복사</div></div><div class="contentsignin">로그인 후 복사</div></div>
<h4>连接列表</h4>
<p>插件以 <code>mqtt:node:{node_name}
格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:
## redis key 为 mqtt:node:{node_name} HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836
字段说明:
## 节点下在线设备信息 127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1 1) "sub_client1" # clientid 2) "1542272836" # 上线时间时间戳 3) "sub_client" 4) "1542272836"
插件以 mqtt:client:{client_id}
格式的 key 记录客户端在线状态、上线时间,等效操作:
## redis key 为 mqtt:client:{client_id} HMSET mqtt:client:sub_client state 1 online_at 1542272854
字段说明:
## 客户端在线状态 127.0.0.1:6379> HGETALL mqtt:client:sub_client 1) "state" 2) "0" # 0 离线 1 在线 3) "online_at" 4) "1542272854" # 上线时间戳 5) "offline_at" 6) "undefined" # 离线时间戳
当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。
打开配置文件,配置 Backend 规则:
## hook: client.connected ## action/function: on_subscribe_lookup backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
当 sub_client
设备上线时,需要为其订阅 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:
插件以 mqtt:sub:{client_id}
格式 key 在 Redis 中初始化代理订阅 Hash:
## redis key 为 mqtt:sub:{client_id} ## HSET key {topic} {qos} 127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1 (integer) 0 127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1 (integer) 0
EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client
新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:
切换回管理控制台 WebSocket 页面,向 sub_client/downlink
主题发布消息,可在消息订阅列表收到发布的消息。
打开配置文件,配置 Backend 规则,支持使用 topic
参数进行消息过滤,此处使用 #
通配符存储任意主题消息:
## hook: message.publish ## action/function: on_message_publish backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client
建立连接,向主题 upstream_topic
发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。
EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic}
Redis 集合中:
## 获取 upstream_topic 主题集合中所有 message id 127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1 1) "2VFsyhDm0cPIQvnY9osj" 2) "2VFszTClyjpVtLDLrn1u" 3) "2VFszozkwkYOcbEy8QN9" 4) "2VFszpEc7DfbEqC97I3g" 5) "2VFszpSzRviADmcOeuXd" 6) "2VFszpm3kvvLkJTcdmGU" 7) "2VFt0kuNrOktefX6m4nP" 127.0.0.1:6379>
每条消息详情将以 mqtt:msg:{message_id}
格式的 key 存储在 Redis Hash 中:
## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情 127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP 1) "id" 2) "2VFt0kuNrOktefX6m4nP" ## message id 3) "from" 4) "sub_client" ## client id 5) "qos" 6) "2" 7) "topic" 8) "up/upstream_topic" 9) "payload" 10) "{ "cmd": "reboot" }" 11) "ts" 12) "1542338754" ## pub 时间戳 13) "retain" 14) "false"
打开配置文件,配置 Backend 规则:
## hook: session.subscribed ## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub ## 一对一离线消息 backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"} ## 一对多离线消息 backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
MQTT 离线消息需满足以下条件:
以 clean_session = false 连接
订阅 QoS > 0
发布 QoS > 0
在 EMQ X 管理控制台中以如下配置建立连接,
打开配置文件,配置 Backend 规则:
## hook: message.publish ## action/function: on_client_connected、on_message_retain backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"} backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic}
Redis Hash 中:
## 获取 upstream_topic 主题集合中所有 message id 127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1 1) "2VFsyhDm0cPIQvnY9osj" 127.0.0.1:6379>
每条消息详情将以 mqtt:msg:{message_id}
格式的 key 存储在 Redis Hash 中:
## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情 127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP 1) "id" 2) "2VFt0kuNrOktefX6m4nP" ## message id 3) "from" 4) "sub_client" ## client id 5) "qos" 6) "2" 7) "topic" 8) "up/upstream_topic" 9) "payload" 10) "{ "cmd": "reboot" }" 11) "ts" 12) "1542338754" ## pub 时间戳 13) "retain" 14) "false"
위 내용은 EMQ X Redis 데이터 지속성을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!