Home > System Tutorial > LINUX > Real-time extraction and log-based data synchronization consistency

Real-time extraction and log-based data synchronization consistency

王林
Release: 2024-01-16 14:36:05
forward
720 people have browsed it

Author: Wang Dong

Architect of Yixin Technology R&D Center

  • Currently working at CreditEase Technology R&D Center as an architect, responsible for stream computing and big data business product solutions.
  • Formerly worked as a senior engineer at the China R&D Center of Naver China (the largest search engine company in South Korea). He has been engaged in CUBRID distributed database cluster development and CUBRID database engine development for many years http://www.cubrid.org/blog/news/cubrid- cluster-introduction/

Theme introduction:

  1. Background introduction to DWS
  2. dbus wormhole overall architecture and technical implementation plan
  3. Practical application cases of DWS
Preface

Hello everyone, I am Wang Dong, from CreditEase Technology R&D Center. This is my first time sharing in the community. If there are any shortcomings, please correct me and forgive me.

The theme of this sharing is "Implementation and Application of Log-Based DWS Platform", mainly to share some of the things we are currently doing at CreditEase. This topic contains the results of the efforts of many brothers and sisters of the two teams (the results of our team and the Shanwei team). This time I will write it on my behalf and try my best to introduce it to you.

In fact, the entire implementation is relatively simple in principle, and of course involves a lot of technology. I will try to express it in the simplest way possible to let everyone understand the principle and significance of this matter. During the process, if you have any questions, you can ask them at any time and I will try my best to answer them.

DWS is an abbreviation and consists of 3 sub-projects, which I will explain later.

1. Background

The thing starts with the company’s needs some time ago. Everyone knows that CreditEase is an Internet financial company. Many of our data are different from standard Internet companies. Generally speaking, they are:

Real-time extraction and log-based data synchronization consistency

Everyone who plays with data knows that data is very valuable, and these data are stored in the databases of various systems. How can users who need data get consistent and real-time data?

There are several common practices in the past:
  1. DBA opens the backup database of each system. During low business peak periods (such as at night), users can extract the required data. Due to different extraction times, data inconsistencies among various data users, data conflicts, and repeated extractions, I believe many DBAs have a headache.
  2. The company's unified big data platform uses Sqoop to uniformly extract data from various systems during low business periods, save it into Hive tables, and then provide data services to other data users. This approach solves the consistency problem, but the timeliness is poor, basically the timeliness of T 1.
  3. The main problem of obtaining incremental changes based on triggers is that the business side is highly intrusive, and triggers also cause performance losses.

None of these plans are perfect. After understanding and considering different implementation methods, we finally drew on the ideas of linkedin and believed that in order to solve data consistency and real-time performance at the same time, a more reasonable method should come from log.

Real-time extraction and log-based data synchronization consistency

(This picture comes from: https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad- idea/)

Use incremental Log as the basis of all systems. Subsequent data users consume logs by subscribing to kafka.

for example:

  • Users of big data can save data to Hive tables or Parquet files for Hive or Spark query;
  • Users who provide search services can be saved in Elasticsearch or HBase;
  • Users who provide caching services can cache logs in Redis or alluxio;
  • Users of data synchronization can save data to their own database;
  • Since kafka's logs can be consumed repeatedly and cached for a period of time, each user can maintain consistency with the database and ensure real-time performance by consuming kafka's logs;

Why use log and kafka as the basis instead of using Sqoop for extraction? because:

Real-time extraction and log-based data synchronization consistency

Why not use dual write? , please refer to https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

I won’t explain much here.

2. Overall architecture

So we came up with the idea of ​​building a log-based company-level platform.

Let’s explain the DWS platform. The DWS platform consists of three sub-projects:

  1. Dbus (data bus): Responsible for extracting data from the source in real time, converting it into the agreed json format data (UMS data) with its own schema, and putting it into kafka;
  2. Wormhole (data exchange platform): Responsible for reading data from kafka and writing data to the target;
  3. Swifts (real-time computing platform): is responsible for reading data from kafka, calculating it in real time, and writing the data back to kafka.

Real-time extraction and log-based data synchronization consistency

In the picture:

  • Log extractor and dbus work together to complete data extraction and data conversion. The extraction includes full and incremental extraction.
  • Wormhole can save all log data to HDFS; it can also implement data to all databases that support jdbc, including HBash, Elasticsearch, Cassandra, etc.;
  • Swifts supports streaming calculations through configuration and SQL, including supporting streaming join, look up, filter, window aggregation and other functions;
  • Dbus web is the configuration management end of dbus. In addition to configuration management, rider also includes runtime management of Wormhole and Swifts, data quality verification, etc.

