Users can also achieve similar functions by subscribing to related topics, but the persistence support built into the enterprise version is more efficient and reliable, greatly reducing the work of developers. measure and improve system stability.
Data persistence is an important feature of EMQ X and is only supported in the enterprise version.Persistence design
One-to-one message storage
##Publish side publishes a message;Configuring EMQ X server
## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379 backend.redis.pool1.server = 127.0.0.1:6379
emqx_ctl plugins load emqx_backend_redis
Client online status storage
When the client goes online and offline, Update the online status, online and offline time, and node client list to the Redis database. Although EMQ Configuration itemsOpen the configuration file and configure Backend rules:
## 上线 backend.redis.hook.client.connected.1 = { "action": { "function": "on_client_connected" }, "pool": "pool1"} ## 下线 backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
http://127.0.0.1 :18083
EMQ Open theredis-cli command line window and execute the command
keys *. The results are as follows. Readers can see that two keys are stored in Redis:
127.0.0.1:6379> keys * 1) "mqtt:node:emqx@127.0.0.1" 2) "mqtt:client:sub_client"
插件以 mqtt:node:{node_name}
格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:
## redis key 为 mqtt:node:{node_name} HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836
字段说明:
## 节点下在线设备信息 127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1 1) "sub_client1" # clientid 2) "1542272836" # 上线时间时间戳 3) "sub_client" 4) "1542272836"
插件以 mqtt:client:{client_id}
格式的 key 记录客户端在线状态、上线时间,等效操作:
## redis key 为 mqtt:client:{client_id} HMSET mqtt:client:sub_client state 1 online_at 1542272854
字段说明:
## 客户端在线状态 127.0.0.1:6379> HGETALL mqtt:client:sub_client 1) "state" 2) "0" # 0 离线 1 在线 3) "online_at" 4) "1542272854" # 上线时间戳 5) "offline_at" 6) "undefined" # 离线时间戳
当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。
打开配置文件,配置 Backend 规则:
## hook: client.connected ## action/function: on_subscribe_lookup backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
当 sub_client
设备上线时,需要为其订阅 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:
插件以 mqtt:sub:{client_id}
格式 key 在 Redis 中初始化代理订阅 Hash:
## redis key 为 mqtt:sub:{client_id} ## HSET key {topic} {qos} 127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1 (integer) 0 127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1 (integer) 0
EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client
新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:
切换回管理控制台 WebSocket 页面,向 sub_client/downlink
主题发布消息,可在消息订阅列表收到发布的消息。
打开配置文件,配置 Backend 规则,支持使用 topic
参数进行消息过滤,此处使用 #
通配符存储任意主题消息:
## hook: message.publish ## action/function: on_message_publish backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client
建立连接,向主题 upstream_topic
发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。
EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic}
Redis 集合中:
## 获取 upstream_topic 主题集合中所有 message id 127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1 1) "2VFsyhDm0cPIQvnY9osj" 2) "2VFszTClyjpVtLDLrn1u" 3) "2VFszozkwkYOcbEy8QN9" 4) "2VFszpEc7DfbEqC97I3g" 5) "2VFszpSzRviADmcOeuXd" 6) "2VFszpm3kvvLkJTcdmGU" 7) "2VFt0kuNrOktefX6m4nP" 127.0.0.1:6379>
每条消息详情将以 mqtt:msg:{message_id}
格式的 key 存储在 Redis Hash 中:
## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情 127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP 1) "id" 2) "2VFt0kuNrOktefX6m4nP" ## message id 3) "from" 4) "sub_client" ## client id 5) "qos" 6) "2" 7) "topic" 8) "up/upstream_topic" 9) "payload" 10) "{ "cmd": "reboot" }" 11) "ts" 12) "1542338754" ## pub 时间戳 13) "retain" 14) "false"
打开配置文件,配置 Backend 规则:
## hook: session.subscribed ## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub ## 一对一离线消息 backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"} ## 一对多离线消息 backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
MQTT 离线消息需满足以下条件:
以 clean_session = false 连接
订阅 QoS > 0
发布 QoS > 0
在 EMQ X 管理控制台中以如下配置建立连接,
打开配置文件,配置 Backend 规则:
## hook: message.publish ## action/function: on_client_connected、on_message_retain backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"} backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic}
Redis Hash 中:
## 获取 upstream_topic 主题集合中所有 message id 127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1 1) "2VFsyhDm0cPIQvnY9osj" 127.0.0.1:6379>
每条消息详情将以 mqtt:msg:{message_id}
格式的 key 存储在 Redis Hash 中:
## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情 127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP 1) "id" 2) "2VFt0kuNrOktefX6m4nP" ## message id 3) "from" 4) "sub_client" ## client id 5) "qos" 6) "2" 7) "topic" 8) "up/upstream_topic" 9) "payload" 10) "{ "cmd": "reboot" }" 11) "ts" 12) "1542338754" ## pub 时间戳 13) "retain" 14) "false"
The above is the detailed content of How to implement EMQ X Redis data persistence. For more information, please follow other related articles on the PHP Chinese website!