ホームページ > データベース > mysql チュートリアル > MongoDB の oplog メカニズムを使用したデータ監視の例を共有する

MongoDB の oplog メカニズムを使用したデータ監視の例を共有する

零下一度
リリース: 2017-07-03 16:43:59
オリジナル
2077 人が閲覧しました

MongoDB のレプリケーションは、ログを通じて書き込み操作を保存します。このログは、oplog と呼ばれます。次の記事では、MongoDB の oplog メカニズムを使用して、それを必要とする人向けの関連情報を主に紹介します。参考に、以下を見てみましょう。

はじめに

最近では、MongoDBに新規に挿入されたデータをリアルタイムに取得する必要があり、挿入プログラム自体がすでに処理ロジックを一式持っているため、挿入プログラムに直接関連するプログラムを記述するのは不便です。従来のデータベースにはこの種の トリガー メカニズムが付属していますが、Mongo には使用できる関連機能がありません (もちろん、私があまりにも知識が少ないかもしれません。修正してください)。 Python なので、対応する実装メソッドを集めてコンパイルしました。

1. はじめに

まず、この要件は、実際には、スレーブ データベースがマスター データベースを同期できる理由と非常によく似ています。それは、制御のための特定の指標があるためです。MongoDB には既製のトリガーはありませんが、マスター/スレーブ バックアップを実現できるため、そのマスター/スレーブ バックアップ メカニズムから始めます。

2. OPLOG

まず、コマンドラインで –master を使用するか、設定ファイルでマスターキーを true に追加する必要があります。

この時点で、Mongo のシステム ライブラリ ローカルに新しいコレクション - oplog が表示されます。この時点で、If Mongo がまだ存在する場合、oplog 情報は oplog.$main に保存されます。ここではマスター/スレーブ同期ではないため、これらのセットは存在しません。 oplog.$main里就会存储进oplog信息,如果此时还有充当从数据库的Mongo存在,就会还有一些slaves的信息,由于我们这里并不是主从同步,所以不存在这些集合。

再来看看oplog结构:


"ts" : Timestamp(6417682881216249, 1), 时间戳
"h" : NumberLong(0), 长度
"v" : 2, 
"op" : "n", 操作类型
"ns" : "", 操作的库和集合
"o2" : "_id" update条件
"o" : {} 操作值,即document
ログイン後にコピー

这里需要知道op的几种属性:


insert,'i'
update, 'u'
remove(delete), 'd'
cmd, 'c'
noop, 'n' 空操作
ログイン後にコピー

从上面的信息可以看出,我们只要不断读取到ts来做对比,然后根据op即可判断当前出现的是什么操作,相当于使用程序实现了一个从数据库的接收端。

三、CODE

在Github上找到了别人的实现方式,不过它的函数库太老旧,所以在他的基础上进行修改。

Github地址:github.com/RedBeard0531/mongo-oplog-watcher

mongo_oplog_watcher.py如下:

#!/usr/bin/python
import pymongo
import re
import time
from pprint import pprint # pretty printer
from pymongo.errors import AutoReconnect

class OplogWatcher(object):
  def init(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True):
    if collection is not None:
      if db is None:
        raise ValueError('must specify db if you specify a collection')
      self._ns_filter = db + '.' + collection
    elif db is not None:
      self._ns_filter = re.compile(r'^%s\.' % db)
    else:
      self._ns_filter = None

    self.poll_time = poll_time
    self.connection = connection or pymongo.Connection()

    if start_now:
      self.start()

  @staticmethod
  def get_id(op):
    id = None
    o2 = op.get('o2')
    if o2 is not None:
      id = o2.get('_id')

    if id is None:
      id = op['o'].get('_id')

    return id

  def start(self):
    oplog = self.connection.local['oplog.$main']
    ts = oplog.find().sort('$natural', -1)[0]['ts']
    while True:
      if self._ns_filter is None: 
        filter = {}
      else:
        filter = {'ns': self._ns_filter}
      filter['ts'] = {'$gt': ts}
      try:
        cursor = oplog.find(filter, tailable=True)
        while True:
          for op in cursor:
            ts = op['ts']
            id = self.get_id(op)
            self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op)
          time.sleep(self.poll_time)
          if not cursor.alive:
            break
      except AutoReconnect:
        time.sleep(self.poll_time)

  def all_with_noop(self, ns, ts, op, id, raw):
    if op == 'n':
      self.noop(ts=ts)
    else:
      self.all(ns=ns, ts=ts, op=op, id=id, raw=raw)

  def all(self, ns, ts, op, id, raw):
    if op == 'i':
      self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw)
    elif op == 'u':
      self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw)
    elif op == 'd':
      self.delete(ns=ns, ts=ts, id=id, raw=raw)
    elif op == 'c':
      self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw)
    elif op == 'db':
      self.db_declare(ns=ns, ts=ts, raw=raw)

  def noop(self, ts):
    pass

  def insert(self, ns, ts, id, obj, raw, **kw):
    pass

  def update(self, ns, ts, id, mod, raw, **kw):
    pass

  def delete(self, ns, ts, id, raw, **kw):
    pass

  def command(self, ns, ts, cmd, raw, **kw):
    pass

  def db_declare(self, ns, ts, **kw):
    pass

class OplogPrinter(OplogWatcher):
  def all(self, **kw):
    pprint (kw)
    print #newline

if name == 'main':
  OplogPrinter()
ログイン後にコピー

首先是实现一个数据库的初始化,设定一个延迟时间(准实时):


self.poll_time = poll_time
self.connection = connection or pymongo.MongoClient()
ログイン後にコピー

主要的函数是start()


oplog 構造を見てみましょう:

ここで、op のいくつかのプロパティを知る必要があります: 🎜🎜🎜🎜🎜rrreee🎜 上記の情報からわかるように、必要なのはts の読み取りを継続する 実行する 比較し、op を使用して、現在どのような操作が行われているかを判断します。これは、プログラムを使用してデータベースからの受信側を実装するのと同じです。 🎜🎜🎜🎜3. CODE🎜🎜🎜🎜🎜 Githubで他の人の実装を見つけましたが、関数ライブラリが古すぎるので、それをベースに修正しました。 🎜🎜Github アドレス: github.com/RedBeard0531/mongo-oplog-watcher🎜🎜🎜mongo_oplog_watcher.py は次のとおりです: 🎜🎜rrreee🎜 1 つ目は、データベースを初期化し、遅延時間 (準リアルタイム) を設定することです: 🎜 🎜🎜🎜 rrreee🎜 メイン関数は start() で、時間比較を実装し、対応するフィールドを処理します: 🎜🎜🎜🎜rrreee🎜 この start 関数をループし、対応する監視をここに記述できます。 all_with_noop 処理ロジック内。 🎜🎜このようにして、簡単な準リアルタイム Mongo🎜データベース操作🎜モニターを実装できます。次のステップでは、新しく入力されたプログラムを他の操作と組み合わせて処理できます。 🎜

以上がMongoDB の oplog メカニズムを使用したデータ監視の例を共有するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ソース:php.cn
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
最新の問題
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート