Heim > Java > javaLernprogramm > Wie SpringBoot die Canal-Methode integriert

Wie SpringBoot die Canal-Methode integriert

WBOY
Freigeben: 2023-05-13 09:22:05
nach vorne
1279 Leute haben es durchsucht

pom.xml Canal.Client-Abhängigkeit hinzufügen

(1.1.5 hat sich stark geändert, der Client hier verwendet 1.1.4)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>top.yueshushu</groupId>
    <artifactId>learn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Canal</name>
    <description>学习 Canal</description>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 导入配置文件处理器,配置文件进行绑定就会有提示,需要重启 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!--导入自动热步署的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!--引入MySql的驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--引入springboot与mybatis整合的依赖-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.4</version>
        </dependency>
        <!-- 引入pagehelper分页插件 -->
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper-spring-boot-starter</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!--添加 druid-spring-boot-starter的依赖的依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.14</version>
        </dependency>
        <!--SpringBoot 的aop 模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <!--添加canal的依赖. 重要.  使用  1.1.4-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.4</version>
        </dependency>
    </dependencies>
    <build>
        <!--将该目录下的文件全部打包成类的路径-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Nach dem Login kopieren

Geschäftsfunktionsverarbeitung

Einfaches Verbindungsprogramm

/**
     * 一个简单的canal 的连接测试程序
     */
    @Test
    public void connectionTest() {
        //1. 创建连接  填充对应的地址信息 ,要监控的实例和相应的用户名和密码
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal",
                "canal"
        );
        //2. 进行连接
        canalConnector.connect();
        log.info(">>>连接成功:{}", canalConnector);
    }
Nach dem Login kopieren

17:26:32.179 [main ] INFO top.yueshushu.learn.CanalDemoTest - >>>Verbindung erfolgreich: com.alibaba.otter.canal.client.impl.SimpleCanalConnector@31ef45e3

Einzelne Erfassung von Daten

/**
     * 获取数据信息. 可以发现,未获取到数据 .  这个应该是实时的.
     */
    @Test
    public void getDataTest() {
        //1. 创建连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal",
                "canal"
        );
        // 进行连接
        canalConnector.connect();
        //3. 注册,看使用哪个数据库表
        canalConnector.subscribe("springboot.user");
        //4. 获取 1条数据
        Message message = canalConnector.get(1);
        log.info("获取的数据:id:{},数据:{}", message.getId(), message);
        if (message.getId() == -1) {
            log.info(">>>未获取到数据");
            return;
        }
        //5. 获取相应的数据集合
        List<CanalEntry.Entry> entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            log.info(">>>获取数据 {}", entry);
            //获取表名
            CanalEntry.Header header = entry.getHeader();
            log.info(">>>获取表名:{}", header.getTableName());
            CanalEntry.EntryType entryType = entry.getEntryType();
            log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());
            //获取数据
            ByteString storeValue = entry.getStoreValue();
            log.info(">>>输出存储的值:{}", storeValue);
        }
    }
Nach dem Login kopieren

Wie SpringBoot die Canal-Methode integriert

Im Wesentlichen Bibliothek Ein Datenelement einfügen

insert into springboot.user(id,name,age,sex,description) values(1,&#39;canal添加用户&#39;,24,&#39;男&#39;,&#39;学习canal&#39;);
Nach dem Login kopieren

Erneut ausführen:

Wie SpringBoot die Canal-Methode integriert

Schleife zum Abrufen von Daten

/**
     * 获取数据信息. 获取现在的数据.  再次执行时,就没有这个数据了.
     */
    @Test
    public void getNowDataTest() {
        //1. 创建连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal",
                "canal"
        );
        // 进行连接
        canalConnector.connect();
        //3. 注册,看使用哪个数据库表
        canalConnector.subscribe("springboot.user");
        for (;;) {
            //4. 获取 1条数据
            Message message = canalConnector.get(1);
            log.info("获取的数据:id:{},数据:{}", message.getId(), message);
            if (message.getId() == -1) {
                log.info(">>>未获取到数据");
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            //5. 获取相应的数据集合
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                log.info(">>>获取数据 {}", entry);
                //获取表名
                CanalEntry.Header header = entry.getHeader();
                log.info(">>>获取表名:{}", header.getTableName());
                CanalEntry.EntryType entryType = entry.getEntryType();
                log.info(">>获取类型 {}:,对应的信息:{}", entryType.getNumber(), entryType.name());
                //获取数据
                ByteString storeValue = entry.getStoreValue();
                log.info(">>>输出存储的值:{}", storeValue);
            }
        }
    }
Nach dem Login kopieren

Sie können jederzeit die entsprechenden Datenänderungsinformationen abrufen.

Sie werden feststellen, dass der Wert von storeValue schwer zu interpretieren ist. Diese Daten müssen analysiert werden.

Parsen Sie den StoreValue-Wert.

/**
     * 将 storeValue 进行解析,解析成我们能看懂的语句.
     * 对数据库 cud 进行处理操作观看一下.
     * 发现,点是不好的,也有多余的记录信息.
     *
     * @throws Exception 异常
     */
    @Test
    public void convertDataTest() throws Exception {
        //1. 创建连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111),
                "example",
                "canal", "canal"
        );
        //2. 进行连接
        canalConnector.connect();
        canalConnector.subscribe("springboot.user");
        for (;;) {
            //获取信息
            Message message = canalConnector.get(1);
            if (message.getId() == -1L) {
                // log.info("未获取到数据");
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            List<CanalEntry.Entry> entryList = message.getEntries();
            //对获取到的数据进行处理
            log.info(">>获取到{}条数据", entryList.size());
            for (CanalEntry.Entry entry : entryList) {
                CanalEntry.Header header = entry.getHeader();
                log.info(">>>获取表名:{}", header.getTableName());
                //获取类型.
                CanalEntry.EntryType entryType = entry.getEntryType();
                log.info(">>类型编号 {},类型名称:{}", entryType.getNumber(), entryType.name());
                //获取存入日志的值
                ByteString storeValue = entry.getStoreValue();
                //将这个值进行解析
                CanalEntry.RowChange rowChange = RowChange.parseFrom(storeValue);
                String sql = rowChange.getSql();
                log.info(">>>获取对应的sql:{}", sql);
                // 这个sql 可能是 批量的sql语句
                List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                for (CanalEntry.RowData rowData : rowDatasList) {
                    log.info(">>>获取信息:{}", rowData);
                    //对数据进行处理
                    List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                    List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                    beforeColumnsList.forEach(
                            n -> log.info("哪个列{},原先是{},是否被更新{}", n.getName(),
                                    n.getValue(), n.getUpdated())
                    );
                    afterColumnsList.forEach(
                            n -> log.info("哪个列{},后来是{},是否被更新{}", n.getName(), n.getValue(), n.getUpdated())
                    );
                }
            }
        }
    }
Nach dem Login kopieren

Führen Sie SQL erneut aus separat .

Starten Sie zuerst das Testprogramm: Wie SpringBoot die Canal-Methode integriert

Es werden keine Informationen gedruckt.

Die Haupttabelle führt die Add-Anweisung aus:

insert into springboot.user(id,name,age,sex,description) 
values(2,&#39;canal添加用户2&#39;,25,&#39;男&#39;,&#39;学习canal2&#39;);
Nach dem Login kopieren

Die Informationen werden gedruckt:

Wie SpringBoot die Canal-Methode integriert

Das ist sehr gut lesbar.

Die Haupttabelle führt den Änderungsvorgang aus.

/**
     * 类型转换数据
     *
     * @throws Exception 异常
     */
    @Test
    public void dataTypeTest() throws Exception {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(
                        "127.0.0.1", 11111
                ),
                "example",
                "canal", "canal"
        );
        canalConnector.connect();
        canalConnector.subscribe("springboot.user");
        for(;;){
            Message message = canalConnector.get(1);
            if (message.getId() == -1) {
                TimeUnit.SECONDS.sleep(1);
                continue;
            }
            List<CanalEntry.Entry> entries = message.getEntries();
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                //只要 RowData 数据类型的
                if (!CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                    continue;
                }
                String tableName = entry.getHeader().getTableName();
                log.info(">>>对表 {} 进行操作", tableName);
                ByteString storeValue = entry.getStoreValue();
                RowChange rowChange = RowChange.parseFrom(storeValue);
                //行改变
                CanalEntry.EventType eventType = rowChange.getEventType();
                switch (eventType) {
                    case INSERT: {
                        insertHandler(rowChange);
                        break;
                    }
                    case UPDATE: {
                        updateHandler(rowChange);
                        break;
                    }
                    case DELETE: {
                        deleteHandler(rowChange);
                        break;
                    }
                    default: {
                        break;
                    }
                }
            }
        }
    }
    private void deleteHandler(RowChange rowChange) {
        log.info(">>>>执行删除的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            for (CanalEntry.Column column : beforeColumnsList) {
                log.info(">>>>>字段 {} 删除数据 {}", column.getName(), column.getValue());
            }
        }
    }
    private void updateHandler(RowChange rowChange) {
        log.info(">>>执行更新的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            Map<String, String> beforeValueMap = beforeColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            Map<String, String> afterValueMap = afterColumnsList.stream().collect(
                    Collectors.toMap(
                            CanalEntry.Column::getName,
                            CanalEntry.Column::getValue
                    )
            );
            beforeValueMap.forEach((column, beforeValue) -> {
                String afterValue = afterValueMap.get(column);
                Boolean update = beforeValue.equals(afterValue);
                log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforeValue, afterValue,
                        update);
            });
        }
    }
    /**
     * 插入数据. 只有后的数据.
     *
     * @param rowChange 行改变
     */
    private void insertHandler(RowChange rowChange) {
        log.info(">>>执行添加 的方法");
        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
        for (CanalEntry.RowData rowData : rowDatasList) {
            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
            for (CanalEntry.Column column : afterColumnsList) {
                if (!StringUtils.hasText(column.getValue())) {
                    continue;
                }
                log.info("字段 {} 插入了数据 {}", column.getName(), column.getValue());
            }

        }
    }
Nach dem Login kopieren

Beim Aktualisieren, wenn jedes Feld aktualisiert wird mit Wie bisher wird kein Protokollverbrauch generiert.

Wie SpringBoot die Canal-Methode integriert

Die Haupttabelle führt den Löschvorgang aus:

insert into springboot.user(id,name,age,sex,description) 
values(4,&#39;canal添加用户4&#39;,25,&#39;男&#39;,&#39;学习canal4&#39;);
Nach dem Login kopieren

Die oben genannten Erfassungen werden jeweils einzeln erfasst. Die Effizienz ist relativ gering.

Wie SpringBoot die Canal-Methode integriertMehrere Daten gleichzeitig abrufen. SEKUNDEN); bedeutet 5. Erhalten Sie 3 Datensätze innerhalb von Sekunden.

Es gibt zwei Auslösebedingungen: eine besteht darin, 3 Datensätze zu erhalten, und die andere darin, 5 Sekunden zu erreichen.

Die Informationen zur Effektanzeige sind dieselben wie zuvor und werden daher nicht erneut demonstriert. Informationen zur Bestätigungskonfiguration canalConnector.ack(message.getId());Wie SpringBoot die Canal-Methode integriert

Bestätigen Sie manuell, dass die Nachricht verbraucht wurde.

Wenn die Nachricht rollback() zurückgesetzt wird, wird die Nachricht erneut konsumiert.

canalConnector.rollback();

Ausführungsanweisung:

update springboot.user set name=&#39;开开心心&#39;,age=26,description=&#39;岳泽霖&#39; where id =4;
Nach dem Login kopieren

Wenn es sich um eine manuelle Bestätigung handelt, wird

canalConnector.ack(message.getId());

nur einmal verbraucht.

Das obige ist der detaillierte Inhalt vonWie SpringBoot die Canal-Methode integriert. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:yisu.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage