你好,欢迎进入江苏优软数字科技有限公司官网!

诚信、勤奋、创新、卓越

友好定价、专业客服支持、正版软件一站式服务提供

13262879759

工作日:9:00-22:00

源码分析Canal系列开启!探讨实现原理与阅读技巧,应用场景超实用

发布时间:2025-06-10

浏览次数:0

[[]92[id_1[]]]

[]

Canal 系列的源码剖析活动已经启动,这是一个全新的系列。它不仅深入探讨了 canal 的实现机制,同时也展现了作者在源码阅读方面的技巧。

1、应用场景

提及Canal,众人多半会联想到它是一种专门用于分析MySQL日志的软件,同时还能将MySQL数据库中的信息同步至其他存储媒介,比如。

即,Canal 在实际应用中非常普遍的一个例子:涉及数据异构,这是一种更高级别的数据读写分离的架构设计方案。

企业业务持续增长,达到一定规模后,意识到独立的关系型数据库已不足以应对数据量激增带来的挑战,于是产生了新的架构设计——分库分表。这种设计确实能有效减轻单一数据库的压力,然而,它也带来新的问题,尤其是在关联查询方面,情况更为严重,跨库联合查询更是如此。

在订单系统中,一般存在两种用户群体需要查询订单信息,即消费者和商家。在数据库进行分库分表的操作中,若以消费者为依据进行分库,则同一家商家的订单数据将被分散存储在不同的数据库中;反之,若以商家为标准进行分库,则同一消费者购买的所有订单数据也会分散在不同的数据库中。如此一来,进行关联查询时,不可避免地需要跨库进行数据连接操作,这无疑会增加较高的成本。而且上面的场景只能满足一方的需求,那如何是好呢?

此时,Canal崭露头角,在电商设计领域,商家与顾客实际上被划分为两个独立的服务对象。我们能够为这两类服务分别构建不同的数据库集群。具体来说,我们可以对用户订单库和商家订单库进行分库处理,其中用户订单库作为主库。当用户在订单系统中完成下单操作后,相关数据便被存入用户订单库。随后,通过Canal监听数据库的日志,数据得以同步至商家订单库。此外,用户订单库按照用户ID进行分库,而商家订单库则依据商家ID进行分库,从而完美解决了这一问题。

2、架构设计原理

在充分掌握 Canal 的主要应用领域之后,我们借助 canal 的官方资料,深入剖析其核心架构的设计思想,进而揭开 Canal 深藏不露的奥秘所在。

首先我们简单看一下 MySQL 的主从同步原理:

intellij idea 数据库关系图_Canal MySQL binlog解析_数据同步架构设计

在这里插入图片描述

从上面的图中可以看成主从复制主要分成三个步骤:

依托于MySQL这一数据同步架构,Canal的设计宗旨主要集中于实现数据的同步功能,也就是数据的复制功能,观察上述图表后,自然而然地联想到了以下的设计方案:

数据同步架构设计_intellij idea 数据库关系图_Canal MySQL binlog解析

原理相对比较简单:

接下来我们来看一下 的整体组成部分:

intellij idea 数据库关系图_数据同步架构设计_Canal MySQL binlog解析

说明:

模块:

这些部件我暂未计划深入探究,因为就目前情况来看,我自己对它们也尚无明确认识,然而,这些正是我将来需要着重学习和研究的领域。

3、在 IDEA 中运行 Demo

在 Linux 系统上部署 canal 非常便捷,用户只需参照官方指南逐条执行即可,此处不再赘述。本节重点在于指导大家如何在开发工具中执行 canal 的示例程序,以便在深入钻研源代码时遇到困难时能够进行调试。

在学习过程中,建议先参照官方文档安装一次 canal,这样做对于深入理解 Canal 的核心组件具有极其重要的促进作用。

首先,我们需要在canal的源代码中查找官方所提供的示例代码,这些代码被放置在特定的包中,具体位置如图所示:

Canal MySQL binlog解析_intellij idea 数据库关系图_数据同步架构设计

遗憾的是,尽管 canal 提供的示例代码中涵盖了客户端的代码,但并未包含服务端的部分,因此我们不得不将注意力转向其单元测试,具体如图所示:

intellij idea 数据库关系图_Canal MySQL binlog解析_数据同步架构设计

随后,我依据官方提供的若干指导,结合个人的领会,撰写了以下测试脚本,并在 IDEA 开发平台中对 Canal 的示例项目进行了执行。这段代码已经过验证,可以放心使用。

1、Canal Demo

