springboot+canal+mysql+redis缓存双写一致性

2023-09-18 14:26:42

canal官网地址:https://github.com/alibaba/canal/wiki/QuickStart
基本上按照官网的步骤来就行
在这里插入图片描述

准备

首先服务器上要安装好jdk,因为canal运行需要jdk,同时把canal对应的端口在服务中开放,否则连接不上

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

查看是否开启binlog日志功能

SHOW VARIABLES LIKE 'log_bin';

如果为OFF说明没有开启

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

注意点,这里执行命令最好是在服务器端执行,因为我们通过连接客户端登录的自己创建的后台用户比如root ,没有授权权限,会出错
出现以下情况
在这里插入图片描述

DROP USER IF EXISTS 'canal'@'%'; #删除用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';   ## 创建用户
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';  
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;

启动

下载 canal, 访问 release 页面 , 选择需要的包下载, 我这里下载最新的

https://github.com/alibaba/canal/releases/tag/canal-1.1.6

解压缩

上传服务器选择一个位置,并解压指定的文件夹下面

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

解压之后

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

配置修改

vi conf/example/instance.properties
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

## mysql serverId
canal.instance.mysql.slaveId = 1234 #默认是注释的,
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  #canal默认用户名
canal.instance.dbPassword = canal  #canal默认密码 
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*  #监听的表,默认监听所有表

启动

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

在这里插入图片描述

查看 instance 的日志

vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

DEMO

依赖

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>


        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.3.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

    

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

       

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 导入mysql驱动包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>
        <!-- 导入数据源依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.9</version>
        </dependency>

     


        <!--canal-->


        <dependency>
        <groupId>top.javatool</groupId>
        <artifactId>canal-spring-boot-starter</artifactId>
        <version>1.2.1-RELEASE</version>

        </dependency>




    </dependencies>

代码

配置文件
server:
  port: 8081
spring:
  datasource:
    url: jdbc:mysql://42.192.46.136:3306/test?useUnicode=true&useSSL=false&characterEncoding=utf-8
    username: root
    password: lkzroot
    type: com.alibaba.druid.pool.DruidDataSource
  redis:
    password: 123456
    host: 42.192.43.136
    port: 16379
    lettuce:
      pool:
        max-active: 200
        max-idle: 10
        max-wait: -1ms
        min-idle: 3
测试类
package com.boot.canal.client;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @Author: lkz
 * @Title: RedisCanalClientExample
 * @Description: TODO
 * @Date: 2023/9/18 14:06
 */

public class RedisCanalClientExample {


    public static final Integer _60SECONDS = 60;
    public static final String  Canal_host_ip = "127.0.0.1";

    private static void redisInsert(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisDelete(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
            }
            //获取变动类型
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }


    public static void main(String[] args)
    {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(Canal_host_ip, 11111),
                "example",
                "",
                "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("test.test1");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }


}

redis配置
package com.boot.canal.client;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


public class RedisUtils
{
    public static final String  REDIS_IP_ADDR = "127.0.0.1";
    public static final String  REDIS_pwd = "123456";
    public static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,16379,10000,REDIS_pwd);
    }

    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }

}

结果

修改表数据

在这里插入图片描述

更多推荐

探讨大型公共建筑能耗监测与信息管理系统研究及应用

安科瑞华楠摘要:文章通过阐述大型公共建筑能耗现状,突出大型公共建筑实施节能监管的必要性,并在系统总结运用技术手段实施建筑能耗监测的基础上,介绍了江苏省建筑能耗监测系统研究过程中的技术创新和应用情况。关键词:公共建筑;建筑能耗;监测;节能0前言为随着我国工业化和城市化进程的加速,高速发展的建筑业以及不断扩大的建筑规模使得

短视频视频制作矩阵剪辑系统---源码,源代码独立搭建

短视频矩阵系统源码开发源代码搭建1,短视频矩阵系统需要的技术和知识1..数据库技术:抖音SEO系统需要处理海量的数据,需要使用高效、安全的数据库技术,如MySQL、MongoDB等,进行数据存储和管理。2.分布式系统架构:抖音SEO系统需要具备高可靠性、高性能、高容错性等特点,需要使用分布式系统架构进行设计和搭建。3.

