EMQ X Redis 데이터 지속성을 구현하는 방법
Jun 02, 2023 am 11:43 AMEMQ, AWS DynamoDB 및 기타 다양한 데이터베이스를 도입하여 외부 서비스에 의한 빠른 쿼리를 수행하거나 서비스가 다운되거나 클라이언트가 비정상적으로 오프라인일 때 현재 실행 상태를 유지하고 연결 지속성이 복원될 때 이전 상태로 복원할 수도 있습니다. 클라이언트 에이전트 구독, 장치 클라이언트에 사용됩니다. 온라인일 때 지속성 모듈은 데이터베이스에서 미리 설정된 주제를 직접 로드하고 프록시 구독을 완료하여 시스템 설계의 복잡성과 클라이언트 구독 통신 오버헤드를 줄입니다.
사용자는 관련 주제를 구독하여 비슷한 기능을 수행할 수도 있지만, 엔터프라이즈 버전에 내장된 지속성 지원이 더 효율적이고 안정적이어서 개발자의 작업량을 크게 줄이고 시스템 안정성을 향상시킵니다.
데이터 지속성은 EMQ X의 중요한 기능이며 엔터프라이즈 버전에서만 지원됩니다.지속성 설계
지속성의 원리는 구성된 이벤트 후크가 트리거될 때 처리 기능(액션)을 호출하는 것입니다. 처리 기능은 해당 데이터를 얻은 후 구성된 지침에 따라 처리하여 추가를 실현합니다. 데이터 삭제, 수정 및 쿼리. 서로 다른 데이터베이스에서 동일한 이벤트 후크에 사용할 수 있는 매개변수는 동일하지만 데이터베이스 특성이 다르기 때문에 처리 기능(작업)이 다릅니다. 전체 지속성 작업 모델 및 프로세스는 다음과 같습니다.
일대일 메시지 저장
- 게시 측에서 메시지를 게시합니다.
- 백엔드는 메시지를 데이터베이스에 기록합니다.
- 백엔드는 데이터베이스에서 주제의 메시지를 받습니다.
- 구독자가 확인한 후
- 백엔드는 메시지를 데이터베이스에서 제거합니다.
- 일대다 메시지 저장
PUB는 메시지를 게시합니다.
- SUB2는 주제를 구독합니다.
- 백엔드는 데이터베이스에서 주제의 메시지를 가져옵니다. SUB1 및 SUB2에 메시지 보내기
- 백엔드는 SUB1 및 SUB2의 읽은 메시지 위치를 기록하고 다음에 메시지를 받을 때 이 위치에서 시작합니다.
- Redis 데이터 지속성
- 이 문서에서는 실제 예제를 사용하여 Redis를 통해 관련 정보를 저장하는 방법을 설명합니다. Redis는 BSD 프로토콜을 준수하는 완전 오픈 소스, 무료, 고성능 키-값 데이터베이스입니다.
- 다른 키-값 캐시 제품과 비교하여 Redis는 다음과 같은 특징을 가지고 있습니다.
- Redis는 데이터 백업, 즉 마스터-슬레이브 모드에서의 데이터 백업을 지원합니다.
-
독자는 공식 Redis Quick Start를 참조하여 Redis를 설치하고(이 글을 작성할 당시 Redis 버전은 5.0임)
redis-server
명령을 통해 Redis 서버를 시작할 수 있습니다. - EMQ X 서버 구성EMQ의 경우 일부 구성은 변경할 필요가 없습니다. 변경해야 할 유일한 것은 Redis 서버의 주소일 수 있습니다. 리더가 설치한 Redis가 EMQ X와 동일한 서버에 없는 경우 Redis 서버의 올바른 주소와 포트를 지정하십시오. 아래와 같이:
## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379 backend.redis.pool1.server = 127.0.0.1:6379
emqx_ctl plugins load emqx_backend_redis
redis-server
命令来启动 Redis 服务器。
配置 EMQ X 服务器
通过 RPM 方式安装的 EMQ X,Redis 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_redis.conf
,如果只是测试 Redis 持久化的功能,大部分配置不需要做更改。唯一需要更改的地方可能是 Redis 服务器的地址:如果读者安装的 Redis 不与 EMQ X 在同一服务器上,请指定正确的 Redis 服务器的地址与端口。如下所示:
## 上线 backend.redis.hook.client.connected.1 = { "action": { "function": "on_client_connected" }, "pool": "pool1"} ## 下线 backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
保持剩下部分的配置文件不变,然后启动该插件:
127.0.0.1:6379> keys * 1) "mqtt:node:emqx@127.0.0.1" 2) "mqtt:client:sub_client"
客户端在线状态存储
客户端上下线时,更新在线状态、上下线时间、节点客户端列表至 Redis 数据库。
尽管 EMQ X 本身提供了设备在线状态 API,但在需要频繁获取客户端在线状态、上下线时间的场景下,直接从数据库获取该记录比调用 EMQ X API 更高效。
配置项
打开配置文件,配置 Backend 规则:
## redis key 为 mqtt:node:{node_name} HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836
使用示例
浏览器打开 http://127.0.0.1:18083
EMQ X 管理控制台,在 工具 -> Websocket 中新建一个客户端连接,指定 clientid 为 sub_client:
打开 redis-cli
命令行窗口,执行命令 keys *
EMQ이긴 하지만
## 节点下在线设备信息 127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1 1) "sub_client1" # clientid 2) "1542272836" # 上线时间时间戳 3) "sub_client" 4) "1542272836"
http://127.0.0.1:18083
EMQ X 관리 콘솔에서 브라우저를 엽니다. 도구 -> Websocket 새 클라이언트 연결을 만들고 클라이언트 ID를 sub_client로 지정합니다.🎜🎜
redis-cli
명령줄 창을 열고 keys * 결과는 다음과 같습니다. 독자는 Redis에 두 개의 키가 저장되어 있음을 알 수 있습니다. 🎜<div class="code" style="position:relative; padding:0px; margin:0px;"><div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false">127.0.0.1:6379&gt; keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"</pre><div class="contentsignin">로그인 후 복사</div></div><div class="contentsignin">로그인 후 복사</div></div>
<h4 id="连接列表">连接列表</h4>
<p>插件以 <code>mqtt:node:{node_name}
格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:
## redis key 为 mqtt:node:{node_name} HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836
字段说明:
## 节点下在线设备信息 127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1 1) "sub_client1" # clientid 2) "1542272836" # 上线时间时间戳 3) "sub_client" 4) "1542272836"
连接详细信息
插件以 mqtt:client:{client_id}
格式的 key 记录客户端在线状态、上线时间,等效操作:
## redis key 为 mqtt:client:{client_id} HMSET mqtt:client:sub_client state 1 online_at 1542272854
字段说明:
## 客户端在线状态 127.0.0.1:6379> HGETALL mqtt:client:sub_client 1) "state" 2) "0" # 0 离线 1 在线 3) "online_at" 4) "1542272854" # 上线时间戳 5) "offline_at" 6) "undefined" # 离线时间戳
客户端代理订阅
当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。
配置项
打开配置文件,配置 Backend 规则:
## hook: client.connected ## action/function: on_subscribe_lookup backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
使用示例
当 sub_client
设备上线时,需要为其订阅 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:
插件以
mqtt:sub:{client_id}
格式 key 在 Redis 中初始化代理订阅 Hash:
## redis key 为 mqtt:sub:{client_id} ## HSET key {topic} {qos} 127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1 (integer) 0 127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1 (integer) 0
EMQ X 管理控制台 WebSocket 页面,以 clientid
sub_client
新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了sub_client/upstream
与sub_client/downlink
两个 QoS 1 的主题:
切换回管理控制台 WebSocket 页面,向
sub_client/downlink
主题发布消息,可在消息订阅列表收到发布的消息。
持久化发布消息
配置项
打开配置文件,配置 Backend 规则,支持使用 topic
参数进行消息过滤,此处使用 #
通配符存储任意主题消息:
## hook: message.publish ## action/function: on_message_publish backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
使用示例
在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client
建立连接,向主题 upstream_topic
发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。
消息列表
EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic}
Redis 集合中:
## 获取 upstream_topic 主题集合中所有 message id 127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1 1) "2VFsyhDm0cPIQvnY9osj" 2) "2VFszTClyjpVtLDLrn1u" 3) "2VFszozkwkYOcbEy8QN9" 4) "2VFszpEc7DfbEqC97I3g" 5) "2VFszpSzRviADmcOeuXd" 6) "2VFszpm3kvvLkJTcdmGU" 7) "2VFt0kuNrOktefX6m4nP" 127.0.0.1:6379>
消息详情
每条消息详情将以 mqtt:msg:{message_id}
格式的 key 存储在 Redis Hash 中:
## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情 127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP 1) "id" 2) "2VFt0kuNrOktefX6m4nP" ## message id 3) "from" 4) "sub_client" ## client id 5) "qos" 6) "2" 7) "topic" 8) "up/upstream_topic" 9) "payload" 10) "{ "cmd": "reboot" }" 11) "ts" 12) "1542338754" ## pub 时间戳 13) "retain" 14) "false"
获取离线消息
配置项
打开配置文件,配置 Backend 规则:
## hook: session.subscribed ## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub ## 一对一离线消息 backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"} ## 一对多离线消息 backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
使用示例
MQTT 离线消息需满足以下条件:
以 clean_session = false 连接
订阅 QoS > 0
发布 QoS > 0
在 EMQ X 管理控制台中以如下配置建立连接,
持久化 Retain 消息
配置项
打开配置文件,配置 Backend 规则:
## hook: message.publish ## action/function: on_client_connected、on_message_retain backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"} backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
消息列表
EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic}
Redis Hash 中:
## 获取 upstream_topic 主题集合中所有 message id 127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1 1) "2VFsyhDm0cPIQvnY9osj" 127.0.0.1:6379>
消息详情
每条消息详情将以 mqtt:msg:{message_id}
格式的 key 存储在 Redis Hash 中:
## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情 127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP 1) "id" 2) "2VFt0kuNrOktefX6m4nP" ## message id 3) "from" 4) "sub_client" ## client id 5) "qos" 6) "2" 7) "topic" 8) "up/upstream_topic" 9) "payload" 10) "{ "cmd": "reboot" }" 11) "ts" 12) "1542338754" ## pub 时间戳 13) "retain" 14) "false"
위 내용은 EMQ X Redis 데이터 지속성을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

인기 기사

인기 기사

뜨거운 기사 태그

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기
강력한 PHP 통합 개발 환경

드림위버 CS6
시각적 웹 개발 도구

SublimeText3 Mac 버전
신 수준의 코드 편집 소프트웨어(SublimeText3)

뜨거운 주제











Windows 11 10.0.22000.100 설치 시 발생하는 0x80242008 오류 해결 방법

erlang과 golang 중 어느 것이 더 나은 성능을 갖고 있나요?