packagecom.alibaba.otter.canal的服务器端模块;
importcom.alibaba.otter.canal实例的核心部分,CanalInstance类;
importcom.alibaba.otter.canal实例生成器核心类;
importcom.alibaba.otter.canal的实例管理器中,包含了一个名为CanalInstanceWithManager的类;
importcom.alibaba.otter.canal实例管理器中的模型类,名为Canal。
importcom.alibaba.otter.canal实例管理器中的模型类,名为CanalParameter;
importcom.alibaba.otter.canal.server模块中的嵌入式CanalServerWithEmbedded类;
importcom.alibaba.otter.canal.server模块下的CanalServerWithNetty类;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class CanalServerTestMain {
    protected static final定义字符串常量ZK_CLUSTER_ADDRESS,用以存储ZooKeeper集群的地址信息。"127.0.0.1:2181";
    protected static final String DESTINATION   = "example";
    protected static final String DETECTING_SQL = [id_453773306];
    protected static final String MYSQL_ADDRESS = "127.0.0.1";
    protected static final String USERNAME      = "canal";
    protected static final String PASSWORD      = "canal";
    protected static final String FILTER        = ".\\*\\\\\\\\..\\*";
    /** 默认 500s 后关闭 */
    protected static final long RUN_TIME = 120 * 1000;
    private final将头部数据缓冲区分配为:ByteBuffer header = ByteBuffer.allocate(4);
    private定义了一个名为CanalServerWithNetty的变量,该变量代表了一个基于Netty的网络服务器实例。
    public static void main(String[] args) {
在CanalServerTestMain类中创建了一个名为test的对象。new CanalServerTestMain();
        try {
            test.setUp();
系统输出显示:"start");
        } catch (Throwable e) {
捕捉到异常后,执行打印堆栈跟踪信息的操作。
        } finally {
            System.out.println("sleep");
            try {
执行Thread类的sleep方法,参数为RUN_TIME;暂停程序执行。
            } catch (Throwable ee) {
            }
            test.tearDown();
            System.out.println("end");
        }
    }
    public void setUp() {
在嵌入式服务器中集成的Canal服务器对象为:CanalServerWithEmbedded embeddedServer。new CanalServerWithEmbedded();
嵌入式服务器配置了Canal实例生成器。new CanalInstanceGenerator() {
            public CanalInstance generate(String destination) {
运河运河等于构建运河函数返回值;
                return new创建带有管理器的运河实例,针对canal执行FILTER操作。
            }
        });
nettyServer对象被创建,其值为CanalServerWithNetty实例的调用结果;即nettyServer = CanalServerWithNetty.instance();
nettyServer赋值给其嵌入式服务器为embeddedServer。
        nettyServer.setPort(11111);
        nettyServer.start();
        // 启动 instance
        embeddedServer.start("example");
    }
    public void tearDown() {
        nettyServer.stop();
    }
    private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
运河设定目标名称为目的地。
        canal.setDesc("test");
在代码中声明了一个名为CanalParameter的参数变量。new CanalParameter();
        
parameter设定了元模式为CanalParameter的MetaMode.MEMORY;
parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);——此操作将参数设置为了心跳模式。
parameter的索引模式被设置为CanalParameter.IndexMode.MEMORY模式;同时,parameter对象的setIndexMode方法被调用,其参数为CanalParameter类下的MEMORY索引模式常量。
parameter调用了setStorageMode方法,将存储模式设置为CanalParameter类中的StorageMode枚举类型,具体为MEMORY模式。
parameter调用了setMemoryStorageBufferSize方法,以设定内存存储缓冲区的大小。32 * 1024);
parameter的sourcingType属性被设置为CanalParameter中的MySQL类型。
parameter配置数据库地址列表为Arrays.asList(new创建一个Socket地址对象,指定MySQL地址为(MYSQL_ADDRESS,)。3306),
                new InetSocketAddress(MYSQL_ADDRESS, 3306)));
parameter.设定数据库用户名为USERNAME;parameter.将数据库用户名设置为USERNAME;parameter.指定数据库用户名为USERNAME;parameter.对数据库用户名进行设置,值为USERNAME。
parameter.设定数据库密码为PASSWORD;,而parameter对象的DbPassword属性值将被更新。
        parameter.setSlaveId(1234L);
设定参数的默认连接超时时间秒数。30);
parameter.设定连接字符集为,"UTF-8");
parameter.setConnectionCharsetNumber((数值参数)=>{return数值参数.toString().replace(/[^0-9]/g,'').replace(/(\d)(?=\d{4})/g,'$1,');}));byte33);
parameter调用了setReceiveBufferSize方法,将接收缓冲区的大小进行了设定。8 * 1024);
8 * 1024);
参数对象启用检测功能。false);
参数设置检测间隔为每秒,设定为。10);
parameter.设定检测重试次数为,3);
parameter.setDetectingSQL(DETECTING_SQL)的操作已被执行,以启用SQL检测功能。
canal对象调用了setCanalParameter方法,并将parameter作为参数传入。
        return canal;
    }
}

2、Canal Demo

