EMQ X Redis データ永続性を実装する方法
EMQ 、PostgreSQL、MongoDB、Cassandra、AWS DynamoDB、その他のさまざまなデータベースを使用して、外部サービスによる迅速なクエリを実行したり、サービスがダウンしている場合やクライアントが異常にオフラインになったときに現在の実行状態を保持したり、以前の状態が復元されるときに使用します。接続が復元され、永続性はクライアントでも使用可能 エージェント サブスクリプション: デバイス クライアントがオンラインになると、永続性モジュールはデータベースから事前設定されたトピックを直接ロードし、エージェント サブスクリプションを完了するため、システム設計の複雑さと通信オーバーヘッドが軽減されます。クライアントのサブスクリプションの。
ユーザーは、関連トピックを購読することで同様の機能を実現することもできますが、エンタープライズ バージョンに組み込まれている永続化サポートの方が効率的で信頼性が高く、開発者の作業が大幅に軽減され、システムの安定性を測定および改善できます。
データの永続性は EMQ X の重要な機能であり、エンタープライズ バージョンでのみサポートされています。永続化設計
永続化の原則は、設定されたイベント フックがトリガーされたときに処理関数 (アクション) を呼び出すことです。処理関数は対応するデータを取得した後、それに応じて処理します。設定された指示に従って、データの追加、削除、変更、確認を実行します。異なるデータベースの同じイベント フックで使用できるパラメータは同じですが、データベースの特性が異なるため、処理機能 (アクション) が異なります。永続化作業モデルとプロセス全体は次のとおりです。
1 対 1 のメッセージ ストレージ
- バックエンドはメッセージをデータベースに記録します;
- Subscribe はトピックをサブスクライブします;
- バックエンドがデータベースからメッセージを取得します。このトピックのメッセージ;
- メッセージをサブスクライバーに送信します。
- バックエンドは、メッセージをデータベースから削除します。サブスクライバーがそれを確認します。
- 1 対多のメッセージ ストレージ
- ##バックエンドはメッセージをデータベースに記録します;
- SUB1 と SUB2 はトピックをサブスクライブします;
-
- メッセージを SUB1 と SUB2 に送信します;
- バックエンドはトピックの位置を記録しますメッセージは SUB1 と SUB2 によって読み取られ、次回の起動時にこの位置からメッセージを取得します。
- Redis データの永続性
- Redis はデータの永続性をサポートしており、メモリ内のデータをディスクに保存し、再起動時に再ロードして使用できます。
- Redis は、単純なキーと値の型のデータをサポートするだけでなく、リスト、セット、zset、ハッシュなどのデータ構造のストレージも提供します。
- Redis はデータ バックアップ、つまりマスター/スレーブ モードでのデータ バックアップをサポートしています。
- 読者は、公式 Redis クイック スタートを参照して Redis をインストールし (この記事を書いている時点では、Redis バージョンは 5.0)、 redis-server を通じて Redis サーバーを起動できます。 ### 指示 。
EMQ X サーバーの構成EMQ Redis 永続化機能の場合、ほとんどの構成を変更する必要はありません。変更する必要がある唯一の点は、Redis サーバーのアドレスです。リーダーによってインストールされた Redis が EMQ X と同じサーバー上にない場合は、Redis サーバーの正しいアドレスとポートを指定してください。以下に示すように:
## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379 backend.redis.pool1.server = 127.0.0.1:6379
残りの構成ファイルを変更しないで、プラグインを開始します:
emqx_ctl plugins load emqx_backend_redis
クライアントのオンライン ステータスのストレージクライアントがオンラインになったときとオフラインになったとき, オンライン ステータス、オンライン時間とオフライン時間、ノード クライアント リストを Redis データベースに更新します。
EMQ とはいえ
設定項目
設定ファイルを開いてバックエンドルールを設定します:
## 上线 backend.redis.hook.client.connected.1 = { "action": { "function": "on_client_connected" }, "pool": "pool1"} ## 下线 backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
使用例
ブラウザ
http://127.0を開きます。 0.1 :18083EMQ
redis-cli コマンド ライン ウィンドウを開き、コマンド 插件以 字段说明: 插件以 字段说明: 当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。 打开配置文件,配置 Backend 规则: 当 插件以 EMQ X 管理控制台 WebSocket 页面,以 clientid 切换回管理控制台 WebSocket 页面,向 打开配置文件,配置 Backend 规则,支持使用 在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid EMQ X 将消息列表以 message id 持久化至 每条消息详情将以 打开配置文件,配置 Backend 规则: MQTT 离线消息需满足以下条件: 以 clean_session = false 连接 订阅 QoS > 0 发布 QoS > 0 在 EMQ X 管理控制台中以如下配置建立连接, 打开配置文件,配置 Backend 规则: EMQ X 将消息列表以 message id 持久化至 每条消息详情将以 keys *
を実行します。結果は次のとおりです。 2 つのキーが Redis に保存されます: 127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"
连接列表
mqtt:node:{node_name}
格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836
## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"
连接详细信息
mqtt:client:{client_id}
格式的 key 记录客户端在线状态、上线时间,等效操作:## redis key 为 mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854
## 客户端在线状态
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 离线 1 在线
3) "online_at"
4) "1542272854" # 上线时间戳
5) "offline_at"
6) "undefined" # 离线时间戳
客户端代理订阅
配置项
## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
使用示例
sub_client
设备上线时,需要为其订阅 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:mqtt:sub:{client_id}
格式 key 在 Redis 中初始化代理订阅 Hash:## redis key 为 mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0
sub_client
新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:sub_client/downlink
主题发布消息,可在消息订阅列表收到发布的消息。持久化发布消息
配置项
topic
参数进行消息过滤,此处使用 #
通配符存储任意主题消息:## hook: message.publish
## action/function: on_message_publish
backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
使用示例
sub_client
建立连接,向主题 upstream_topic
发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。消息列表
mqtt:msg:{topic}
Redis 集合中:## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>
消息详情
mqtt:msg:{message_id}
格式的 key 存储在 Redis Hash 中:## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
1) "id"
2) "2VFt0kuNrOktefX6m4nP" ## message id
3) "from"
4) "sub_client" ## client id
5) "qos"
6) "2"
7) "topic"
8) "up/upstream_topic"
9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"
获取离线消息
配置项
## hook: session.subscribed
## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub
## 一对一离线消息
backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
## 一对多离线消息
backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
使用示例
持久化 Retain 消息
配置项
## hook: message.publish
## action/function: on_client_connected、on_message_retain
backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
消息列表
mqtt:retain:{topic}
Redis Hash 中:## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>
消息详情
mqtt:msg:{message_id}
格式的 key 存储在 Redis Hash 中:## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
1) "id"
2) "2VFt0kuNrOktefX6m4nP" ## message id
3) "from"
4) "sub_client" ## client id
5) "qos"
6) "2"
7) "topic"
8) "up/upstream_topic"
9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"
以上がEMQ X Redis データ永続性を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









