Blogger Information
Blog 85
fans 0
comment 0
visits 72824
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
用MongoDB Change Streams 在BigQuery中复制数据
Smile
Original
441 people have browsed it

译者注:Chang Stream(变更记录流) 是指数据库集合的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更。BigQuery是Google推出的一项Web服务,该服务让开发者可以使用Google的架构来运行SQL语句对超级大的数据库进行操作。


当我们为BigQuery数据管道使用MongoDB变更流构建一个MongoDB时面临的挑战和学到的东西
作者 David Gasquez


在讲技术细节之前,我们最好回顾一下为什么要建立这个管道。主要有以下两个原因:


1. 在一定的规模上为了分析而查询MongoDB是低效的。
2. 我们没有把所有数据放在MongoDB中(例如分条计费信息)。
3. 在一定的规模上,作为服务供应商的数据管道价格昂贵。通常,不会提供类似软删除(例如,使用一个deleted_at字段)这样的复制删除记录的方法。


复制无模式数据


使用MongoDB数据库是我们要注意的第一件事情就是一些集合有一个诡异的模式。文档套文档,而且其中一些文档也是数组。


通常,一个嵌套文档代表一个一对一关系,一个数组是一对多关系。幸运的是Big Query同时支持重复的和嵌套的字段。


根据我们的研究,最常用的复制MongoDB数据的方法是在集合中使用一个时间戳字段。该字段的典型名称是updated_at,在每个记录插入和更新是该字段就会更新。使用批处理的方法是很容易实现这种方式的,只需要查询预期的数据库即可。当将这种方法运用到我们的数据和集合,我们发现两个主要的问题:


1. 并非所有我们想要复制的集合都有这个字段。没有updated_at字段,我们如何知道要复制那些更新的记录呢?


2. 这种方法不会跟踪已删除记录。我们只是把他们从原始集合中移除了,但永远不会在Big Query表中进行更新。


幸运的是,MongoDB把对集合产生的所有的变化都记录在oplog的(oplog是local库下的一个固定集合)日志里面。MongoDB 3.6版本以来,你可以使用变更流API来查询日志。这样,我们就会在集合中发生每个变化(包括删除操作)时得到警示。


那么我们的目的就是构建一个管道,该管道可以移动所有变更事件记录,这些记录来自一个Big Query表,MongoDB使用每个记录最近的状态把流变更为这张表。


构建管道


我们的第一个方法是在Big Query中为每个集合创建一个变更流,该集合是我们想要复制的,并从那个集合的所有变更流事件中获取方案。这种办法很巧妙。如果在一个记录中添加一个新的字段,买二手手机号管道应该足够智能,以便在插入记录时修改Big Query表。


由于想要尽可能的在Big Query中获取数据,我们用了另外一个方法。把所有的变更流事件以JSON块的形式放在BigQuery中。我们可以使用dbt这样的把原始的JSON数据工具解析、存储和转换到一个合适的SQL表中。这当然有一些缺点,但可以让我们拥有一个真正及时的端到端管道。


管道有以下部件:


1. 一个运行在Kubernetes(是一个开源的,用于管理云平台中多个主机上的容器化的应用)(carden,一款开发人员工具)的服务,他可以读取每个集合的MongoDB变更流,并将其放在一个简单的Big Query表当中(添加所有的记录)。


2. 一个读取带有增量原始数据的源表并实现在一个新表中查询的dbt(是一个命令行工具,只需编写select语句即可转换仓库中的数据)


cronjob(顾名思义,是一种能够在固定时间运行的Job对象)。这个表中包含了每一行自上一次运行以来的所有状态。这是一个dbt SQL在生产环境下如何操作的例子。


通过这两个步骤,我们实时拥有了从MongoDB到Big Query的数据流。我们也可以跟踪删除以及所有发生在我们正在复制的表上的变化(这对一些需要一段时间内的变化信息的分析是很有用的)。


由于在MongoDB变更流爬行服务日期之前我们没有任何数据,所以我们错失了很多记录。为了解决这一问题,我们决定通过创建伪变化事件回填数据。我们备份了MongoDB集合,并制作了一个简单的脚本以插入用于包裹的文档。这些记录送入到同样的BigQuery表中。现在,运行同样的dbt模型给了我们带有所有回填记录的最终表。


我们发现最主要的问题是需要用SQL写所有的提取操作。这意味着大量额外的SQL代码和一些额外的处理。当时使用dbt处理不难。另外一个小问题是BigQuery并不天生支持提取一个以JSON编码的数组中的所有元素。


结论


对于我们来说付出的代价(迭代时间,轻松的变化,简单的管道)是物超所值的。因为我们一开始使用这个管道就发现它对端到端以及快速迭代的所有工作都非常有用!我们用只具有BigQuery增加功能的变更流表作为分隔。未来我们计划迁移到Apache Beam(是一个统一的编程框架,支持批处理和流处理,并可以将用Beam编程模型构造出来的程序,在多个计算引擎如Apache Apex, Apache Flink, Apache Spark, Google Cloud Dataflow等上运行。)和云数据流上面,但那些工作要再写文字说明了。


希望你发现这些有趣的见解!你可以在Twitter上找到作者@davidgasquez.如果有任何疑问,别犹豫联系他吧。


Statement of this Website
The copyright of this blog article belongs to the blogger. Please specify the address when reprinting! If there is any infringement or violation of the law, please contact admin@php.cn Report processing!
All comments Speak rationally on civilized internet, please comply with News Comment Service Agreement
0 comments
Author's latest blog post