首頁 資料庫 mysql教程 基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

May 26, 2023 pm 08:28 PM
mysql docker canal

canal的介紹

canal的歷史由來

在早期的時候,阿里巴巴公司因為杭州和美國兩個地方的機房都部署了資料庫實例,但因為跨機房同步資料的業務需求,便孕育而生出了canal,主要是基於trigger(觸發器)的方式獲取增量變更。從2010年開始,阿里巴巴公司開始逐步嘗試資料庫日誌解析,取得增量變更的資料進行同步,由此衍生出了增量訂閱和消費性業務。

目前的canal支援的資料來源端mysql版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x。

canal的應用程式場景

目前普遍基於日誌增量訂閱和消費的業務,主要包括:

  1. 基於資料庫增量日誌解析,提供增量資料訂閱和消費

  2. 資料庫鏡像資料庫即時備份

  3. 索引建置和即時維護(拆分異質索引、倒排索引等)

  4. 業務cache刷新

  5. 帶有業務邏輯的增量資料處理

  6. canal的工作原理

在介紹canal的原理之前,我們先來了解下mysql主從複製的原理。

mysql主從複製原理

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

  • #mysql master將資料變更的操作寫入二進位日誌binary log中, 其中記錄的內容叫做二進位日誌事件binary log events,可以透過show binlog events指令進行檢視

  • mysql slave會將master的binary log中的binary log events拷貝到它的中繼日誌relay log

  • mysql slave重讀並執行relay log中的事件,將資料變更對應到它自己的資料庫表中

#了解了mysql的工作原理,我們可以大致猜想到canal應該也是採用類似的邏輯去實現增量資料訂閱的功能,那麼接下來我們看看實際上canal的工作原理是怎樣的?

canal工作原理

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

  • #canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master發送dump協定

  • mysql master收到dump請求,開始推送binary log給slave(也就是canal)

  • canal解析binary log物件(資料為byte流)

基於這樣的原理與方式,便可以完成資料庫增量日誌的取得解析,提供增量資料訂閱和消費,實現mysql實時增量資料傳輸的功能。

既然canal是這樣的一個框架,又是純java語言寫而成,那麼我們接下來就開始學習怎麼使用它並把它用到我們的實際工作中。

canal的docker環境準備

因為目前容器化技術的火熱,本文透過使用docker來快速搭建開發環境,而傳統方式的環境搭建,在我們學會了docker容器環境搭建後,也能自行依葫蘆畫瓢搭建成功。由於這篇主要講解canal,所以關於docker的內容不會涉及太多,主要會介紹docker的基本概念和命令使用。如果你想和更多容器技術專家交流,可以加我微信liyingjiese,備註『加群』。群組每週都有全球各大公司的最佳實踐以及行業最新動態 。

什麼是docker

相信絕大多數人都使用過虛擬機器vmware,在使用vmware進行環境搭建的時候,只需提供了一個普通的系統鏡像並成功安裝,剩下的軟體環境與應用配置還是如我們在本機操作一樣在虛擬機裡也操作一遍,而且vmware佔用宿主機的資源較多,容易造成宿主機卡頓,而且系統鏡像本身也佔用過多空間。

為了方便大家快速理解docker,便與vmware做對比來做介紹,docker提供了一個開始,打包,運行app的平台,把app(應用)和底層infrastructure(基礎設施)隔離開來。 docker中最主要的兩個概念就是映像(類似vmware的系統映像)與容器(類似vmware裡安裝的系統)。

什麼是image(鏡像)

  • #檔案與meta data的集合(root filesystem)

  • 分層的,並且每一層都可以添加改變刪除文件,成為一個新的image

  • #不同的image可以共享相同的layer

  • #image本身是read-only的

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

#什麼是container(容器)

  • 透過image創建(copy)

  • 在image layer之上建立一個container layer(可讀寫)

  • 類比物件導向:類別與實例

  • #image負責app的儲存和分發,container負責運行app

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

docker的網路介紹

docker的網路類型有三種:

  • bridge:橋接網路。預設啟動的docker容器,都是使用bridge,docker安裝時建立的橋接網絡,每次docker容器重啟時,會按照順序取得對應的ip位址,這個就導致重啟下,docker的ip位址就變了。

  • none:無指定網路。使用 --network=none,docker容器就不會分配區域網路的ip。

  • host:主機網路。若使用--network=host,Docker容器將與主機共用網絡,二者可以互相通訊。當在一個容器中執行一個監聽8080埠的web服務時,容器會自動對應到主機的8080埠。

建立自訂網路:(設定固定ip)

docker network create --subnet=172.18.0.0/16 mynetwork
登入後複製

查看存在的網路類型docker network ls:

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

建置canal環境

附上docker的下載安裝位址==> docker download 。

下載canal映像docker pull canal/canal-server

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

下載mysql映像docker pull mysql# ,下載過的則如下圖:

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

查看已經下載好的映像docker images:

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

##接下來透過映像產生mysql容器與canal-server容器:

##生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e mysql_root_password=root mysql
##生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## 命令介绍
--net mynetwork #使用自定义网络
--ip #指定分配ip
登入後複製

查看docker中執行的容器docker ps:

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

##mysql的設定修改

以上只是初步準備好了基礎的環境,但是要怎麼讓canal偽裝成salve並正確取得mysql中的binary log呢?

對於自建mysql,需要先開啟binlog寫入功能,配置

binlog-format

為row模式,透過修改mysql設定檔來開啟bin_log,使用find / -name my.cnf查找my.cnf,修改檔案內容如下:<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class='brush:php;toolbar:false;'>[mysqld] log-bin=mysql-bin # 开启binlog binlog-format=row # 选择row模式 server_id=1 # 配置mysql replaction需要定义,不要和canal的slaveid重复</pre><div class="contentsignin">登入後複製</div></div>進入mysql容器

docker exec -it mysql bash。

建立連結mysql的帳號canal並授予作為mysql slave的權限,如果已有帳戶可直接grant:

mysql -uroot -proot
# 创建账号
create user canal identified by &#39;canal&#39;; 
# 授予权限
grant select, replication slave, replication client on *.* to &#39;canal&#39;@&#39;%&#39;;
-- grant all privileges on *.* to &#39;canal&#39;@&#39;%&#39; ;
# 刷新并应用
flush privileges;
登入後複製

資料庫重新啟動後,簡單測試my.cnf 配置是否生效:

show variables like &#39;log_bin&#39;;
show variables like &#39;log_bin&#39;;
show master status;
登入後複製
基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

canal-server的設定修改

進入canal-server容器

docker exec -it canal-server bash

編輯canal-server的設定

vi canal-server/conf/example/instance.properties

##更多設定請參考==>canal基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能設定說明。

重啟canal-server容器docker restart canal-server

# 進入容器查看啟動日誌:

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
登入後複製

至此,我們的環境工作準備完成! 基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

拉取資料並同步儲存到elasticsearch

本文的elasticsearch也是基於docker環境搭建,所以讀者可執行如下指令:

# 下载对镜像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 创建容器并运行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
登入後複製

環境已經準備好了,現在就要開始我們的程式設計實戰部分了,怎麼透過應用程式去取得canal解析後的binlog資料。首先我們基於spring boot來搭建一個canal demo應用程式。架構如下圖所示:

student.java基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

package com.example.canal.study.pojo;
import lombok.data;
import java.io.serializable;
// @data 用户生产getter、setter方法
@data
public class student implements serializable {
private string id;
private string name;
private int age;
private string sex;
private string city;
}
登入後複製

canalconfig.java

package com.example.canal.study.common;
import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.client.canalconnectors;
import org.apache.http.httphost;
import org.elasticsearch.client.restclient;
import org.elasticsearch.client.resthighlevelclient;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import java.net.inetsocketaddress;
/**
* @author haha
*/
@configuration
public class canalconfig {
// @value 获取 application.properties配置中端内容
@value("${canal.server.ip}")
private string canalip;
@value("${canal.server.port}")
private integer canalport;
@value("${canal.destination}")
private string destination;
@value("${elasticsearch.server.ip}")
private string elasticsearchip;
@value("${elasticsearch.server.port}")
private integer elasticsearchport;
@value("${zookeeper.server.ip}")
private string zkserverip;
// 获取简单canal-server连接
@bean
public canalconnector canalsimpleconnector() {
 canalconnector canalconnector = canalconnectors.newsingleconnector(new inetsocketaddress(canalip, canalport), destination, "", "");
 return canalconnector;
}
// 通过连接zookeeper获取canal-server连接
@bean
public canalconnector canalhaconnector() {
 canalconnector canalconnector = canalconnectors.newclusterconnector(zkserverip, destination, "", "");
 return canalconnector;
}
// elasticsearch 7.x客户端
@bean
public resthighlevelclient resthighlevelclient() {
 resthighlevelclient client = new resthighlevelclient(
   restclient.builder(new httphost(elasticsearchip, elasticsearchport))
 );
 return client;
}
}
登入後複製

canaldataparser.java

由於這個類別的程式碼較多,文中則摘出其中比較重要的部分,其它部分程式碼可從github上取得:

public static class twotuple<a, b> {
 public final a eventtype;
 public final b columnmap;
 public twotuple(a a, b b) {
  eventtype = a;
  columnmap = b;
 }
}
public static list<twotuple<eventtype, map>> printentry(list<entry> entrys) {
 list<twotuple<eventtype, map>> rows = new arraylist<>();
 for (entry entry : entrys) {
  // binlog event的事件事件
  long executetime = entry.getheader().getexecutetime();
  // 当前应用获取到该binlog锁延迟的时间
  long delaytime = system.currenttimemillis() - executetime;
  date date = new date(entry.getheader().getexecutetime());
  simpledateformat simpledateformat = new simpledateformat("yyyy-mm-dd hh:mm:ss");
  // 当前的entry(binary log event)的条目类型属于事务
  if (entry.getentrytype() == entrytype.transactionbegin || entry.getentrytype() == entrytype.transactionend) {
   if (entry.getentrytype() == entrytype.transactionbegin) {
    transactionbegin begin = null;
    try {
     begin = transactionbegin.parsefrom(entry.getstorevalue());
    } catch (invalidprotocolbufferexception e) {
     throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e);
    }
    // 打印事务头信息,执行的线程id,事务耗时
    logger.info(transaction_format,
      new object[]{entry.getheader().getlogfilename(),
        string.valueof(entry.getheader().getlogfileoffset()),
        string.valueof(entry.getheader().getexecutetime()),
        simpledateformat.format(date),
        entry.getheader().getgtid(),
        string.valueof(delaytime)});
    logger.info(" begin ----> thread id: {}", begin.getthreadid());
    printxainfo(begin.getpropslist());
   } else if (entry.getentrytype() == entrytype.transactionend) {
    transactionend end = null;
    try {
     end = transactionend.parsefrom(entry.getstorevalue());
    } catch (invalidprotocolbufferexception e) {
     throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e);
    }
    // 打印事务提交信息,事务id
    logger.info("----------------\n");
    logger.info(" end ----> transaction id: {}", end.gettransactionid());
    printxainfo(end.getpropslist());
    logger.info(transaction_format,
      new object[]{entry.getheader().getlogfilename(),
        string.valueof(entry.getheader().getlogfileoffset()),
        string.valueof(entry.getheader().getexecutetime()), simpledateformat.format(date),
        entry.getheader().getgtid(), string.valueof(delaytime)});
   }
   continue;
  }
  // 当前entry(binary log event)的条目类型属于原始数据
  if (entry.getentrytype() == entrytype.rowdata) {
   rowchange rowchage = null;
   try {
    // 获取储存的内容
    rowchage = rowchange.parsefrom(entry.getstorevalue());
   } catch (exception e) {
    throw new runtimeexception("parse event has an error , data:" + entry.tostring(), e);
   }
   // 获取当前内容的事件类型
   eventtype eventtype = rowchage.geteventtype();
   logger.info(row_format,
     new object[]{entry.getheader().getlogfilename(),
       string.valueof(entry.getheader().getlogfileoffset()), entry.getheader().getschemaname(),
       entry.getheader().gettablename(), eventtype,
       string.valueof(entry.getheader().getexecutetime()), simpledateformat.format(date),
       entry.getheader().getgtid(), string.valueof(delaytime)});
   // 事件类型是query或数据定义语言ddl直接打印sql语句,跳出继续下一次循环
   if (eventtype == eventtype.query || rowchage.getisddl()) {
    logger.info(" sql ----> " + rowchage.getsql() + sep);
    continue;
   }
   printxainfo(rowchage.getpropslist());
   // 循环当前内容条目的具体数据
   for (rowdata rowdata : rowchage.getrowdataslist()) {
    list<canalentry.column> columns;
    // 事件类型是delete返回删除前的列内容,否则返回改变后列的内容
    if (eventtype == canalentry.eventtype.delete) {
     columns = rowdata.getbeforecolumnslist();
    } else {
     columns = rowdata.getaftercolumnslist();
    }
    hashmap<string, object> map = new hashmap<>(16);
    // 循环把列的name与value放入map中
    for (column column: columns){
     map.put(column.getname(), column.getvalue());
    }
    rows.add(new twotuple<>(eventtype, map));
   }
  }
 }
 return rows;
}
登入後複製