【C++】C++11——可变参数模板和emplace

可变参数模板的定义方式可变参数模板的传值计算可变参数模板参数个数参数包展开方式递归展开参数包逗号表达式展开参数包emplace插入可变参数模板是C++11新增的最强大的特性之一,它对参数高度泛化,能够让我们创建可以接受可变参数的函数模板和类模板。在C++11之前,类模板和函数模板中只能包含固定数量的模板参数,可变模板参

创龙TL6678F开发板: 实现FPGA与DSP之间 SRIO(3.125Gbps, 4x)通信

创龙TL6678F开发板官方Demo:SRIO_AD9613实现了FPGA和DSP之间的SRIO通信,SRIO的速率为5Gbps.在FPGA端,srio_gen_2模块的参考时钟为125MHz.而Demo:udp_10g_echo实现了10G以太网通信,ten_gig_eth_pcs_pma模块的参考时钟为156.25

软件测试/测试开发丨​利用ChatGPT编写测试用例

点此获取更多相关资料简介测试用例是测试人员的核心工作内容,是测试人员思想的“实现类”,其充分体现了测试的思路,可以为后续的测试行为提供指导,是测试人员了解业务的重要根据和质量之根本。如果测试用例设计得不完成,出现了遗漏,那么通常是会出现大家不想看到的后果,如漏测、线上Bug不断等。——引用自《饿了么质量体系搭建实战》一

入职环境安装经验

Java程序员跳槽新入职一家新的公司,不可避免的第一天要开始给自己的Mac安装环境;有些刚从培训机构出来的小白就不清楚要安装哪些,安装的过程总会出现这样那样的问题。这里根据本人6年的开发生涯和几次跳槽的经验。总结出一套共用的装机策略。1.ideaorEclipse这里推荐idea,现在破解版的码貌似不太好搞,先装一个试

Spring Boot 版本 GA、RC、beta等含义

GAGeneralAvailability,正式发布的版本,官方开始推荐广泛使用,国外有的用GA来表示release版本。RELEASE正式发布版,官方推荐使用的版本,有的用GA来表示。比如spring。Stable稳定版,开源软件有的会用stable来表示正式发布的版本。比如Nginx。Final最终版,也是正式发布

神领物流 day01-项目概述 Spring Cloud Alibaba 微服务

课程安排了解神领物流了解物流行业了解项目的技术架构了解项目的业务功能项目功能演示搭建开发环境基于现有代码进行bug修复阅读已有的代码1、场景说明现在的你,已经学习了目前最主流的系统架构技术《微服务技术栈》,并且呢也拿到了满意的offer,入职了一家物流公司,公司名叫:神领物流公司。现在你的心情还是比较复杂的,既开心又担

腾讯K线修复运营版 股票配资系统安装搭建 二次开发 新接口

实盘接口心跳publicstaticfunctionheart(){$heart=newHeart();$heart->heart();return;}预警线=配资金额+保证金*比例先搜索配资表(条件操盘中)搜子账号ID去持仓表查询股票数量如何数量是0不继续做判断搜到的股票数量用z_market_bat函数,查询股票返

Nacos身份绕过漏洞复现(QVD-2023-6271)

一、背景nacos安全预警,对问题复现后进行修复。漏洞原理为开源服务管理平台Nacos在默认配置下未对token.secret.key进行修改,导致远程攻击者可以绕过密钥认证进入后台造成系统受控等后果。漏洞信息漏洞类型:身份认证绕过漏洞等级:高危漏洞编号:NVDB-CNVDB-2023674205漏洞影响范围:0.1.

【QT--使用百度地图API显示地图并绘制路线】

QT--使用百度地图API显示地图并绘制路线前言准备工作申请百度地图密钥(AK)安装开发环境开发过程新建项目ui界面GPSManager类主窗口Map效果展示前言先吐槽一下下,本身qt学的就不咋滴,谁想到第一件事就是让写一个上位机工具,根据CAN总线传来的位置信息,在地图上去绘制路线,并获取当前路段的限速信息等。当听到

热文推荐