EMQ X Redis データ永続性を実装する方法

WBOY
リリース: 2023-06-02 11:43:37
転載
1801 人が閲覧しました

EMQ 、PostgreSQL、MongoDB、Cassandra、AWS DynamoDB、その他のさまざまなデータベースを使用して、外部サービスによる迅速なクエリを実行したり、サービスがダウンしている場合やクライアントが異常にオフラインになったときに現在の実行状態を保持したり、以前の状態が復元されるときに使用します。接続が復元され、永続性はクライアントでも使用可能 エージェント サブスクリプション: デバイス クライアントがオンラインになると、永続性モジュールはデータベースから事前設定されたトピックを直接ロードし、エージェント サブスクリプションを完了するため、システム設計の複雑さと通信オーバーヘッドが軽減されます。クライアントのサブスクリプションの。

ユーザーは、関連トピックを購読することで同様の機能を実現することもできますが、エンタープライズ バージョンに組み込まれている永続化サポートの方が効率的で信頼性が高く、開発者の作業が大幅に軽減され、システムの安定性を測定および改善できます。

データの永続性は EMQ X の重要な機能であり、エンタープライズ バージョンでのみサポートされています。

永続化設計

永続化の原則は、設定されたイベント フックがトリガーされたときに処理関数 (アクション) を呼び出すことです。処理関数は対応するデータを取得した後、それに応じて処理します。設定された指示に従って、データの追加、削除、変更、確認を実行します。異なるデータベースの同じイベント フックで使用できるパラメータは同じですが、データベースの特性が異なるため、処理機能 (アクション) が異なります。永続化作業モデルとプロセス全体は次のとおりです。

1 対 1 のメッセージ ストレージ

EMQ X Redis数据持久化怎么实现

##発行側がメッセージを発行します。
  1. バックエンドはメッセージをデータベースに記録します;
  2. Subscribe はトピックをサブスクライブします;
  3. バックエンドがデータベースからメッセージを取得します。このトピックのメッセージ;
  4. メッセージをサブスクライバーに送信します。
  5. バックエンドは、メッセージをデータベースから削除します。サブスクライバーがそれを確認します。
  6. 1 対多のメッセージ ストレージ

EMQ X Redis数据持久化怎么实现

PUB 側がメッセージを公開します。 ;
  1. ##バックエンドはメッセージをデータベースに記録します;
  2. SUB1 と SUB2 はトピックをサブスクライブします;
  3. バックエンドはデータベース メッセージからトピックを取得します;
  4. メッセージを SUB1 と SUB2 に送信します;
  5. バックエンドはトピックの位置を記録しますメッセージは SUB1 と SUB2 によって読み取られ、次回の起動時にこの位置からメッセージを取得します。
  6. Redis データの永続性
この記事では、実用的な例を使用して、Redis を通じて関連情報を保存する方法を説明します。

Redis は、BSD プロトコルに準拠した、完全にオープン ソースの無料の高性能キー/値データベースです。

他のキーバリュー キャッシュ製品と比較すると、Redis には次のような特徴があります。

Redis は非常に高いパフォーマンスを持ち、単一マシンで 2000 秒の読み取りおよび書き込み速度をサポートします。 100,000 レベル。
  • Redis はデータの永続性をサポートしており、メモリ内のデータをディスクに保存し、再起動時に再ロードして使用できます。
  • Redis は、単純なキーと値の型のデータをサポートするだけでなく、リスト、セット、zset、ハッシュなどのデータ構造のストレージも提供します。
  • Redis はデータ バックアップ、つまりマスター/スレーブ モードでのデータ バックアップをサポートしています。
  • 読者は、公式 Redis クイック スタートを参照して Redis をインストールし (この記事を書いている時点では、Redis バージョンは 5.0)、
  • redis-server を通じて Redis サーバーを起動できます。 ### 指示 。

EMQ X サーバーの構成EMQ Redis 永続化機能の場合、ほとんどの構成を変更する必要はありません。変更する必要がある唯一の点は、Redis サーバーのアドレスです。リーダーによってインストールされた Redis が EMQ X と同じサーバー上にない場合は、Redis サーバーの正しいアドレスとポートを指定してください。以下に示すように:

## Redis Server 127.0.0.1:6379, Redis Sentinel: 127.0.0.1:26379
backend.redis.pool1.server = 127.0.0.1:6379
ログイン後にコピー

残りの構成ファイルを変更しないで、プラグインを開始します:

emqx_ctl plugins load emqx_backend_redis
ログイン後にコピー

クライアントのオンライン ステータスのストレージクライアントがオンラインになったときとオフラインになったとき, オンライン ステータス、オンライン時間とオフライン時間、ノード クライアント リストを Redis データベースに更新します。

EMQ とはいえ

設定項目

設定ファイルを開いてバックエンドルールを設定します:

## 上线
backend.redis.hook.client.connected.1    =  { "action": { "function": "on_client_connected" }, "pool": "pool1"}

## 下线
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
ログイン後にコピー

使用例

ブラウザ

http://127.0を開きます。 0.1 :18083

EMQ

redis-cli

コマンド ライン ウィンドウを開き、コマンド keys * を実行します。結果は次のとおりです。 2 つのキーが Redis に保存されます:

127.0.0.1:6379> keys *
1) "mqtt:node:emqx@127.0.0.1"
2) "mqtt:client:sub_client"
ログイン後にコピー

连接列表

