ホームページ データベース mysql チュートリアル MongoDB の oplog メカニズムを使用したデータ監視の例を共有する

MongoDB の oplog メカニズムを使用したデータ監視の例を共有する

Jul 03, 2017 pm 04:43 PM
mongodb 成し遂げる

MongoDB のレプリケーションは、ログを通じて書き込み操作を保存します。このログは、oplog と呼ばれます。次の記事では、MongoDB の oplog メカニズムを使用して、それを必要とする人向けの関連情報を主に紹介します。参考に、以下を見てみましょう。

はじめに

最近では、MongoDBに新規に挿入されたデータをリアルタイムに取得する必要があり、挿入プログラム自体がすでに処理ロジックを一式持っているため、挿入プログラムに直接関連するプログラムを記述するのは不便です。従来のデータベースにはこの種の トリガー メカニズムが付属していますが、Mongo には使用できる関連機能がありません (もちろん、私があまりにも知識が少ないかもしれません。修正してください)。 Python なので、対応する実装メソッドを集めてコンパイルしました。

1. はじめに

まず、この要件は、実際には、スレーブ データベースがマスター データベースを同期できる理由と非常によく似ています。それは、制御のための特定の指標があるためです。MongoDB には既製のトリガーはありませんが、マスター/スレーブ バックアップを実現できるため、そのマスター/スレーブ バックアップ メカニズムから始めます。

2. OPLOG

まず、コマンドラインで –master を使用するか、設定ファイルでマスターキーを true に追加する必要があります。

この時点で、Mongo のシステム ライブラリ ローカルに新しいコレクション - oplog が表示されます。この時点で、If Mongo がまだ存在する場合、oplog 情報は oplog.$main に保存されます。ここではマスター/スレーブ同期ではないため、これらのセットは存在しません。 oplog.$main里就会存储进oplog信息,如果此时还有充当从数据库的Mongo存在,就会还有一些slaves的信息,由于我们这里并不是主从同步,所以不存在这些集合。

再来看看oplog结构:


"ts" : Timestamp(6417682881216249, 1), 时间戳
"h" : NumberLong(0), 长度
"v" : 2, 
"op" : "n", 操作类型
"ns" : "", 操作的库和集合
"o2" : "_id" update条件
"o" : {} 操作值,即document
ログイン後にコピー

这里需要知道op的几种属性:


insert,'i'
update, 'u'
remove(delete), 'd'
cmd, 'c'
noop, 'n' 空操作
ログイン後にコピー

从上面的信息可以看出,我们只要不断读取到ts来做对比,然后根据op即可判断当前出现的是什么操作,相当于使用程序实现了一个从数据库的接收端。

三、CODE

在Github上找到了别人的实现方式,不过它的函数库太老旧,所以在他的基础上进行修改。

Github地址:github.com/RedBeard0531/mongo-oplog-watcher

mongo_oplog_watcher.py如下:

#!/usr/bin/python
import pymongo
import re
import time
from pprint import pprint # pretty printer
from pymongo.errors import AutoReconnect