package在com.alibaba.otter.canal.example包下;
import java.net.InetSocketAddress;
import java.util.List;
importcom.alibaba.otter.canal.client模块下的CanalConnectors类;
importcom.alibaba.otter.canal模块中的客户端连接器类;
import导入com.alibaba.otter.canal.common.utils包中的AddressUtils类。
importcom.alibaba.otter.canal协议包中的CanalEntry类;
importcom.alibaba.otter.canal协议中的消息类;
import在阿里巴巴集团的Canal项目中,专用的协议入口模块中定义了Column类,该类属于com.alibaba.otter.canal.protocol命名空间。
importcom.alibaba.otter.canal协议包中的CanalEntry类定义了事件类型枚举。
public class SimpleCanalClientExample {
    public static void main(String[] args) {
        // 创建链接
创建了一个名为CanalConnector的连接器实例,该实例是通过调用CanalConnectors类中的newSingleConnector方法生成的。new将地址解析为IP,并指定端口号,以构建一个适用于网络通信的Socket地址对象,其中IP地址通过AddressUtils类获取。
                11111), "example""""");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
执行连接器连接操作;连接器进行连接;调用连接器连接方法;启动连接器连接流程;执行连接器连接命令。
连接器注册监听事件。".*\\..*");
执行连接器的回滚操作;
            int totalEmptyCount = 3000;
            while (emptyCount < totalEmptyCount) {
获取一条消息,不进行确认,使用大小为batchSize的批处理,通过connector对象执行,并将结果赋值给变量Message message。// 获取指定数量的数据
                long batchId = message.getId();
                int获取消息条目数量,并将结果赋值给变量size。
                if (batchId == -1 || size == 0) {
计数器数值增加一。
系统输出显示:"empty count : " + emptyCount);
                    try {
执行延时操作,暂停程序运行。1000);
                    } catch (InterruptedException e) {
                    }
                } else {
emptyCount 被设定为0;
                    输出信息:“批次ID为%s,数据量为%s的批次信息”\n,其中batchId和size分别代表具体的批次标识和数据大小。
执行打印操作,展示message对象中的条目内容。
                }
执行了批量ID为batchId的acknowledgment操作,通过connector对象。// 提交确认
                执行回滚操作,针对批次编号为batchId的数据,以应对处理过程中的失败情况。
            }
            System.out.println("empty too many times, exit");
        } finally {
与connector断开连接操作;
        }
    }
    private static void printEntry(List entrys) {
        for在处理CanalEntry.Entry类型的entry对象时,针对entrys集合中的每一个元素,进行以下操作。
            if若entry的EntryType属性值为TRANSACTIONBEGIN或TRANSACTIONEND,则执行以下操作。
                continue;
            }
CanalEntry的RowChange属性,名为rowChage。null;
            try {
rowChage等于CanalEntry.RowChange对象,该对象是通过调用parseFrom方法从entry对象的StoreValue属性中解析出来的。
            } catch (Exception e) {
                throw new RuntimeException(错误代码##,eromanga-event解析器存在故障,具体数据如下: + entry.toString(),
                        e);
            }
CanalEntry的事件类型eventType等于rowChage获取的事件类型。
禁止输出通过String.format()方法格式化后的字符串。记录binlog信息,位置为[%s:%s],涉及名称为[%s,%s],事件类型为:%s。,
获取条目头部信息中的日志文件名,以及日志文件的偏移量。
获取条目头部信息中的模式名称,以及条目头部信息中的表名称,。
对特定的事件类型进行限制,确保其符合规定的 eventType。
            for在处理(CanalEntry.RowData rowData : rowChage.getRowDatasList())的过程中,需确保每个rowData对象均正确地从rowChage对象中获取其数据列表。
                if若事件类型等于删除操作,则{
执行打印操作,输出rowData对象中获取的前置列列表。
                } else if若事件类型等于插入类型,则{
执行输出操作,展示行数据中的后续列列表。
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List columns) {
        for (Column column : columns) {
打印输出列的名称加逗号,即:System.out.println(column.getName()+,)" : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

运行 的效果如下图所示:

intellij idea 数据库关系图_Canal MySQL binlog解析_数据同步架构设计

在数据库中变更一条数据intellij idea 数据库关系图,以便产生新的日志,其输出结果如下:

数据同步架构设计_intellij idea 数据库关系图_Canal MySQL binlog解析

在IDEA平台上成功构建并执行演示项目,标志着我们迈出了探索canal的第一步;接下来,我们将参照官方发布的文档资料,以此为蓝图,逐步深入探究canal的运作机制,从而为实际操作提供更有效的指导。

本文的介绍先告一段落,接下来,Canal 系列将正式开启连载之旅,敬请各位读者持续关注。

创作艰辛,若我的文字对你有所启发,不妨为这篇文章点赞,这将极大地激励我继续创作出更多高质量的内容。

诚挚邀请您加入我的知识星球intellij idea 数据库关系图,在此我们可以共同交流源码,深入探讨系统架构,揭示亿级订单处理背后的架构设计及实战经验,共同构建一个高水准的技术交流平台,为广大星友提供优质的高质量问答服务。扫描下方二维码,即可轻松加入。

如有侵权请联系删除!

13262879759

微信二维码