分享用MongoDB中oplog机制实现数据监控实例
MongoDB 的Replication是通过一个日志来存储写操作的,这个日志就叫做oplog,而下面这篇文章主要给大家介绍了利用MongoDB中oplog机制实现准实时数据的操作监控的相关资料,需要的朋友可以参考借鉴,下面来一起看看吧。
前言
最近有一个需求是要实时获取到新插入到MongoDB的数据,而插入程序本身已经有一套处理逻辑,所以不方便直接在插入程序里写相关程序,传统的数据库大多自带这种触发器机制,但是Mongo没有相关的函数可以用(也可能我了解的太少了,求纠正),当然还有一点是需要python实现,于是收集整理了一个相应的实现方法。
一、引子
首先可以想到,这种需求其实很像数据库的主从备份机制,从数据库之所以能够同步主库是因为存在某些指标来做控制,我们知道MongoDB虽然没有现成触发器,但是它能够实现主从备份,所以我们就从它的主从备份机制入手。
二、OPLOG
首先,需要以master模式来打开mongod守护,命令行使用–master,或者配置文件增加master键为true。
此时,我们可以在Mongo的系统库local里见到新增的collection——oplog,此时oplog.$main
里就会存储进oplog信息,如果此时还有充当从数据库的Mongo存在,就会还有一些slaves的信息,由于我们这里并不是主从同步,所以不存在这些集合。
再来看看oplog结构:
"ts" : Timestamp(6417682881216249, 1), 时间戳 "h" : NumberLong(0), 长度 "v" : 2, "op" : "n", 操作类型 "ns" : "", 操作的库和集合 "o2" : "_id" update条件 "o" : {} 操作值,即document
这里需要知道op的几种属性:
insert,'i' update, 'u' remove(delete), 'd' cmd, 'c' noop, 'n' 空操作
从上面的信息可以看出,我们只要不断读取到ts来做对比,然后根据op即可判断当前出现的是什么操作,相当于使用程序实现了一个从数据库的接收端。
三、CODE
在Github上找到了别人的实现方式,不过它的函数库太老旧,所以在他的基础上进行修改。
Github地址:github.com/RedBeard0531/mongo-oplog-watcher
mongo_oplog_watcher.py如下:
#!/usr/bin/python import pymongo import re import time from pprint import pprint # pretty printer from pymongo.errors import AutoReconnect class OplogWatcher(object): def init(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True): if collection is not None: if db is None: raise ValueError('must specify db if you specify a collection') self._ns_filter = db + '.' + collection elif db is not None: self._ns_filter = re.compile(r'^%s\.' % db) else: self._ns_filter = None self.poll_time = poll_time self.connection = connection or pymongo.Connection() if start_now: self.start() @staticmethod def get_id(op): id = None o2 = op.get('o2') if o2 is not None: id = o2.get('_id') if id is None: id = op['o'].get('_id') return id def start(self): oplog = self.connection.local['oplog.$main'] ts = oplog.find().sort('$natural', -1)[0]['ts'] while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find(filter, tailable=True) while True: for op in cursor: ts = op['ts'] id = self.get_id(op) self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time) def all_with_noop(self, ns, ts, op, id, raw): if op == 'n': self.noop(ts=ts) else: self.all(ns=ns, ts=ts, op=op, id=id, raw=raw) def all(self, ns, ts, op, id, raw): if op == 'i': self.insert(ns=ns, ts=ts, id=id, obj=raw['o'], raw=raw) elif op == 'u': self.update(ns=ns, ts=ts, id=id, mod=raw['o'], raw=raw) elif op == 'd': self.delete(ns=ns, ts=ts, id=id, raw=raw) elif op == 'c': self.command(ns=ns, ts=ts, cmd=raw['o'], raw=raw) elif op == 'db': self.db_declare(ns=ns, ts=ts, raw=raw) def noop(self, ts): pass def insert(self, ns, ts, id, obj, raw, **kw): pass def update(self, ns, ts, id, mod, raw, **kw): pass def delete(self, ns, ts, id, raw, **kw): pass def command(self, ns, ts, cmd, raw, **kw): pass def db_declare(self, ns, ts, **kw): pass class OplogPrinter(OplogWatcher): def all(self, **kw): pprint (kw) print #newline if name == 'main': OplogPrinter()
首先是实现一个数据库的初始化,设定一个延迟时间(准实时):
self.poll_time = poll_time self.connection = connection or pymongo.MongoClient()
主要的函数是start()
,实现一个时间的比对并进行相应字段的处理:
def start(self): oplog = self.connection.local['oplog.$main'] #读取之前提到的库 ts = oplog.find().sort('$natural', -1)[0]['ts'] #获取一个时间边际 while True: if self._ns_filter is None: filter = {} else: filter = {'ns': self._ns_filter} filter['ts'] = {'$gt': ts} try: cursor = oplog.find(filter) #对此时间之后的进行处理 while True: for op in cursor: ts = op['ts'] id = self.get_id(op) self.all_with_noop(ns=op['ns'], ts=ts, op=op['op'], id=id, raw=op) #可以指定处理插入监控,更新监控或者删除监控等 time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time)
循环这个start函数,在all_with_noop这里就可以编写相应的监控处理逻辑。
这样就可以实现一个简易的准实时Mongo数据库操作监控器,下一步就可以配合其他操作来对新入库的程序进行相应处理。
以上是分享用MongoDB中oplog机制实现数据监控实例的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

解决 Navicat 过期问题的方法包括:续订许可证;卸载并重新安装;禁用自动更新;使用 Navicat Premium Essentials 免费版;联系 Navicat 客户支持。

要使用 Navicat 连接 MongoDB,您需要:安装 Navicat创建 MongoDB 连接:a. 输入连接名称、主机地址和端口b. 输入认证信息(如果需要)添加 SSL 证书(如果需要)验证连接保存连接

.NET 4.0 用于创建各种应用程序,它为应用程序开发人员提供了丰富的功能,包括:面向对象编程、灵活性、强大的架构、云计算集成、性能优化、广泛的库、安全性、可扩展性、数据访问和移动开发支持。

在无服务器架构中,Java函数可以与数据库集成,以访问和操作数据库中的数据。关键步骤包括:创建Java函数、配置环境变量、部署函数和测试函数。通过遵循这些步骤,开发人员可以构建复杂的应用程序,无缝访问存储在数据库中的数据。

本文介绍如何在Debian系统上构建高可用性的MongoDB数据库。我们将探讨多种方法,确保数据安全和服务持续运行。关键策略:副本集(ReplicaSet):利用副本集实现数据冗余和自动故障转移。当主节点出现故障时,副本集会自动选举新的主节点,保证服务的持续可用性。数据备份与恢复:定期使用mongodump命令进行数据库备份,并制定有效的恢复策略,以应对数据丢失风险。监控与报警:部署监控工具(如Prometheus、Grafana)实时监控MongoDB的运行状态,并

本文介绍如何在Debian系统上配置MongoDB实现自动扩容,主要步骤包括MongoDB副本集的设置和磁盘空间监控。一、MongoDB安装首先,确保已在Debian系统上安装MongoDB。使用以下命令安装:sudoaptupdatesudoaptinstall-ymongodb-org二、配置MongoDB副本集MongoDB副本集确保高可用性和数据冗余,是实现自动扩容的基础。启动MongoDB服务:sudosystemctlstartmongodsudosys

连接到数据库,Node.js 提供了 MySQL、PostgreSQL、MongoDB 和 Redis 等多种数据库连接器包。连接步骤包括:1. 安装相应的连接器包;2. 创建连接池维护可重用连接;3. 建立与数据库的连接。注意:操作为异步,需处理错误,保证安全性,优化性能。

是的,Navicat 可以连接到 MongoDB 数据库。具体步骤包括:打开 Navicat 并创建新的连接。选择数据库类型为 MongoDB。输入 MongoDB 主机地址、端口和数据库名称。输入 MongoDB 用户名和密码(如果需要)。单击“连接”按钮。