Due to time constraints, today I will mainly introduce Dbus and Wormhole in DWS, and also introduce Swifts when needed.

3. dbus solution
Log analysis

As mentioned before, Dbus mainly solves the problem of extracting logs from the source in real time. Here we take MySQL as an example to briefly explain how to implement it.

We know that although MySQL InnoDB has its own log, MySQL primary and secondary synchronization is achieved through binlog. As shown below:

Real-time extraction and log-based data synchronization consistency

Pictures from: https://github.com/alibaba/canal

The binlog has three modes:

  1. Row mode: The modified form of each row of data will be recorded in the log, and then the same data will be modified on the slave side.
  2. Statement mode: Every sql that modifies data will be recorded in the master's bin-log. When the slave replicates, the SQL process will parse it into the same SQL that was executed on the original master side and execute it again.
  3. Mixed mode: MySQL will distinguish the log form to be recorded based on each specific SQL statement executed, that is, choose one between Statement and Row.

Their respective advantages and disadvantages are as follows:

Real-time extraction and log-based data synchronization consistency

Here comes from: http://www.jquerycn.cn/a_13625

Due to the shortcomings of the statement mode, we learned during the communication with our DBA that the row mode is used for replication in the actual production process. This makes it possible to read the entire log.

Usually our MySQL layout uses a solution of 2 master databases (vip), 1 slave database and 1 backup disaster recovery database. Since the disaster recovery database is usually used for remote disaster recovery, the real-time performance is not high. Not easy to deploy.

In order to minimize the impact on the source end, obviously we should read the binlog log from the slave library.

There are many solutions for reading binlog, and there are many on github. Please refer to https://github.com/search?utf8=✓&q=binlog. In the end, we chose Alibaba's canal as the log extraction method.

Canal was first used to synchronize Alibaba’s Chinese and American computer rooms. The principle of canal is relatively simple:

  1. Canal simulates the interaction protocol of MySQL Slave, disguises itself as MySQL Slave, and sends the dump protocol to MySQL Slave
  2. MySQL master receives the dump request and starts pushing binary log to Slave (that is, canal)
  3. Canal parses the binary log object (originally a byte stream)

Real-time extraction and log-based data synchronization consistency

Pictures from: https://github.com/alibaba/canal

solution

The main solutions for the MySQL version of Dbus are as follows:

Real-time extraction and log-based data synchronization consistency

For the incremental log, by subscribing to Canal Server, we get the incremental log of MySQL:

  • According to the output of Canal, the log is in protobuf format. Develop an incremental Storm program to convert the data into the UMS format we defined in real time (json format, I will introduce it later), and save it to kafka;
  • The incremental Storm program is also responsible for capturing schema changes to control the version number;
  • Incremental Storm configuration information is stored in Zookeeper to meet high availability requirements.
  • Kafka serves both as output results and as a buffer and message deconstruction area during processing.

When considering using Storm as a solution, we mainly believe that Storm has the following advantages:

  • The technology is relatively mature and stable, and it can be considered a standard combination when paired with kafka;
  • The real-time performance is relatively high and can meet the real-time requirements;
  • Meet high availability requirements;
  • By configuring Storm concurrency, you can activate the ability to expand performance;
Extract the full amount

For flow tables, the incremental part is enough, but many tables need to know the initial (existing) information. At this time we need initial load (first load).

For the initial load (first load), a full extraction Storm program was also developed to pull from the standby database of the source database through jdbc connection. The initial load is to pull all the data, so we recommend doing it during low business peak periods. Fortunately, you only do it once and you don’t need to do it every day.

Full extraction, we draw on the ideas of Sqoop. The full extraction of Storm is divided into 2 parts:

  1. Data Sharding
  2. Actual extraction

Data sharding needs to consider the sharding column, split the data according to the range according to the configuration and automatically select the column, and save the sharding information to kafka.

Real-time extraction and log-based data synchronization consistency

The following is the specific sharding strategy:

Real-time extraction and log-based data synchronization consistency

The Storm program for full extraction reads the sharding information of Kafka and uses multiple concurrency levels to connect to the standby database in parallel for pulling. Because the extraction time may be very long. During the extraction process, the real-time status is written to Zookeeper to facilitate heartbeat program monitoring.

Real-time extraction and log-based data synchronization consistency

Unified Messaging Format

Whether it is incremental or full, the final message output to kafka is a unified message format we agreed on, called UMS (unified message schema) format.

As shown below:

Real-time extraction and log-based data synchronization consistencyReal-time extraction and log-based data synchronization consistency

The schema part of the message defines that the namespace is composed of type, data source name, schema name, table name, version number, sub-library number, sub-table number, which can describe all tables in the entire company and can be uniquely located through a namespace.

  • _ums_op_ Indicates that the data type is I (insert), U (update), D (delete);
  • _ums_ts_ The timestamp of events where additions, deletions and modifications occur, obviously the timestamp of new data is updated;
  • _ums_id_ The unique id of the message, ensuring that the message is unique, but here we ensure the order of the messages (explained later);

Payload refers to specific data. A json package can contain one or more pieces of data to increase the payload of the data.

The data types supported in UMS refer to Hive types and are simplified, basically including all data types.

Full and incremental consistency

In the entire data transmission, in order to ensure the order of log messages as much as possible, kafka uses 1 partition. In general, it is basically sequential and unique.

But we know that writing Kafka will fail and may be rewritten. Storm also uses a redo mechanism. Therefore, we do not strictly guarantee exactly once and complete sequence, but we guarantee at least once.

So _ums_id_ becomes particularly important.

For full extraction, _ums_id_ is unique. Different id slices are taken from each degree of concurrency in zk, ensuring uniqueness and performance. Filling in negative numbers will not conflict with incremental data, and also ensure that they are early for incremental messages.

For incremental extraction, we use the MySQL log file number and log offset as the unique id. Id is used as a 64-bit long integer, the upper 7 bits are used for the log file number, and the lower 12 bits are used as the log offset.

For example: 000103000012345678. 103 is the log file number and 12345678 is the log offset.

In this way, the physical uniqueness is ensured from the log level (the ID number will not change even if it is redone), and the order is also guaranteed (the log can also be located). By comparing _ums_id_ consumption log, you can know which message is updated by comparing _ums_id_.

In fact, _ums_ts_ and _ums_id_ have similar intentions, except that sometimes _ums_ts_ may be repeated, that is, multiple operations occur in 1 millisecond, so you have to compare _ums_id_.

Heartbeat monitoring and early warning

The entire system involves various aspects such as the main and backup synchronization of the database, Canal Server, multiple concurrency Storm processes, etc.

Therefore, monitoring and early warning of the process is particularly important.

Through the heartbeat module, for example, insert a piece of mentality data into each extracted table every minute (configurable) and save the sending time. This heartbeat table is also extracted, and following the entire process, it is actually the same as the synchronized table. Following the same logic (because multiple concurrent Storms may have different branches), when a heartbeat packet is received, it can be proven that the entire link is open even if there is no added, deleted or modified data.

The Storm program and the heartbeat program send the data to the public statistical topic, and then the statistical program saves it to influxdb. Use grafana to display it, and you can see the following effect:

Real-time extraction and log-based data synchronization consistency

The picture shows the real-time monitoring information of a certain business system. The above is the real-time traffic situation, and the following is the real-time delay situation. It can be seen that the real-time performance is still very good. Basically, the data has been transferred to the terminal Kafka in 1 to 2 seconds.

Granfana provides a real-time monitoring capability.

If there is a delay, an email alarm or SMS alarm will be sent through the heartbeat module of dbus.

Real-time desensitization

Considering data security, Dbus’s full storm and incremental storm programs also complete real-time desensitization functions for scenarios where desensitization is required. There are 3 methods of desensitization:

Real-time extraction and log-based data synchronization consistency

To summarize: Simply put, Dbus exports data from various sources in real time and provides subscriptions in the form of UMS, supporting real-time desensitization, actual monitoring and alarming.

4. Wormhole solution

After talking about Dbus, it’s time to talk about Wormhole. Why are the two projects not one but connected through kafka?

One of the big reasons is decoupling. Kafka has natural decoupling capabilities, and the program can directly do asynchronous message passing through kafka. Dbus and Wornhole also use kafka internally for message passing and decoupling.

Another reason is that UMS is self-describing. By subscribing to kafka, any capable user can directly consume UMS for use.

Although the results of UMS can be subscribed directly, development work is still required. What Wormhole solves is to provide one-click configuration to implement data in Kafka into various systems, allowing data users without development capabilities to use data through Wormhole.

Real-time extraction and log-based data synchronization consistency

As shown in the figure, Wormhole can implement UMS in kafka to various systems. Currently, the most commonly used ones are HDFS, JDBC database and HBase.

On the technology stack, wormhole chooses to use spark streaming.

In Wormhole, a flow refers from a namaspace from the source to the target. One spark streaming serves multiple flows.

Real-time extraction and log-based data synchronization consistency

There are good reasons to choose Spark:

  • Spark naturally supports various heterogeneous storage systems;
  • Although Spark Stream has slightly worse latency than Storm, Spark has better throughput and better computing performance;
  • Spark has greater flexibility in supporting parallel computing;
  • Spark provides a unified function to solve Sparking Job, Spark Streaming, and Spark SQL within a technology stack to facilitate later development;

Here to add the role of Swifts:

  • The essence of Swifts is to read UMS data in kafka, perform real-time calculations, and write the results to another topic in kafka.
  • Real-time calculation can be done in many ways: such as filter, projection (projection), lookup, streaming join window aggregation, which can complete various streaming real-time calculations with business value.

The comparison between Wormhole and Swifts is as follows:

Real-time extraction and log-based data synchronization consistency

dropHDFS

Consume Kafka's UMS through the Wormhole Wpark Streaming program. First, the UMS log can be saved to HDFS.

Kafka generally only saves a few days of information and does not save all information, while HDFS can save all historical additions, deletions and modifications. This makes many things possible:

  • By replaying the logs in HDFS, we can restore historical snapshots at any time.
  • You can make a zipper table to restore the historical information of each record for easy analysis;
  • When an error occurs in the program, you can use backfill to re-consume the messages and re-form a new snapshot.

It can be said that the log in HDFS is the basis for many things.

Because Spark natively supports parquet very well, Spark SQL can provide good queries for Parquet. When UMS is implemented on HDFS, it is saved into a Parquet file. The content of Parquet is that all log addition, deletion and modification information as well as _ums_id_ and _ums_ts_ are saved.

Wormhole spark streaming distributes and stores data into different directories according to namespace, that is, different tables and versions are placed in different directories.

Real-time extraction and log-based data synchronization consistency

Since the Parquet files written each time are small files, everyone knows that HDFS does not perform well for small files, so there is another job to merge these Parquet files into large files regularly every day.

Each Parquet file directory has the start time and end time of the file data. In this way, when refilling data, you can decide which Parquet files need to be read based on the selected time range, without reading all the data.

Impotence of inserting or updating data

Often we encounter the need to process data and put it into a database or HBase. So a question involved here is, what kind of data can be updated?

The most important principle here is the idempotence of data.

Regardless of whether we encounter additions, deletions or modifications of any data, the problems we face are:

  1. Which row should be updated;
  2. What is the updated strategy.

For the first question, you actually need to find a unique key to locate the data. Common ones include:

  1. Use the primary key of the business library;
  2. The business party specifies several columns as joint unique indexes;

For the second question, it involves _ums_id_, because we have ensured that the large value of _ums_id_ is updated, so after finding the corresponding data row, we will replace and update it according to this principle.

Real-time extraction and log-based data synchronization consistency

The reason why we need to soft delete and add the _is_active_ column is for such a situation:

If the inserted _ums_id_ is relatively large, it is deleted data (indicating that the data has been deleted). If it is not a soft deletion, inserting a small _ums_id_ data (old data) at this time will really Insert it.

This causes old data to be inserted. No longer idempotent. Therefore, it is valuable that the deleted data is still retained (soft deletion), and it can be used to ensure the idempotence of the data.

HBase Saving

Inserting data into Hbase is quite simple. The difference is that HBase can retain multiple versions of data (of course, you can also retain only one version). The default is to retain 3 versions;

So when inserting data into HBase, the problem that needs to be solved is:

  1. Choose the appropriate rowkey: The design of Rowkey is optional. The user can select the primary key of the source table, or select several columns as joint primary keys.
  2. Choose the appropriate version: use _ums_id_ a larger offset (such as 10 billion) as the row version.

The choice of Version is very interesting. It takes advantage of the uniqueness and auto-increment of _ums_id_, which is consistent with the comparison relationship of version itself: that is, a larger version is equivalent to a larger _ums_id_, and the corresponding version is newer.

From the perspective of improving performance, we can directly insert the entire Spark Streaming Dataset collection into HBase without comparison. Let HBase automatically determine for us which data can be retained and which data does not need to be retained based on version.

Jdbc insert data:

Insert data into the database. Although the principle of ensuring idempotence is simple, if you want to improve performance, the implementation becomes much more complicated. You cannot compare one by one and then insert or update.

We know that Spark's RDD/dataset is operated in a collection manner to improve performance. Similarly, we need to achieve idempotence in a collection operation manner.

The specific idea is:

  1. First query the target database according to the primary key in the set to obtain an existing data set;
  2. Compared with the collections in the dataset, they are divided into two categories:

A: Data that does not exist, that is, this part of the data can be inserted;

B: Compare the existing data with _ums_id_, and finally only update the larger rows of _ums_id_ to the target database, and discard the smaller rows directly.

Students who use Spark know that RDD/dataset can be partitioned, and multiple workers can be used and operated to improve efficiency.

When considering concurrency, both insertion and update may fail, so there are also strategies to consider after failure.

For example: Because other workers have already inserted, and the unique constraint insertion fails, you need to update it instead, and compare _ums_id_ to see if it can be updated.

Wormhole also has a retry mechanism for other situations where it cannot be inserted (such as a problem with the target system). There are so many details. Not much introduction here.

Some are still under development.

I won’t go into details about inserting into other storages. The general principle is: design a collection-based, concurrent data insertion implementation based on the characteristics of each storage. These are Wormhole's efforts for performance, and users who use Wormhole don't have to worry about them.

5. Application cases
Real-time Marketing

Having said so much, what are the practical applications of DWS? Next, I will introduce the real-time marketing implemented by a certain system using DWS.

Real-time extraction and log-based data synchronization consistency

As shown in FIG:

The data of System A is saved in its own database. We know that CreditEase provides many financial services, including borrowing, and a very important thing in the borrowing process is credit review.

Borrowers need to provide information that proves their credit worthiness, such as the central bank credit report, which is the data with the strongest credit data. Bank transactions and online shopping transactions are also data with strong credit attributes.

When a borrower fills in credit information in System A through the Web or mobile APP, he may not be able to continue for some reason. Although this borrower may be a high-quality potential customer, this information was previously unavailable or took a long time to be known, so In fact, such customers are lost.

After applying DWS, the information filled in by the borrower has been recorded in the database, and is extracted, calculated and implemented in the target database in real time through DWS. Evaluate high-quality customers based on customer ratings. Then immediately output the customer's information to the customer service system.

The customer service staff contacted the borrower (potential customer) by calling in a very short time (within a few minutes), provided customer care, and converted the potential customer into a real customer. We know that borrowing is time-sensitive and will be of no value if it takes too long.

Without the ability to extract/compute/drop in real-time, none of this would be possible.

Real-time reporting system

Another real-time report application is as follows:

Real-time extraction and log-based data synchronization consistency

The data of our data users comes from multiple systems. In the past, we obtained the report information through T 1 and then guided the operations the next day, which was very timely.

Through DWS, data can be extracted, calculated and implemented in real time from multiple systems, and reports can be displayed, allowing operations to make timely deployment and adjustments and respond quickly.

6. Summary

Having said so much, let me summarize it roughly:

  • DWS technology is based on the mainstream real-time streaming big data technology framework, with high availability, large throughput, strong horizontal expansion, low latency, high fault tolerance and ultimate consistency.
  • DWS capabilities support heterogeneous multi-source and multi-target systems, multiple data formats (structured semi-structured unstructured data) and real-time technical capabilities.
  • DWS combines three sub-projects and launches them as one platform, which enables us to have real-time capabilities and drive various real-time scenario applications.

Suitable scenarios include: Real-time synchronization/real-time calculation/real-time monitoring/real-time reporting/real-time analysis/real-time insight/real-time management/real-time operation/real-time decision-making

Thank you everyone for listening, this sharing ends here.

Q&A

Q1: Is there an open source solution for Oracle log reader?

A1:There are also many commercial solutions for the Oracle industry, such as: Oracle GoldenGate (original goldengate), Oracle Xstream, IBM InfoSphere Change Data Capture (original DataMirror), Dell SharePlex (original Quest ), domestic DSG superSync, etc., there are very few open source solutions that are easy to use.

Q2: How much manpower and material resources have been invested in this project? It feels a bit complicated.

Q2: DWS is composed of three sub-projects, with an average of 5 to 7 people per project. It's a bit complicated, but it's actually an attempt to use big data technology to solve the difficulties our company is currently encountering.

Because we are engaged in big data related technologies, all the brothers and sisters in the team are quite happy:)

In fact, Dbus and Wormhole are relatively fixed and easy to reuse. Swifts real-time computing is related to each business, has strong customization, and is relatively troublesome.

Q3: Will Yixin’s DWS system be open source?

A3: We have also considered contributing to the community. Just like other open source projects of CreditEase, the project has just taken shape and needs to be further refined. I believe that at some point in the future, we will contribute to the community. It's open source.

Q4: How do you understand an architect? Is he a system engineer?

A4: is not a system engineer. We have multiple architects at CreditEase. They should be considered technical managers who drive business with technology. Including product design, technical management, etc.

Q5:Is the replication scheme OGG?

A5: OGG and other commercial solutions mentioned above are options.

Source of the article: DBAplus community (dbaplus)

The above is the detailed content of Real-time extraction and log-based data synchronization consistency. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:linuxprobe.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template