Redisクラスターモードは、シャードを介してRedisインスタンスを複数のサーバーに展開し、スケーラビリティと可用性を向上させます。構造の手順は次のとおりです。異なるポートで奇妙なRedisインスタンスを作成します。 3つのセンチネルインスタンスを作成し、Redisインスタンスを監視し、フェールオーバーを監視します。 Sentinel構成ファイルを構成し、Redisインスタンス情報とフェールオーバー設定の監視を追加します。 Redisインスタンス構成ファイルを構成し、クラスターモードを有効にし、クラスター情報ファイルパスを指定します。各Redisインスタンスの情報を含むnodes.confファイルを作成します。クラスターを起動し、CREATEコマンドを実行してクラスターを作成し、レプリカの数を指定します。クラスターにログインしてクラスター情報コマンドを実行して、クラスターステータスを確認します。作る

Redisはハッシュテーブルを使用してデータを保存し、文字列、リスト、ハッシュテーブル、コレクション、注文コレクションなどのデータ構造をサポートします。 Redisは、スナップショット(RDB)を介してデータを維持し、書き込み専用(AOF)メカニズムを追加します。 Redisは、マスタースレーブレプリケーションを使用して、データの可用性を向上させます。 Redisは、シングルスレッドイベントループを使用して接続とコマンドを処理して、データの原子性と一貫性を確保します。 Redisは、キーの有効期限を設定し、怠zyな削除メカニズムを使用して有効期限キーを削除します。

Redisのすべてのキーを表示するには、3つの方法があります。キーコマンドを使用して、指定されたパターンに一致するすべてのキーを返します。スキャンコマンドを使用してキーを繰り返し、キーのセットを返します。情報コマンドを使用して、キーの総数を取得します。

Redisバージョン番号を表示するには、次の3つの方法を使用できます。(1)情報コマンドを入力し、(2) - versionオプションでサーバーを起動し、(3)構成ファイルを表示します。

Redis-Serverが見つからない問題を解決するための手順:インストールを確認して、Redisが正しくインストールされていることを確認します。環境変数Redis_hostとredis_portを設定します。 Redis Server Redis-Serverを起動します。サーバーがRedis-Cli pingを実行しているかどうかを確認します。

Redis Orderedセット(ZSET)は、並べ替えられた要素を保存し、関連するスコアでソートするために使用されます。 zsetを使用する手順には次のものがあります。1。zsetを作成します。 2。メンバーを追加します。 3.メンバースコアを取得します。 4。ランキングを取得します。 5.ランキング範囲のメンバーを取得します。 6.メンバーを削除します。 7.要素の数を取得します。 8。スコア範囲のメンバーの数を取得します。

Redisソースコードを理解する最良の方法は、段階的に進むことです。Redisの基本に精通してください。開始点として特定のモジュールまたは機能を選択します。モジュールまたは機能のエントリポイントから始めて、行ごとにコードを表示します。関数コールチェーンを介してコードを表示します。 Redisが使用する基礎となるデータ構造に精通してください。 Redisが使用するアルゴリズムを特定します。

Redis指令を使用するには、次の手順が必要です。Redisクライアントを開きます。コマンド(動詞キー値)を入力します。必要なパラメーターを提供します(指示ごとに異なります)。 Enterを押してコマンドを実行します。 Redisは、操作の結果を示す応答を返します(通常はOKまたは-ERR)。
