MongoDB のレプリケーションは、ログを通じて書き込み操作を保存します。このログは、oplog と呼ばれます。次の記事では、MongoDB の oplog メカニズムを使用して、それを必要とする人向けの関連情報を主に紹介します。参考に、以下を見てみましょう。
はじめに
最近では、MongoDBに新規に挿入されたデータをリアルタイムに取得する必要があり、挿入プログラム自体がすでに処理ロジックを一式持っているため、挿入プログラムに直接関連するプログラムを記述するのは不便です。従来のデータベースにはこの種の トリガー メカニズムが付属していますが、Mongo には使用できる関連機能がありません (もちろん、私があまりにも知識が少ないかもしれません。修正してください)。 Python なので、対応する実装メソッドを集めてコンパイルしました。
1. はじめに
まず、この要件は、実際には、スレーブ データベースがマスター データベースを同期できる理由と非常によく似ています。それは、制御のための特定の指標があるためです。MongoDB には既製のトリガーはありませんが、マスター/スレーブ バックアップを実現できるため、そのマスター/スレーブ バックアップ メカニズムから始めます。
2. OPLOG
まず、コマンドラインで –master を使用するか、設定ファイルでマスターキーを true に追加する必要があります。
この時点で、Mongo のシステム ライブラリ ローカルに新しいコレクション - oplog が表示されます。この時点で、If Mongo がまだ存在する場合、oplog 情報は oplog.$main
に保存されます。ここではマスター/スレーブ同期ではないため、これらのセットは存在しません。 oplog.$main
里就会存储进oplog信息,如果此时还有充当从数据库的Mongo存在,就会还有一些slaves的信息,由于我们这里并不是主从同步,所以不存在这些集合。
再来看看oplog结构:
"ts" : Timestamp(6417682881216249, 1), 时间戳 "h" : NumberLong(0), 长度 "v" : 2, "op" : "n", 操作类型 "ns" : "", 操作的库和集合 "o2" : "_id" update条件 "o" : {} 操作值,即document
这里需要知道op的几种属性:
insert,'i' update, 'u' remove(delete), 'd' cmd, 'c' noop, 'n' 空操作
从上面的信息可以看出,我们只要不断读取到ts来做对比,然后根据op即可判断当前出现的是什么操作,相当于使用程序实现了一个从数据库的接收端。
三、CODE
在Github上找到了别人的实现方式,不过它的函数库太老旧,所以在他的基础上进行修改。
Github地址:github.com/RedBeard0531/mongo-oplog-watcher
mongo_oplog_watcher.py如下:
#!/usr/bin/python import pymongo import re import time from pprint import pprint # pretty printer from pymongo.errors import AutoReconnect class OplogWatcher(object): def init(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True): if collection is not None: if db is None: raise ValueError('must specify db if you specify a collection') self._ns_filter = db + '.' + collection elif db is not None: self._ns_filter = re.compile(r'^%s\.' % db) else: self._ns_filter = None self.poll_time = poll_time self.connection = connection or pymongo.Connection() if start_now: self.start() @staticmethod def get_id(op): id = None o2 = op.get('o2') if o2 is not None: id = o2.get('_id') if id is None: id = op['o'].get('_id') return id def start(self): oplog = self.connection.local['oplog.$main'] ts = oplog.find().sort('$natural', -1)[0]['ts'] while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find(filter, tailable=True) while True: for op in cursor: ts = op['ts'] id = self.get_id(op) self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time) def all_with_noop(self, ns, ts, op, id, raw): if op == 'n': self.noop(ts=ts) else: self.all(ns=ns, ts=ts, op=op, id=id, raw=raw) def all(self, ns, ts, op, id, raw): if op == 'i': self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw) elif op == 'u': self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw) elif op == 'd': self.delete(ns=ns, ts=ts, id=id, raw=raw) elif op == 'c': self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw) elif op == 'db': self.db_declare(ns=ns, ts=ts, raw=raw) def noop(self, ts): pass def insert(self, ns, ts, id, obj, raw, **kw): pass def update(self, ns, ts, id, mod, raw, **kw): pass def delete(self, ns, ts, id, raw, **kw): pass def command(self, ns, ts, cmd, raw, **kw): pass def db_declare(self, ns, ts, **kw): pass class OplogPrinter(OplogWatcher): def all(self, **kw): pprint (kw) print #newline if name == 'main': OplogPrinter()
首先是实现一个数据库的初始化,设定一个延迟时间(准实时):
self.poll_time = poll_time self.connection = connection or pymongo.MongoClient()
主要的函数是start()
oplog 構造を見てみましょう: