Author: Wang Dong
Architect of Yixin Technology R&D Center
Theme introduction:
Hello everyone, I am Wang Dong from the CreditEase Technology R&D Center. This is my first time sharing in this community, so please bear with any shortcomings and feel free to point them out.
The theme of this sharing is "Implementation and Application of a Log-Based DWS Platform", mainly covering some of the things we are currently doing at CreditEase. This topic reflects the efforts of colleagues from two teams (our team and the Shanwei team); I am writing it up on their behalf and will do my best to introduce it to you.
In fact, the overall implementation is fairly simple in principle, although it of course involves a lot of technology. I will try to express it as simply as possible so that everyone understands the principle and the significance of this work. If you have any questions during the process, feel free to ask at any time and I will do my best to answer.
DWS is an abbreviation and consists of 3 sub-projects, which I will explain later.
1. Background
It all started with a company requirement some time ago. As everyone knows, CreditEase is an Internet finance company, and much of our data differs from that of a typical Internet company. Generally speaking, it is:
Everyone who works with data knows that data is very valuable, and this data is scattered across the databases of various systems. How can users who need data get consistent, real-time data?
There have been several common approaches in the past:
None of these approaches is perfect. After studying and weighing the different implementations, we finally drew on LinkedIn's ideas and concluded that to solve data consistency and real-time delivery at the same time, a more reasonable approach should be based on logs.
(This picture comes from: https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)
We use the incremental log as the foundation of all systems; downstream data users consume the logs by subscribing to Kafka.
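To make the subscription model concrete, here is a minimal sketch (not from the original talk) of how a downstream consumer might read UMS messages from Kafka; the broker address, topic name and group id are illustrative assumptions.

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object UmsSubscriber {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka1:9092")   // illustrative broker address
    props.put("group.id", "my-ums-consumer")        // each data user has its own consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    // Subscribe to the topic carrying the incremental log of one source (topic name is illustrative)
    consumer.subscribe(Collections.singletonList("dbus.mysql.ordersdb"))

    while (true) {
      val records = consumer.poll(Duration.ofMillis(500))
      records.forEach { r =>
        // Each record value is one UMS package (schema + payload rows)
        println(s"offset=${r.offset()} ums=${r.value()}")
      }
    }
  }
}
```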
For example:
Why use the log and Kafka as the foundation instead of extracting with Sqoop? Because:
Why not use dual writes? Please refer to https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/
I won’t explain much here.
So we came up with the idea of building a log-based company-level platform.
Let’s explain the DWS platform. The DWS platform consists of three sub-projects:
In the picture:
Due to time constraints, today I will mainly introduce Dbus and Wormhole in DWS, and also introduce Swifts when needed.
As mentioned before, Dbus mainly solves the problem of extracting logs from the source in real time. Here we take MySQL as an example to briefly explain how to implement it.
We know that although MySQL's InnoDB has its own redo log, MySQL primary/secondary replication is achieved through the binlog. As shown below:
Pictures from: https://github.com/alibaba/canal
The binlog has three modes:
Their respective advantages and disadvantages are as follows:
The above comes from: http://www.jquerycn.cn/a_13625
Because of the shortcomings of statement mode, we learned from our DBAs that row mode is what is actually used for replication in production. This makes it possible to read the complete log.
Our typical MySQL layout is 2 master databases (behind a VIP), 1 slave database and 1 disaster-recovery database. Since the disaster-recovery database is usually used for remote disaster recovery, its real-time performance is poor and it is hard to deploy against.
In order to minimize the impact on the source end, obviously we should read the binlog log from the slave library.
There are many solutions for reading the binlog, with plenty available on GitHub; see https://github.com/search?utf8=✓&q=binlog. In the end, we chose Alibaba's Canal for log extraction.
Canal was originally used to synchronize data between Alibaba's data centers in China and the US. The principle of Canal is relatively simple:
Pictures from: https://github.com/alibaba/canal
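As an illustration of that principle, here is a minimal sketch of a client pulling binlog events from a Canal Server using Canal's standard Java client API; the server address, destination name and subscription filter are illustrative assumptions, not our production configuration.

```scala
import java.net.InetSocketAddress
import com.alibaba.otter.canal.client.CanalConnectors
import com.alibaba.otter.canal.protocol.CanalEntry

object CanalTail {
  def main(args: Array[String]): Unit = {
    // Connect to a Canal Server instance (address and destination are illustrative)
    val connector = CanalConnectors.newSingleConnector(
      new InetSocketAddress("canal-server", 11111), "example", "", "")
    connector.connect()
    connector.subscribe(".*\\..*")          // all schemas and tables

    while (true) {
      val message = connector.getWithoutAck(1000)   // pull up to 1000 entries
      val batchId = message.getId
      message.getEntries.forEach { entry =>
        if (entry.getEntryType == CanalEntry.EntryType.ROWDATA) {
          // Each ROWDATA entry holds the row images of an insert/update/delete
          val rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue)
          println(s"${entry.getHeader.getSchemaName}.${entry.getHeader.getTableName} " +
                  s"-> ${rowChange.getEventType}, rows=${rowChange.getRowDatasCount}")
        }
      }
      connector.ack(batchId)                // acknowledge so Canal advances its cursor
    }
  }
}
```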
Solution
The main solution for the MySQL version of Dbus is as follows:
For the incremental log, we subscribe to the Canal Server and obtain MySQL's incremental log:
When considering using Storm as a solution, we mainly believe that Storm has the following advantages:
For flow (append-only) tables, the incremental part is enough, but many tables also need their initial (existing) data. This is where the initial load (first load) comes in.
For the initial load (first load), we also developed a full-extraction Storm program that pulls from the source's standby database over a JDBC connection. Since the initial load pulls all the data, we recommend running it during off-peak hours. Fortunately, it only needs to be done once, not every day.
For full extraction, we drew on Sqoop's ideas. The Storm full extraction is divided into 2 parts:
Data sharding needs to consider the sharding column: the column is chosen automatically or according to configuration, the data is split into ranges on that column, and the sharding information is saved to Kafka.
The following is the specific sharding strategy:
The full-extraction Storm program reads the sharding information from Kafka and uses multiple concurrent workers to connect to the standby database and pull the data in parallel, since the extraction can take a long time. During extraction, real-time status is written to ZooKeeper so the heartbeat program can monitor it.
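The details of the sharding strategy were shown in a figure above. Purely as an illustration (not the actual Dbus code), the range splitting might look roughly like this, assuming a numeric sharding column whose min/max have already been read over JDBC:

```scala
// One shard: pull rows where lower <= shardCol < upper (the last shard is inclusive of max).
case class Shard(table: String, shardCol: String, lower: Long, upper: Long)

// Split [min, max] of the sharding column into `shardCount` roughly equal ranges.
def splitByRange(table: String, shardCol: String,
                 min: Long, max: Long, shardCount: Int): Seq[Shard] = {
  val span = math.max(1L, (max - min + 1) / shardCount)
  (0 until shardCount).map { i =>
    val lower = min + i * span
    val upper = if (i == shardCount - 1) max + 1 else lower + span
    Shard(table, shardCol, lower, upper)
  }
}

// Each Shard is then serialized and written to Kafka; the pulling Storm workers consume
// these messages and run, e.g.,
//   SELECT * FROM <table> WHERE <shardCol> >= ? AND <shardCol> < ?
// against the standby database in parallel.
```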
Unified Messaging Format
Whether incremental or full, the final message output to Kafka follows a unified message format that we agreed on, called UMS (unified message schema).
As shown below:
The schema part of the message defines the namespace, which is composed of type, data source name, schema name, table name, version number, sub-database number and sub-table number. It can describe every table in the company, and any table can be uniquely located through its namespace.
The payload is the actual data. One JSON package can contain one or more rows of data, to increase the payload per message.
The data types supported by UMS are based on Hive types, somewhat simplified, and cover basically all common data types.
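The original figure with the exact UMS layout is not reproduced here. The following is only an illustrative sketch of the idea: the namespace, field names other than _ums_id_ and _ums_ts_, and the payload shape are assumptions, not the real Dbus wire format.

```json
{
  "schema": {
    "namespace": "mysql.order_source.order_schema.orders.1.0.0",
    "fields": [
      {"name": "_ums_id_", "type": "long"},
      {"name": "_ums_ts_", "type": "datetime"},
      {"name": "id",       "type": "long"},
      {"name": "name",     "type": "string"}
    ]
  },
  "payload": [
    {"tuple": [103000012345678, "2017-05-01 12:00:00.000", 1, "Alice"]},
    {"tuple": [103000012345679, "2017-05-01 12:00:00.120", 2, "Bob"]}
  ]
}
```

The namespace above follows the seven-part convention described earlier (type, data source, schema, table, version, sub-database, sub-table), and each payload row carries the _ums_id_ and _ums_ts_ system columns alongside the business columns.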
Full and incremental consistency
Throughout the data transmission, in order to preserve the order of log messages as much as possible, Kafka uses a single partition, so in general the messages are sequential and unique.
But we know that writes to Kafka can fail and may be retried, and Storm also uses a redo mechanism. Therefore, we do not strictly guarantee exactly-once delivery or a complete ordering, but we do guarantee at-least-once.
So _ums_id_ becomes particularly important.
For full extraction, _ums_id_ is unique: each concurrent worker takes a different id range from ZooKeeper, which guarantees both uniqueness and performance. Negative numbers are used so they do not conflict with incremental data and are also guaranteed to sort earlier than incremental messages.
For incremental extraction, we use the MySQL log file number and log offset as the unique id. The id is a 64-bit long integer: the upper 7 decimal digits hold the log file number and the lower 12 decimal digits hold the log offset.
For example: 000103000012345678, where 103 is the log file number and 12345678 is the log offset.
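As a small sketch of this encoding (assuming the decimal-digit layout described above), the id can be built and taken apart like this:

```scala
// _ums_id_ layout (decimal digits): <log file number> * 10^12 + <log offset>.
// e.g. file 103, offset 12345678 -> 103000012345678 (printed zero-padded in the example above)
val OffsetDigits = 1000000000000L // 10^12

def encodeUmsId(logFileNumber: Long, logOffset: Long): Long = {
  require(logOffset < OffsetDigits, "offset must fit in 12 decimal digits")
  logFileNumber * OffsetDigits + logOffset
}

def decodeUmsId(umsId: Long): (Long, Long) =
  (umsId / OffsetDigits, umsId % OffsetDigits)

// Newer-message check used when applying data idempotently:
def isNewer(incomingUmsId: Long, existingUmsId: Long): Boolean =
  incomingUmsId > existingUmsId
```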
In this way, physical uniqueness is guaranteed at the log level (the id does not change even if a message is redone), and ordering is guaranteed as well (the position in the log can also be located). By comparing the _ums_id_ of consumed messages, you can tell which message is newer.
In fact, _ums_ts_ serves a similar purpose to _ums_id_, except that _ums_ts_ can repeat (multiple operations can occur within the same millisecond), so you still have to compare _ums_id_.
Heartbeat monitoring and early warning
The entire pipeline involves many components: primary/standby database replication, the Canal Server, multiple concurrent Storm processes, and so on.
Therefore, monitoring and early warning of the process is particularly important.
The heartbeat module inserts a heartbeat record into each extracted table every minute (configurable), recording the send time. The heartbeat table is extracted like any other table and follows the entire pipeline, going through the same logic as the synchronized tables (because the concurrent Storm workers may take different branches). When a heartbeat packet is received, it proves that the whole link is working even if no data was inserted, deleted or modified.
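As an illustration of the heartbeat idea (not the actual Dbus code), the monitoring side can compute end-to-end delay whenever a heartbeat record arrives at the end of the pipeline; the record shape and threshold are assumptions.

```scala
import java.time.Instant

// A heartbeat row carries the time it was inserted at the source (sendTs, epoch millis).
case class Heartbeat(namespace: String, sendTs: Long)

// When the heartbeat reaches the end of the pipeline, the delay is simply "now - sendTs".
// If the delay exceeds a threshold (or no heartbeat arrives at all), raise an alert.
def checkHeartbeat(hb: Heartbeat, alertThresholdMs: Long = 60000L): Unit = {
  val delayMs = Instant.now().toEpochMilli - hb.sendTs
  // In Dbus these statistics go to a stats topic / InfluxDB; here we just print them.
  println(s"${hb.namespace} end-to-end delay = ${delayMs} ms")
  if (delayMs > alertThresholdMs) {
    println(s"ALERT: ${hb.namespace} delay ${delayMs} ms exceeds ${alertThresholdMs} ms")
  }
}
```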
The Storm programs and the heartbeat program send their statistics to a public statistics topic, and a statistics program saves them to InfluxDB. Displayed with Grafana, it looks like this:
The picture shows real-time monitoring for one business system: the top shows real-time traffic and the bottom shows real-time delay. As you can see, the real-time performance is very good; data basically reaches the terminal Kafka within 1 to 2 seconds.
Grafana provides real-time monitoring capability.
If there is a delay, the heartbeat module of Dbus sends an email or SMS alert.
Real-time desensitization
For data security, Dbus's full and incremental Storm programs also perform real-time desensitization (data masking) for scenarios that require it. There are 3 desensitization methods:
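The three methods were listed on a slide that is not reproduced here. As one common example of irreversible masking only (a sketch, not the actual Dbus implementation), a configured sensitive column can be replaced by a salted hash before the UMS is published:

```scala
import java.security.MessageDigest

// Irreversible masking: replace a sensitive value with a salted hash so the original
// (e.g. a phone number or ID number) cannot be recovered downstream.
def maskValue(value: String, salt: String): String = {
  val md = MessageDigest.getInstance("MD5")
  md.digest((salt + value).getBytes("UTF-8")).map("%02x".format(_)).mkString
}

// Apply masking only to the columns configured as sensitive for this table.
def maskRow(row: Map[String, String], sensitiveColumns: Set[String], salt: String): Map[String, String] =
  row.map { case (col, v) =>
    if (sensitiveColumns.contains(col)) col -> maskValue(v, salt) else col -> v
  }
```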
To summarize: simply put, Dbus exports data from various sources in real time and provides it for subscription in UMS form, with support for real-time desensitization, real-time monitoring and alerting.
After talking about Dbus, it is time to talk about Wormhole. Why are these two projects rather than one, connected through Kafka?
One big reason is decoupling. Kafka has natural decoupling capability, and programs can pass messages asynchronously through Kafka directly. Dbus and Wormhole also use Kafka internally for message passing and decoupling.
Another reason is that UMS is self-describing. By subscribing to kafka, any capable user can directly consume UMS for use.
Although UMS can be subscribed to directly, doing so still requires development work. What Wormhole solves is providing one-click configuration to land the data in Kafka into various target systems, so that data users without development capability can use the data through Wormhole.
As shown in the figure, Wormhole can land the UMS in Kafka into various systems; currently the most commonly used targets are HDFS, JDBC databases and HBase.
For its technology stack, Wormhole uses Spark Streaming.
In Wormhole, a flow refers to the path from a source namespace to a target namespace, and one Spark Streaming job serves multiple flows.
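To give a rough idea of what the Spark Streaming side looks like, here is a simplified sketch rather than Wormhole's actual code; the topics, broker address and group id are illustrative, and the sink logic is reduced to a print.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object WormholeLikeStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ums-stream")
    val ssc = new StreamingContext(conf, Seconds(10))     // micro-batch interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "wormhole-like",
      "auto.offset.reset" -> "latest")

    // One streaming job can subscribe to several topics, i.e. serve several flows.
    val topics = Seq("dbus.mysql.ordersdb", "dbus.mysql.usersdb")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    stream.foreachRDD { rdd =>
      // Each record value is one UMS package; a real flow would parse it, route it
      // by namespace, and write it to the configured sink (HDFS / JDBC / HBase).
      rdd.map(_.value()).foreach(ums => println(ums.take(200)))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```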
There are good reasons to choose Spark:
A note here on the role of Swifts:
The comparison between Wormhole and Swifts is as follows:
Landing to HDFS
The Wormhole Spark Streaming program consumes the UMS from Kafka; the first thing it can do is save the UMS log to HDFS.
Kafka generally only saves a few days of information and does not save all information, while HDFS can save all historical additions, deletions and modifications. This makes many things possible:
It can be said that the log in HDFS is the basis for many things.
Because Spark has excellent native support for Parquet and Spark SQL can query it efficiently, the UMS is saved as Parquet files when it lands on HDFS. The Parquet files contain all insert, update and delete information from the log, together with _ums_id_ and _ums_ts_.
Wormhole's Spark Streaming job stores the data in different directories by namespace, i.e. different tables and versions go into different directories.
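A simplified sketch of that layout (the base path and directory scheme are illustrative assumptions, not Wormhole's exact convention):

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Append this micro-batch of UMS rows for one table to its own directory.
// basePath/<namespace> keeps different tables and versions apart, e.g.
//   /dws/hdfslog/mysql.order_source.order_schema.orders.1.0.0/
def writeBatch(batch: DataFrame, basePath: String, namespace: String): Unit = {
  batch.write
    .mode(SaveMode.Append)
    .parquet(s"$basePath/$namespace")
}
```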
Since each write produces small Parquet files, and HDFS is known to perform poorly with small files, there is another job that merges these Parquet files into large files once a day.
Each Parquet file directory has the start time and end time of the file data. In this way, when refilling data, you can decide which Parquet files need to be read based on the selected time range, without reading all the data.
Idempotence of inserting or updating data
We often encounter the need to process data and land it into a database or HBase. A question involved here is: what kind of data should be updated?
The most important principle here is the idempotence of data.
Regardless of whether we encounter additions, deletions or modifications of any data, the problems we face are:
For the first question, you actually need to find a unique key to locate the data. Common ones include:
The second question involves _ums_id_: since we have ensured that a larger _ums_id_ means newer data, after finding the corresponding data row we replace or update it according to this principle.
The reason we use soft deletion and add an _is_active_ column is the following situation:
Suppose a delete with a relatively large _ums_id_ has already been applied (indicating the row has been deleted). If it were not a soft deletion, a later-arriving row with a smaller _ums_id_ (old data) would find nothing to compare against and would really be inserted.
That would bring old data back, and idempotence would be lost. So keeping deleted data around (soft deletion) is valuable: it is what guarantees the idempotence of the data.
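Put in code form, the rule sketched above looks roughly like this; the TargetRow/IncomingRow shapes are hypothetical and only illustrate the decision logic, not Wormhole's actual implementation.

```scala
// Minimal view of a row already in the target and of an incoming UMS row.
case class TargetRow(key: String, umsId: Long, isActive: Boolean)
case class IncomingRow(key: String, umsId: Long, isDelete: Boolean, columns: Map[String, Any])

sealed trait Action
case object Insert     extends Action  // no row with this key yet
case object Update     extends Action  // incoming is newer: overwrite, keep _is_active_ = true
case object SoftDelete extends Action  // incoming is a newer delete: keep the row, set _is_active_ = false
case object Discard    extends Action  // incoming is older than what is already stored

def decide(existing: Option[TargetRow], incoming: IncomingRow): Action = existing match {
  case None                                 => if (incoming.isDelete) SoftDelete else Insert
  case Some(t) if incoming.umsId <= t.umsId => Discard      // old data never overwrites new
  case Some(_) if incoming.isDelete         => SoftDelete   // keep the row so later old data is rejected
  case Some(_)                              => Update
}
```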
HBase Saving
Inserting data into HBase is quite simple. The difference is that HBase can retain multiple versions of data (you can also retain only one version); by default it retains 3 versions.
So when inserting data into HBase, the problem that needs to be solved is:
The choice of version is interesting: we use _ums_id_ as the version, taking advantage of its uniqueness and monotonic growth, which matches the comparison semantics of versions themselves. A larger version corresponds to a larger _ums_id_, i.e. newer data.
For performance, we can insert the entire Spark Streaming Dataset into HBase directly, without any comparison, and let HBase decide automatically, based on version, which data is retained and which is not.
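A sketch of this idea using the standard HBase client API; the table name, column family and row-key scheme are illustrative, and a real job would of course reuse the connection rather than opening one per call.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

def putWithUmsVersion(tableName: String, rowKey: String,
                      umsId: Long, columns: Map[String, String]): Unit = {
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf(tableName))
  try {
    val put = new Put(Bytes.toBytes(rowKey))
    columns.foreach { case (col, value) =>
      // Use _ums_id_ as the cell version: a larger _ums_id_ is a newer version,
      // so HBase itself keeps the newest cells and drops the rest.
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(col), umsId, Bytes.toBytes(value))
    }
    table.put(put)
  } finally {
    table.close()
    conn.close()
  }
}
```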
JDBC saving:
When inserting data into a database, the principle of ensuring idempotence is simple, but if you want good performance the implementation becomes much more complicated: you cannot compare row by row and then insert or update.
We know that Spark's RDD/Dataset is operated on as a collection to improve performance; likewise, we need to achieve idempotence with set-based operations.
The specific idea is to first query the target database using the unique keys of the incoming batch, and split the batch into two sets:
A: data whose key does not yet exist in the target — this part can be inserted directly;
B: data whose key already exists — compare _ums_id_ with the existing row, write only the rows whose _ums_id_ is larger to the target database, and discard the smaller ones directly.
Anyone who uses Spark knows that an RDD/Dataset can be partitioned so that multiple workers operate on it in parallel to improve efficiency.
When concurrency is taken into account, both inserts and updates may fail, so we also need a strategy for handling failures.
For example, if another worker has already inserted the row and our insert fails on the unique constraint, we need to fall back to an update instead, again comparing _ums_id_ to decide whether the update should be applied.
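As an illustration only (not Wormhole's actual code), the fallback for a single row whose batched INSERT hit a unique-constraint conflict might look like this, assuming a hypothetical table with a unique key column `key_col`, a `_ums_id_` column and a single `payload` column:

```scala
import java.sql.Connection

case class Row(keyCol: String, umsId: Long, payload: String)

// Fallback for a row whose batched INSERT failed because another worker already
// inserted the same key: turn it into an UPDATE that only wins when the incoming
// _ums_id_ is larger than the one already stored.
def updateIfNewer(conn: Connection, table: String, row: Row): Boolean = {
  val sql =
    s"UPDATE $table SET _ums_id_ = ?, payload = ? WHERE key_col = ? AND _ums_id_ < ?"
  val stmt = conn.prepareStatement(sql)
  try {
    stmt.setLong(1, row.umsId)
    stmt.setString(2, row.payload)
    stmt.setString(3, row.keyCol)
    stmt.setLong(4, row.umsId)    // apply only if the stored _ums_id_ is smaller
    stmt.executeUpdate() > 0      // 0 rows affected => the existing data was newer, discard
  } finally {
    stmt.close()
  }
}
```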
For other situations where the write cannot be applied (such as a problem with the target system), Wormhole also has a retry mechanism. There are many such details; I will not go into them here.
Some are still under development.
I won’t go into details about inserting into other storages. The general principle is: design a collection-based, concurrent data insertion implementation based on the characteristics of each storage. These are Wormhole's efforts for performance, and users who use Wormhole don't have to worry about them.
Having said so much, what are the practical applications of DWS? Next, I will introduce the real-time marketing implemented by a certain system using DWS.
As shown in the figure:
The data of System A is saved in its own database. We know that CreditEase provides many financial services, including lending, and a very important step in the lending process is the credit review.
Borrowers need to provide information that demonstrates their creditworthiness, such as the central bank credit report, which carries the strongest credit signal. Bank transactions and online shopping transactions are also data with strong credit attributes.
When a borrower fills in credit information in System A through the web or the mobile app, they may be unable to continue for some reason. Even though the borrower may be a high-quality potential customer, this information was previously either unavailable or only known after a long delay, so in practice such customers were lost.
With DWS, the information the borrower has filled in is already recorded in the database, and is extracted, computed and landed into the target database in real time. High-quality customers are identified based on a customer rating, and their information is immediately pushed to the customer service system.
Customer service staff then call the borrower (the potential customer) within a very short time (a few minutes), provide customer care, and convert the potential customer into a real customer. We know that lending is time-sensitive; if it takes too long it loses its value.
Without the ability to extract, compute and land data in real time, none of this would be possible.
Real-time reporting system
Another application is real-time reporting, as follows:
Our data users' data comes from multiple systems. In the past, report information was obtained on a T+1 basis and then used to guide the next day's operations, so the timeliness was poor.
With DWS, data from multiple systems is extracted, computed and landed in real time, and reports are displayed in real time, allowing operations to deploy and adjust promptly and respond quickly.
Having said so much, let me summarize it roughly:
Suitable scenarios include: Real-time synchronization/real-time calculation/real-time monitoring/real-time reporting/real-time analysis/real-time insight/real-time management/real-time operation/real-time decision-making
Thank you everyone for listening, this sharing ends here.
Q1: Is there an open source solution for Oracle log reader?
A1: For Oracle there are many commercial solutions, such as Oracle GoldenGate (originally GoldenGate), Oracle XStream, IBM InfoSphere Change Data Capture (originally DataMirror), Dell SharePlex (originally Quest), and the domestic DSG superSync; easy-to-use open-source solutions are very rare.
Q2: How much manpower and material resources have been invested in this project? It feels a bit complicated.
A2: DWS is composed of three sub-projects, with an average of 5 to 7 people per project. It is a bit complicated, but it is really an attempt to use big data technology to solve difficulties our company is currently facing.
Because we are engaged in big data related technologies, all the brothers and sisters in the team are quite happy:)
In fact, Dbus and Wormhole are relatively fixed and easy to reuse. Swifts real-time computing is related to each business, has strong customization, and is relatively troublesome.
Q3: Will Yixin’s DWS system be open source?
A3: We have considered contributing it to the community, just like CreditEase's other open-source projects. The project has only just taken shape and needs further polishing, but I believe that at some point in the future we will open-source it to the community.
Q4: How do you define the architect role? Is it a system engineer?
A4: No, not a system engineer. CreditEase has multiple architects; they should be considered technical managers who drive the business with technology, covering product design, technical management and so on.
Q5: Is the replication scheme OGG?
A5: OGG and other commercial solutions mentioned above are options.
Source of the article: DBAplus community (dbaplus)