插件以 mqtt:node:{node_name} 格式的 key 记录节点下客户端列表及连接时间戳信息,等效操作:

## redis key 为 mqtt:node:{node_name}
HMSET mqtt:node:emqx@127.0.0.1 sub_client 1542272836
ログイン後にコピー

字段说明:

## 节点下在线设备信息
127.0.0.1:6379> HGETALL mqtt:node:emqx@127.0.0.1
1) "sub_client1" # clientid
2) "1542272836" # 上线时间时间戳
3) "sub_client"
4) "1542272836"
ログイン後にコピー

连接详细信息

插件以 mqtt:client:{client_id} 格式的 key 记录客户端在线状态、上线时间,等效操作:

## redis key 为 mqtt:client:{client_id}
HMSET mqtt:client:sub_client state 1 online_at 1542272854
ログイン後にコピー

字段说明:

## 客户端在线状态
127.0.0.1:6379> HGETALL mqtt:client:sub_client
1) "state"
2) "0" # 0 离线 1 在线
3) "online_at"
4) "1542272854" # 上线时间戳
5) "offline_at"
6) "undefined" # 离线时间戳
ログイン後にコピー

客户端代理订阅

当客户端上线时,代理会加载订阅主题,而存储模块会直接从数据库中读取预设待订阅列表。应用程序可以通过数据层的设定或更改来控制代理订阅列表,以便在需要预定主题进行通信和接收消息的情况下与客户端进行通信。

配置项

打开配置文件,配置 Backend 规则:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.redis.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
ログイン後にコピー

使用示例

sub_client 设备上线时,需要为其订阅 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

  1. 插件以 mqtt:sub:{client_id} 格式 key 在 Redis 中初始化代理订阅 Hash:

## redis key 为 mqtt:sub:{client_id}
## HSET key {topic} {qos}
127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/upstream 1
(integer) 0

127.0.0.1:6379> HSET mqtt:sub:sub_client sub_client/downlink 1
(integer) 0
ログイン後にコピー
  1. EMQ X 管理控制台 WebSocket 页面,以 clientid sub_client 新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstreamsub_client/downlink 两个 QoS 1 的主题:

EMQ X Redis数据持久化怎么实现

  1. 切换回管理控制台 WebSocket 页面,向 sub_client/downlink 主题发布消息,可在消息订阅列表收到发布的消息。

持久化发布消息

配置项

打开配置文件,配置 Backend 规则,支持使用 topic 参数进行消息过滤,此处使用 # 通配符存储任意主题消息:

## hook: message.publish
## action/function: on_message_publish

backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
ログイン後にコピー

使用示例

在 EMQ X 管理控制台 WebSocket 页面中,使用 clientid sub_client 建立连接,向主题 upstream_topic 发布多条消息。针对每条消息, EMQ X 将持久化消息列表、消息详情两条记录。

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:msg:{topic} Redis 集合中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:msg:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
2) "2VFszTClyjpVtLDLrn1u"
3) "2VFszozkwkYOcbEy8QN9"
4) "2VFszpEc7DfbEqC97I3g"
5) "2VFszpSzRviADmcOeuXd"
6) "2VFszpm3kvvLkJTcdmGU"
7) "2VFt0kuNrOktefX6m4nP"
127.0.0.1:6379>
ログイン後にコピー

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"
ログイン後にコピー
ログイン後にコピー

获取离线消息

配置项

打开配置文件,配置 Backend 规则:

## hook: session.subscribed
## action/function: on_message_fetch_for_queue、on_message_fetch_for_pubsub

## 一对一离线消息
backend.redis.hook.session.subscribed.1  = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}

## 一对多离线消息
backend.redis.hook.session.subscribed.2  = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
ログイン後にコピー

使用示例

MQTT 离线消息需满足以下条件:

  1. 以 clean_session = false 连接

  2. 订阅 QoS > 0

  3. 发布 QoS > 0

在 EMQ X 管理控制台中以如下配置建立连接,

EMQ X Redis数据持久化怎么实现

持久化 Retain 消息

配置项

打开配置文件,配置 Backend 规则:

## hook: message.publish
## action/function: on_client_connected、on_message_retain

backend.redis.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

backend.redis.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
ログイン後にコピー

消息列表

EMQ X 将消息列表以 message id 持久化至 mqtt:retain:{topic} Redis Hash 中:

## 获取 upstream_topic 主题集合中所有 message id
127.0.0.1:6379> ZRANGE mqtt:retain:upstream_topic 0 -1
1) "2VFsyhDm0cPIQvnY9osj"
127.0.0.1:6379>
ログイン後にコピー

消息详情

每条消息详情将以 mqtt:msg:{message_id} 格式的 key 存储在 Redis Hash 中:

## 获取 message id 为 2VFt0kuNrOktefX6m4nP 的消息详情
127.0.0.1:6379> HGETALL mqtt:msg:2VFt0kuNrOktefX6m4nP
 1) "id"
 2) "2VFt0kuNrOktefX6m4nP" ## message id
 3) "from"
 4) "sub_client" ## client id
 5) "qos"
 6) "2"
 7) "topic"
 8) "up/upstream_topic"
 9) "payload"
10) "{ "cmd": "reboot" }"
11) "ts"
12) "1542338754" ## pub 时间戳
13) "retain"
14) "false"
ログイン後にコピー
ログイン後にコピー

以上がEMQ X Redis データ永続性を実装する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート