Canal 实现MySQL与Elasticsearch7数据同步

2023-09-21 11:49:41

1 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

优点: 可以完全和业务代码解耦,增量日志订阅。

缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。

2 canal实现MySQL与Elasticsearch7数据同步

下面介绍下利用canal ,canal adapter实现MySQL与ES7数据同步

2.1 Mysql配置修改

在MySQL中需要创建一个用户,并授权:

-- 使用命令登录:mysql -u root -p 
-- 创建用户 用户名:canal  
create user 'canal'@'%' identified by 'canal'; 
-- 授权 .表示所有库 
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on . to 'canal'@'%' identified by 'canal';

下一步在MySQL配置文件my.cnf设置如下信息:

[mysqld] 
# 开启binlog 
log-bin=mysql-bin 
# 选择ROW(行)模式 
binlog-format=ROW 
# 配置MySQL replaction需要定义,不要和canal的slaveId重复 
server_id=1

改了配置文件之后,重启MySQL

2.2 下载Canal

下载最新的cana1.1.5并解压,1.1.5 才支持Elasticsearch7
下载地址:
canal.adapter-1.1.5-SNAPSHOT.tar.gz(适配器)
canal.deployer-1.1.5-SNAPSHOT.tar.gz(服务端)
在这里插入图片描述
canal.adapter为适配端,canal.deployer为服务端

2.3 启动Canal服务端

2.3.1 修改数据库配置

进入conf/example目录下,修改instance.properties为数据库配置
在这里插入图片描述

在这里插入图片描述

2.3.2 启动服务端

进入bin目录,双击starup.bat启动,出现下面界面,表明启动成功
在这里插入图片描述
在这里插入图片描述
服务端启动成功, 接下来进入客户端测试

2.4 启动Canal客户端

2.4.1 修改配置

进入adapter目录,修改application.yml
在这里插入图片描述
yml内容:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

#  srcDataSources:
#    defaultDS:
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
#      username: root
#      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

2.4.2 创建索引, 根据同步sql数据至Elasticsearch中

调用http://127.0.0.1:9200/product(PUT请求)创建索引,product为索引名字

{
  "mappings" : {
    "properties" : {
      "attrs" : {
        "type" : "nested",
        "properties" : {
          "attrId" : {
            "type" : "long"
          },
          "attrName" : {
            "type" : "keyword"
          },
          "attrValueId" : {
            "type" : "long"
          },
          "attrValueName" : {
            "type" : "keyword"
          }
        }
      },
      "tags" : {
        "type" : "nested",
        "properties" : {
          "tagId" : {
            "type" : "long"
          },
          "seq" : {
            "type" : "integer"
          }
        }
      },
      "brandId" : {
        "type" : "long"
      },
      "brandImg" : {
        "type" : "keyword"
      },
      "brandName" : {
        "type" : "keyword"
      },
      "code" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "commentNum" : {
        "type" : "integer"
      },
      "createTime" : {
        "type" : "date"
      },
      "hasStock" : {
        "type" : "boolean"
      },
      "imgUrls" : {
        "type" : "keyword",
        "index" : false,
        "doc_values" : false
      },
      "mainImgUrl" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      },
      "marketPriceFee" : {
        "type" : "long"
      },
      "priceFee" : {
        "type" : "long"
      },
      "saleNum" : {
        "type" : "integer"
      },
      "sellingPoint" : {
        "type" : "text",
        "analyzer" : "ik_max_word",
        "search_analyzer" : "ik_smart"
      },
      "shopId" : {
        "type" : "long"
      },
      "shopImg" : {
        "type" : "keyword",
        "index" : false,
        "doc_values" : false
      },
      "shopName" : {
        "type" : "text",
        "analyzer" : "ik_max_word",
        "search_analyzer" : "ik_smart"
      },
      "shopType" : {
        "type" : "integer"
      },
      "shopPrimaryCategoryId" : {
        "type" : "long"
      },
      "shopPrimaryCategoryName" : {
        "type" : "keyword"
      },
      "shopSecondaryCategoryId" : {
        "type" : "long"
      },
      "shopSecondaryCategoryName" : {
        "type" : "keyword"
      },
      "primaryCategoryId" : {
        "type" : "long"
      },
      "primaryCategoryName" : {
        "type" : "keyword"
      },
      "secondaryCategoryId" : {
        "type" : "long"
      },
      "secondaryCategoryName" : {
        "type" : "keyword"
      },
      "categoryId" : {
        "type" : "long"
      },
      "categoryName" : {
        "type" : "keyword"
      },
      "spuId" : {
        "type" : "long"
      },
      "spuName" : {
        "type" : "text",
        "analyzer" : "ik_max_word",
        "search_analyzer" : "ik_smart"
      },
      "spuStatus" : {
        "type" : "integer"
      },
      "success" : {
        "type" : "boolean"
      }
    }
  }
}

在这里插入图片描述

2.4.3 启动客户端

进入\canal.adapter-1.1.5-SNAPSHOT\bin 双击startup.bat 出现下面界面,表明启动成功
在这里插入图片描述

3 注意事项

3.1 使用队列

2.4.1中客户端配置需要配置队列,我使用的rocketmq,大家可以根据自己需要选择队列,根据选择的队列,需改配置

3.2 启动顺序

需要优先启动Mysql和Elasticsearch,然后启动Canal服务端,最后启动客户端

4 验证

当你在操作界面添加一个商品时,那么此时Elasticsearch也会同步这条数据,具体不展开来了

更多推荐

kafkaStream实时流式计算

2实时流式计算2.1概念一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高

C++零基础教程(函数重载)

文章目录前言一、概念讲解二、代码示例三、函数指针遇到函数重载总结前言本篇文章来讲解函数重载,函数重载在C++中是非常重要的一个概念。一、概念讲解C++中的函数重载是指在同一个作用域中定义多个具有相同名称但参数列表不同的函数。函数重载允许使用相同的函数名来表示执行类似但具有不同参数类型或参数数量的操作。这样做可以提高代码

谷歌邀请企业测试人工智能工具Gemini;提升(LLM)性能的三种简单方法

🦉AI新闻🚀谷歌邀请企业测试人工智能工具Gemini,与微软展开竞争摘要:谷歌邀请企业测试其人工智能工具Gemini,该工具结合了GPT-4的文本生成能力,支持聊天对话、分析图表数据、创建图像以及用自然语言命令控制软件等功能。谷歌希望通过Gemini来为其消费类人工智能产品提供支持,同时与微软Azure等竞品展开正

Vue之路由及Node.js环境搭建(一起探索新事物)

目录​编辑前言一、Vue之路由1.路由简介1.1什么是路由1.2什么是SPA1.3SPA的实现思路1.4使用路由的优势2.案例演示2.1导入所需的js文件2.2编写案例代码(模拟页面跳转)二、Vue之node.js1.node.js简介1.1什么是node.js1.2node.js的特点1.3什么是npm1.4npm的

79、SpringBoot 整合 R2DBC --- R2DBC 就是 JDBC 的 反应式版本, R2DBC 是 JDBC 的升级版。

★何谓R2DBCR2DBC就是JDBC的反应式版本,R2DBC是JDBC的升级版。R2DBC是ReactiveRelationalDatabaseConnectivity(关系型数据库的响应式连接)的缩写反应式的就是类似于消息发布者和订阅者,有消息就进行推送。R2DBC中DAO接口中方法的返回值是Flux或Mono因此

嵌入式笔试面试刷题(day5 IIC详解)

文章目录前言一、IIC需要几根线分别是什么线二、IIC优势三、IIC可以挂载多少个从设备,主设备1.从设备数量2.主设备数量四、IIC是全双工还是半双工五、SDA和SCL为什么配置为上拉开漏输出模式1.为什么要配置为开漏输出不能是推挽输出a.实现线与功能b.保护设备不会被短路2.上拉电阻的作用a.确保空闲状态保持高电平

Java synchronized关键字常见面试题

1、什么是线程同步,为什么需要线程同步?线程同步是一种机制,用于控制多个线程对共享资源的访问,以防止并发问题。它需要确保在同一时刻只有一个线程可以访问共享资源,以避免数据竞争和不一致性。2、请解释Java中synchronized关键字的作用和用法。synchronized关键字用于实现线程同步。它可以应用于方法或代码

0021Java程序设计-SSM框架图书管理系统

文章目录摘要目录系统设计开发环境摘要伴随着时代的进步,以及科学技术的不断发展,越来越多的人会从图书馆中借阅书籍,从而获得新的知识。但是,传统的图书管理方法有着很多不便之处,而且还容易丢失,因此,很有必要设计出一个易于操作,具有高便捷性,还可以提高工作效率的图书管理系统。该系统采用了Java技术,采用了计算机技术,取代了

python经典百题之请问他多少岁

题目:有5个人坐在一起,问第五个人多少岁?他说比第4个人大2岁。问第4个人岁数,他说比第3个人大2岁。问第三个人,又说比第2人大两岁。问第2个人,说比第一个人大两岁。最后问第一个人,他说是10岁。请问第五个人多大?方法1:迭代deffind_fifth_person_age_iterative():age=10#第一个

Multitor:一款带有负载均衡功能的多Tor实例创建工具

关于MultitorMultitor是一款带有负载均衡功能的多Tor实例创建工具,Multitor的主要目的是以最快的速度完成大量Tor进程的初始化,并将大量实例应用到我们日常使用的程序中,例如Web浏览器和聊天工具等等。除此之外,在该工具的帮助下,我们还可以在进行渗透测试和对基础设施安全性进行审计时提高隐蔽性和匿名性

4、wireshark使用教程

文章目录一、wireshark简介二、环境三、wireshark抓包三、wireshark过滤器使用一、wireshark简介Wireshark是使用最广泛的一款「开源抓包软件」,常用来检测网络问题、攻击溯源、或者分析底层通信机制。Wireshark抓包原理:单机情况:电脑直连互联网的单机环境。Wireshark直接抓

热文推荐