class OplogWatcher(object):
  def init(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True):
    if collection is not None:
      if db is None:
        raise ValueError('must specify db if you specify a collection')
      self._ns_filter = db + '.' + collection
    elif db is not None:
      self._ns_filter = re.compile(r'^%s\.' % db)
    else:
      self._ns_filter = None

    self.poll_time = poll_time
    self.connection = connection or pymongo.Connection()

    if start_now:
      self.start()

  @staticmethod
  def get_id(op):
    id = None
    o2 = op.get('o2')
    if o2 is not None:
      id = o2.get('_id')

    if id is None:
      id = op['o'].get('_id')

    return id

  def start(self):
    oplog = self.connection.local['oplog.$main']
    ts = oplog.find().sort('$natural', -1)[0]['ts']
    while True:
      if self._ns_filter is None: 
        filter = {}
      else:
        filter = {'ns': self._ns_filter}
      filter['ts'] = {'$gt': ts}
      try:
        cursor = oplog.find(filter, tailable=True)
        while True:
          for op in cursor:
            ts = op['ts']
            id = self.get_id(op)
            self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op)
          time.sleep(self.poll_time)
          if not cursor.alive:
            break
      except AutoReconnect:
        time.sleep(self.poll_time)

  def all_with_noop(self, ns, ts, op, id, raw):
    if op == 'n':
      self.noop(ts=ts)
    else:
      self.all(ns=ns, ts=ts, op=op, id=id, raw=raw)

  def all(self, ns, ts, op, id, raw):
    if op == 'i':
      self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw)
    elif op == 'u':
      self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw)
    elif op == 'd':
      self.delete(ns=ns, ts=ts, id=id, raw=raw)
    elif op == 'c':
      self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw)
    elif op == 'db':
      self.db_declare(ns=ns, ts=ts, raw=raw)

  def noop(self, ts):
    pass

  def insert(self, ns, ts, id, obj, raw, **kw):
    pass

  def update(self, ns, ts, id, mod, raw, **kw):
    pass

  def delete(self, ns, ts, id, raw, **kw):
    pass

  def command(self, ns, ts, cmd, raw, **kw):
    pass

  def db_declare(self, ns, ts, **kw):
    pass

class OplogPrinter(OplogWatcher):
  def all(self, **kw):
    pprint (kw)
    print #newline

if name == 'main':
  OplogPrinter()
ログイン後にコピー

首先是实现一个数据库的初始化,设定一个延迟时间(准实时):


self.poll_time = poll_time
self.connection = connection or pymongo.MongoClient()
ログイン後にコピー

主要的函数是start()


oplog 構造を見てみましょう:

ここで、op のいくつかのプロパティを知る必要があります: 🎜🎜🎜🎜🎜rrreee🎜 上記の情報からわかるように、必要なのはts の読み取りを継続する 実行する 比較し、op を使用して、現在どのような操作が行われているかを判断します。これは、プログラムを使用してデータベースからの受信側を実装するのと同じです。 🎜🎜🎜🎜3. CODE🎜🎜🎜🎜🎜 Githubで他の人の実装を見つけましたが、関数ライブラリが古すぎるので、それをベースに修正しました。 🎜🎜Github アドレス: github.com/RedBeard0531/mongo-oplog-watcher🎜🎜🎜mongo_oplog_watcher.py は次のとおりです: 🎜🎜rrreee🎜 1 つ目は、データベースを初期化し、遅延時間 (準リアルタイム) を設定することです: 🎜 🎜🎜🎜 rrreee🎜 メイン関数は start() で、時間比較を実装し、対応するフィールドを処理します: 🎜🎜🎜🎜rrreee🎜 この start 関数をループし、対応する監視をここに記述できます。 all_with_noop 処理ロジック内。 🎜🎜このようにして、簡単な準リアルタイム Mongo🎜データベース操作🎜モニターを実装できます。次のステップでは、新しく入力されたプログラムを他の操作と組み合わせて処理できます。 🎜

以上がMongoDB の oplog メカニズムを使用したデータ監視の例を共有するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

navicat の有効期限が切れた場合の対処方法 navicat の有効期限が切れた場合の対処方法 Apr 23, 2024 pm 12:12 PM

Navicat の有効期限の問題を解決するには、ライセンスを更新する、自動更新を無効にする、Navicat プレミアム エッセンシャルの無料バージョンを使用する、などがあります。

navicat を mongodb に接続する方法 navicat を mongodb に接続する方法 Apr 24, 2024 am 11:27 AM

Navicat を使用して MongoDB に接続するには、次の手順を実行する必要があります: Navicat をインストールする MongoDB 接続を作成します: a. 接続名、ホスト アドレス、およびポートを入力します b. 認証情報を入力します (必要な場合) SSL 証明書を追加します (必要な場合) 接続を確認します接続を保存する

net4.0の用途は何ですか net4.0の用途は何ですか May 10, 2024 am 01:09 AM

.NET 4.0 はさまざまなアプリケーションの作成に使用され、オブジェクト指向プログラミング、柔軟性、強力なアーキテクチャ、クラウド コンピューティングの統合、パフォーマンスの最適化、広範なライブラリ、セキュリティ、スケーラビリティ、データ アクセス、モバイルなどの豊富な機能をアプリケーション開発者に提供します。開発サポート。

サーバーレスアーキテクチャでのJava関数とデータベースの統合 サーバーレスアーキテクチャでのJava関数とデータベースの統合 Apr 28, 2024 am 08:57 AM

サーバーレス アーキテクチャでは、Java 関数をデータベースと統合して、データベース内のデータにアクセスして操作できます。主な手順には、Java 関数の作成、環境変数の構成、関数のデプロイ、および関数のテストが含まれます。これらの手順に従うことで、開発者はデータベースに保存されているデータにシームレスにアクセスする複雑なアプリケーションを構築できます。

DebianでMongoDB自動拡張を構成する方法 DebianでMongoDB自動拡張を構成する方法 Apr 02, 2025 am 07:36 AM

この記事では、自動拡張を実現するためにDebianシステムでMongodbを構成する方法を紹介します。主な手順には、Mongodbレプリカセットとディスクスペース監視のセットアップが含まれます。 1。MongoDBのインストール最初に、MongoDBがDebianシステムにインストールされていることを確認してください。次のコマンドを使用してインストールします。sudoaptupdatesudoaptinstinstall-yymongodb-org2。mongodbレプリカセットMongodbレプリカセットの構成により、自動容量拡張を達成するための基礎となる高可用性とデータ冗長性が保証されます。 Mongodbサービスを開始:Sudosystemctlstartmongodsudosys

DebianでMongodbの高可用性を確保する方法 DebianでMongodbの高可用性を確保する方法 Apr 02, 2025 am 07:21 AM

この記事では、Debianシステムで非常に利用可能なMongoDBデータベースを構築する方法について説明します。データのセキュリティとサービスが引き続き動作し続けるようにするための複数の方法を探ります。キー戦略:レプリカセット:レプリカセット:レプリカセットを使用して、データの冗長性と自動フェールオーバーを実現します。マスターノードが失敗すると、レプリカセットが自動的に新しいマスターノードを選択して、サービスの継続的な可用性を確保します。データのバックアップと回復:MongoDumpコマンドを定期的に使用してデータベースをバックアップし、データ損失のリスクに対処するために効果的な回復戦略を策定します。監視とアラーム:監視ツール(プロメテウス、グラファナなど)を展開して、MongoDBの実行ステータスをリアルタイムで監視し、

navicat は mongodb に接続できますか? navicat は mongodb に接続できますか? Apr 23, 2024 pm 05:15 PM

はい、Navicat は MongoDB データベースに接続できます。具体的な手順は次のとおりです。 Navicat を開き、新しい接続を作成します。データベースの種類として MongoDB を選択します。 MongoDB のホスト アドレス、ポート、データベース名を入力します。 MongoDB のユーザー名とパスワードを入力します (必要な場合)。 「接続」ボタンをクリックします。

Pi Coinのメジャーアップデート:Pi Bankが来ています! Pi Coinのメジャーアップデート:Pi Bankが来ています! Mar 03, 2025 pm 06:18 PM

Pinetworkは、革新的なモバイルバンキングプラットフォームであるPibankを立ち上げようとしています! Pinetworkは本日、Pibankと呼ばれるElmahrosa(Face)Pimisrbankのメジャーアップデートをリリースしました。これは、従来の銀行サービスと、フィアット通貨の原子交換と暗号通貨の原子交換を実現します(resuptocursisを使用するなど、聖職者のような聖職者など、 DC)。ピバンクの魅力は何ですか?見つけましょう!ピバンクの主な機能:銀行口座と暗号通貨資産のワンストップ管理。リアルタイムトランザクションをサポートし、生​​物種を採用します

See all articles