elasticutils.java

package com.example.canal.study.common;
import com.alibaba.fastjson.json;
import com.example.canal.study.pojo.student;
import lombok.extern.slf4j.slf4j;
import org.elasticsearch.client.resthighlevelclient;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
import org.elasticsearch.action.docwriterequest;
import org.elasticsearch.action.delete.deleterequest;
import org.elasticsearch.action.delete.deleteresponse;
import org.elasticsearch.action.get.getrequest;
import org.elasticsearch.action.get.getresponse;
import org.elasticsearch.action.index.indexrequest;
import org.elasticsearch.action.index.indexresponse;
import org.elasticsearch.action.update.updaterequest;
import org.elasticsearch.action.update.updateresponse;
import org.elasticsearch.client.requestoptions;
import org.elasticsearch.common.xcontent.xcontenttype;
import java.io.ioexception;
import java.util.map;
/**
* @author haha
*/
@slf4j
@component
public class elasticutils {
@autowired
private resthighlevelclient resthighlevelclient;
/**
 * 新增
 * @param student 
 * @param index 索引
 */
public void savees(student student, string index) {
 indexrequest indexrequest = new indexrequest(index)
   .id(student.getid())
   .source(json.tojsonstring(student), xcontenttype.json)
   .optype(docwriterequest.optype.create);
 try {
  indexresponse response = resthighlevelclient.index(indexrequest, requestoptions.default);
  log.info("保存数据至elasticsearch成功:{}", response.getid());
 } catch (ioexception e) {
  log.error("保存数据至elasticsearch失败: {}", e);
 }
}
/**
 * 查看
 * @param index 索引
 * @param id _id
 * @throws ioexception
 */
public void getes(string index, string id) throws ioexception {
 getrequest getrequest = new getrequest(index, id);
 getresponse response = resthighlevelclient.get(getrequest, requestoptions.default);
 map<string, object> fields = response.getsource();
 for (map.entry<string, object> entry : fields.entryset()) {
  system.out.println(entry.getkey() + ":" + entry.getvalue());
 }
}
/**
 * 更新
 * @param student
 * @param index 索引
 * @throws ioexception
 */
public void updatees(student student, string index) throws ioexception {
 updaterequest updaterequest = new updaterequest(index, student.getid());
 updaterequest.upsert(json.tojsonstring(student), xcontenttype.json);
 updateresponse response = resthighlevelclient.update(updaterequest, requestoptions.default);
 log.info("更新数据至elasticsearch成功:{}", response.getid());
}
/**
 * 根据id删除数据
 * @param index 索引
 * @param id _id
 * @throws ioexception
 */
public void deletees(string index, string id) throws ioexception {
 deleterequest deleterequest = new deleterequest(index, id);
 deleteresponse response = resthighlevelclient.delete(deleterequest, requestoptions.default);
 log.info("删除数据至elasticsearch成功:{}", response.getid());
}
}
登入後複製

binlogelasticsearch.java

package com.example.canal.study.action;
import com.alibaba.otter.canal.client.canalconnector;
import com.alibaba.otter.canal.protocol.canalentry;
import com.alibaba.otter.canal.protocol.message;
import com.example.canal.study.common.canaldataparser;
import com.example.canal.study.common.elasticutils;
import com.example.canal.study.pojo.student;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.stereotype.component;
import java.io.ioexception;
import java.util.list;
import java.util.map;
/**
* @author haha
*/
@slf4j
@component
public class binlogelasticsearch {
@autowired
private canalconnector canalsimpleconnector;
@autowired
private elasticutils elasticutils;
//@qualifier("canalhaconnector")使用名为canalhaconnector的bean
@autowired
@qualifier("canalhaconnector")
private canalconnector canalhaconnector;
public void binlogtoelasticsearch() throws ioexception {
 opencanalconnector(canalhaconnector);
 // 轮询拉取数据
 integer batchsize = 5 * 1024;
 while (true) {
  message message = canalhaconnector.getwithoutack(batchsize);
//   message message = canalsimpleconnector.getwithoutack(batchsize);
  long id = message.getid();
  int size = message.getentries().size();
  log.info("当前监控到binlog消息数量{}", size);
  if (id == -1 || size == 0) {
   try {
    // 等待2秒
    thread.sleep(2000);
   } catch (interruptedexception e) {
    e.printstacktrace();
   }
  } else {
   //1. 解析message对象
   list<canalentry.entry> entries = message.getentries();
   list<canaldataparser.twotuple<canalentry.eventtype, map>> rows = canaldataparser.printentry(entries);
   for (canaldataparser.twotuple<canalentry.eventtype, map> tuple : rows) {
    if(tuple.eventtype == canalentry.eventtype.insert) {
     student student = createstudent(tuple);
     // 2。将解析出的对象同步到elasticsearch中
     elasticutils.savees(student, "student_index");
     // 3.消息确认已处理
//     canalsimpleconnector.ack(id);
     canalhaconnector.ack(id);
    }
    if(tuple.eventtype == canalentry.eventtype.update){
     student student = createstudent(tuple);
     elasticutils.updatees(student, "student_index");
     // 3.消息确认已处理
//     canalsimpleconnector.ack(id);
     canalhaconnector.ack(id);
    }
    if(tuple.eventtype == canalentry.eventtype.delete){
     elasticutils.deletees("student_index", tuple.columnmap.get("id").tostring());
     canalhaconnector.ack(id);
    }
   }
  }
 }
}
/**
 * 封装数据至student
 * @param tuple
 * @return
 */
private student createstudent(canaldataparser.twotuple<canalentry.eventtype, map> tuple){
 student student = new student();
 student.setid(tuple.columnmap.get("id").tostring());
 student.setage(integer.parseint(tuple.columnmap.get("age").tostring()));
 student.setname(tuple.columnmap.get("name").tostring());
 student.setsex(tuple.columnmap.get("sex").tostring());
 student.setcity(tuple.columnmap.get("city").tostring());
 return student;
}
/**
 * 打开canal连接
 *
 * @param canalconnector
 */
private void opencanalconnector(canalconnector canalconnector) {
 //连接canalserver
 canalconnector.connect();
 // 订阅destination
 canalconnector.subscribe();
}
/**
 * 关闭canal连接
 *
 * @param canalconnector
 */
private void closecanalconnector(canalconnector canalconnector) {
 //关闭连接canalserver
 canalconnector.disconnect();
 // 注销订阅destination
 canalconnector.unsubscribe();
}
}
登入後複製

canaldemoapplication.java(spring boot启动类)

package com.example.canal.study;
import com.example.canal.study.action.binlogelasticsearch;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.applicationarguments;
import org.springframework.boot.applicationrunner;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
/**
* @author haha
*/
@springbootapplication
public class canaldemoapplication implements applicationrunner {
@autowired
private binlogelasticsearch binlogelasticsearch;
public static void main(string[] args) {
 springapplication.run(canaldemoapplication.class, args);
}
// 程序启动则执行run方法
@override
public void run(applicationarguments args) throws exception {
 binlogelasticsearch.binlogtoelasticsearch();
}
}
登入後複製

application.properties

server.port=8081
spring.application.name = canal-demo
canal.server.ip = 192.168.124.5
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = 192.168.124.5:2181
zookeeper.sasl.client = false
elasticsearch.server.ip = 192.168.124.5
elasticsearch.server.port = 9200
登入後複製

canal集群高可用的搭建

通过上面的学习,我们知道了单机直连方式的canala应用。在当今互联网时代,单实例模式逐渐被集群高可用模式取代,那么canala的多实例集群方式如何搭建呢!

基于zookeeper获取canal实例

准备zookeeper的docker镜像与容器:

docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
登入後複製

1、机器准备:

  • 运行canal的容器ip: 172.18.0.4 , 172.18.0.8

  • zookeeper容器ip:172.18.0.3:2181

  • mysql容器ip:172.18.0.6:3306

2、按照部署和配置,在单台机器上各自完成配置,演示时instance name为example。

3、修改canal.properties,加上zookeeper配置并修改canal端口:

canal.port=11113
canal.zkservers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
登入後複製

4、创建example目录,并修改instance.properties:

canal.instance.mysql.slaveid = 1235 
#之前的canal slaveid是1234,保证slaveid不重复即可
canal.instance.master.address = 172.18.0.6:3306
登入後複製

注意: 两台机器上的instance目录的名字需要保证完全一致,ha模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置。

启动两个不同容器的canal,启动后,可以通过tail -100f logs/example/example.log查看启动日志,只会看到一台机器上出现了启动成功的日志。

比如我这里启动成功的是 172.18.0.4:

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

查看一下zookeeper中的节点信息,也可以知道当前工作的节点为172.18.0.4:11111:

[zk: localhost:2181(connected) 15] get /otter/canal/destinations/example/running 
{"active":true,"address":"172.18.0.4:11111","cid":1}
登入後複製

客户端链接, 消费数据

可以通过指定zookeeper地址和canal的instance name,canal client会自动从zookeeper中的running节点获取当前服务的工作节点,然后与其建立链接:

[zk: localhost:2181(connected) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
登入後複製

对应的客户端编码可以使用如下形式,上文中的canalconfig.java中的canalhaconnector就是一个ha连接:

canalconnector connector = canalconnectors.newclusterconnector("172.18.0.3:2181", "example", "", "");
登入後複製

链接成功后,canal server会记录当前正在工作的canal client信息,比如客户端ip,链接的端口信息等(聪明的你,应该也可以发现,canal client也可以支持ha功能):

[zk: localhost:2181(connected) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientid":1001}
登入後複製

数据消费成功后,canal server会在zookeeper中记录下当前最后一次消费成功的binlog位点(下次你重启client时,会从这最后一个位点继续进行消费):

[zk: localhost:2181(connected) 5] get /otter/canal/destinations/example/1001/cursor

{"@type":"com.alibaba.otter.canal.protocol.position.logposition","identity":{"slaveid":-1,"sourceaddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalname":"binlog.000004","position":2169,"timestamp":1562672817000}}
登入後複製

停止正在工作的172.18.0.4的canal server:

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh
登入後複製

这时172.18.0.8会立马启动example instance,提供新的数据服务:

[zk: localhost:2181(connected) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}
登入後複製

与此同时,客户端也会随着canal server的切换,通过获取zookeeper中的最新地址,与新的canal server建立链接,继续消费数据,整个过程自动完成。

异常与总结

elasticsearch-head无法访问elasticsearch

es与es-head是两个独立的进程,当es-head访问es服务时,会存在一个跨域问题。所以我们需要修改es的配置文件,增加一些配置项来解决这个问题,如下:

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml 
# 文件末尾加上如下配置
http.cors.enabled: true
http.cors.allow-origin: "*"
登入後複製

修改完配置文件后需重启es服务。

elasticsearch-head查询报406 not acceptable

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

解决方法:

1、进入head安装目录;

2、cd _site/

3、编辑vendor.js 共有两处

#6886行 contenttype: "application/x-www-form-urlencoded
改成 contenttype: "application/json;charset=utf-8"
 #7574行 var inspectdata = s.contenttype === "application/x-www-form-urlencoded" &&
改成 var inspectdata = s.contenttype === "application/json;charset=utf-8" &&
登入後複製

使用elasticsearch-rest-high-level-clientorg.elasticsearch.action.index.indexrequest.ifseqno

#pom中除了加入依赖
<dependency>
<groupid>org.elasticsearch.client</groupid>
<artifactid>elasticsearch-rest-high-level-client</artifactid>
<version>7.1.1</version>
</dependency>
#还需加入
<dependency>
<groupid>org.elasticsearch</groupid>
<artifactid>elasticsearch</artifactid>
<version>7.1.1</version>
</dependency>
登入後複製

相关参考: 。

为什么elasticsearch要在7.x版本不能使用type?

参考: 为什么elasticsearch要在7.x版本去掉type?

使用spring-data-elasticsearch.jar报org.elasticsearch.client.transport.nonodeavailableexception

由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底层采用es官方transportclient,而es官方计划放弃transportclient,工具以es官方推荐的resthighlevelclient进行调用请求。 可参考 resthighlevelclient api 。

设置docker容器开启启动

如果创建时未指定 --restart=always ,可通过update 命令
docker update --restart=always [containerid]
登入後複製

docker for mac network host模式不生效

host模式是为了性能,但是这却对docker的隔离性造成了破坏,导致安全性降低。 在性能场景下,可以用--netwokr host开启host模式,但需要注意的是,如果你用windows或mac本地启动容器的话,会遇到host模式失效的问题。原因是host模式只支持linux宿主机。

参见官方文档:    。

客户端连接zookeeper报authenticate using sasl(unknow error)

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

  • zookeeper.jar與dokcer中的zookeeper版本不一致

  • zookeeper.jar使用了3.4.6之前的版本

出現這個錯的意思是zookeeper作為外部應用程式需要向系統申請資源,申請資源的時候需要通過認證,而sasl是一種認證方式,我們想辦法來繞過sasl認證。避免等待,來提高效率。

在專案程式碼中加入system.setproperty("zookeeper.sasl.client", "false");,如果是spring boot專案可以在application.properties中加入zookeeper.sasl.client=false

參考: increased cpu usage by unnecessary sasl checks 。

如果更換canal.client.jar中依賴的zookeeper.jar的版本

把canal的官方原始碼下載到本機git clone    ,然後修改client模組下pom.xml檔案中關於zookeeper的內容,然後重新mvn install:

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

把自己專案依賴的套件替換為剛剛mvn install生產的套件:

基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能

以上是基於Docker與Canal怎麼實現MySQL即時增量資料傳輸功能的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您聽不到任何人,如何修復音頻
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解鎖Myrise中的所有內容
3 週前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

mysql用戶和數據庫的關係 mysql用戶和數據庫的關係 Apr 08, 2025 pm 07:15 PM

MySQL 數據庫中,用戶和數據庫的關係通過權限和表定義。用戶擁有用戶名和密碼,用於訪問數據庫。權限通過 GRANT 命令授予,而表由 CREATE TABLE 命令創建。要建立用戶和數據庫之間的關係,需創建數據庫、創建用戶,然後授予權限。

RDS MySQL 與 Redshift 零 ETL 集成 RDS MySQL 與 Redshift 零 ETL 集成 Apr 08, 2025 pm 07:06 PM

數據集成簡化:AmazonRDSMySQL與Redshift的零ETL集成高效的數據集成是數據驅動型組織的核心。傳統的ETL(提取、轉換、加載)流程複雜且耗時,尤其是在將數據庫(例如AmazonRDSMySQL)與數據倉庫(例如Redshift)集成時。然而,AWS提供的零ETL集成方案徹底改變了這一現狀,為從RDSMySQL到Redshift的數據遷移提供了簡化、近乎實時的解決方案。本文將深入探討RDSMySQL零ETL與Redshift集成,闡述其工作原理以及為數據工程師和開發者帶來的優勢。

mysql用戶名和密碼怎麼填 mysql用戶名和密碼怎麼填 Apr 08, 2025 pm 07:09 PM

要填寫 MySQL 用戶名和密碼,請:1. 確定用戶名和密碼;2. 連接到數據庫;3. 使用用戶名和密碼執行查詢和命令。

MySQL 中的查詢優化對於提高數據庫性能至關重要,尤其是在處理大型數據集時 MySQL 中的查詢優化對於提高數據庫性能至關重要,尤其是在處理大型數據集時 Apr 08, 2025 pm 07:12 PM

1.使用正確的索引索引通過減少掃描的數據量來加速數據檢索select*fromemployeeswherelast_name='smith';如果多次查詢表的某一列,則為該列創建索引如果您或您的應用根據條件需要來自多個列的數據,則創建複合索引2.避免選擇*僅選擇那些需要的列,如果您選擇所有不需要的列,這只會消耗更多的服務器內存並導致服務器在高負載或頻率時間下變慢例如,您的表包含諸如created_at和updated_at以及時間戳之類的列,然後避免選擇*,因為它們在正常情況下不需要低效查詢se

如何針對高負載應用程序優化 MySQL 性能? 如何針對高負載應用程序優化 MySQL 性能? Apr 08, 2025 pm 06:03 PM

MySQL數據庫性能優化指南在資源密集型應用中,MySQL數據庫扮演著至關重要的角色,負責管理海量事務。然而,隨著應用規模的擴大,數據庫性能瓶頸往往成為製約因素。本文將探討一系列行之有效的MySQL性能優化策略,確保您的應用在高負載下依然保持高效響應。我們將結合實際案例,深入講解索引、查詢優化、數據庫設計以及緩存等關鍵技術。 1.數據庫架構設計優化合理的數據庫架構是MySQL性能優化的基石。以下是一些核心原則:選擇合適的數據類型選擇最小的、符合需求的數據類型,既能節省存儲空間,又能提升數據處理速度

mysql怎麼複製粘貼 mysql怎麼複製粘貼 Apr 08, 2025 pm 07:18 PM

MySQL 中的複制粘貼包含以下步驟:選擇數據,使用 Ctrl C(Windows)或 Cmd C(Mac)複製;在目標位置右鍵單擊,選擇“粘貼”或使用 Ctrl V(Windows)或 Cmd V(Mac);複製的數據將插入到目標位置,或替換現有數據(取決於目標位置是否已存在數據)。

了解 ACID 屬性:可靠數據庫的支柱 了解 ACID 屬性:可靠數據庫的支柱 Apr 08, 2025 pm 06:33 PM

數據庫ACID屬性詳解ACID屬性是確保數據庫事務可靠性和一致性的一組規則。它們規定了數據庫系統處理事務的方式,即使在系統崩潰、電源中斷或多用戶並發訪問的情況下,也能保證數據的完整性和準確性。 ACID屬性概述原子性(Atomicity):事務被視為一個不可分割的單元。任何部分失敗,整個事務回滾,數據庫不保留任何更改。例如,銀行轉賬,如果從一個賬戶扣款但未向另一個賬戶加款,則整個操作撤銷。 begintransaction;updateaccountssetbalance=balance-100wh

mysql怎麼查看 mysql怎麼查看 Apr 08, 2025 pm 07:21 PM

通過以下命令查看 MySQL 數據庫:連接到服務器:mysql -u 用戶名 -p 密碼運行 SHOW DATABASES; 命令獲取所有現有數據庫選擇數據庫:USE 數據庫名;查看表:SHOW TABLES;查看表結構:DESCRIBE 表名;查看數據:SELECT * FROM 表名;

See all articles