发布时间:2025-06-10
浏览次数:0
[[]92[id_1[]]]
[]
Canal 系列的源码剖析活动已经启动,这是一个全新的系列。它不仅深入探讨了 canal 的实现机制,同时也展现了作者在源码阅读方面的技巧。
1、应用场景
提及Canal,众人多半会联想到它是一种专门用于分析MySQL日志的软件,同时还能将MySQL数据库中的信息同步至其他存储媒介,比如。
即,Canal 在实际应用中非常普遍的一个例子:涉及数据异构,这是一种更高级别的数据读写分离的架构设计方案。
企业业务持续增长,达到一定规模后,意识到独立的关系型数据库已不足以应对数据量激增带来的挑战,于是产生了新的架构设计——分库分表。这种设计确实能有效减轻单一数据库的压力,然而,它也带来新的问题,尤其是在关联查询方面,情况更为严重,跨库联合查询更是如此。
在订单系统中,一般存在两种用户群体需要查询订单信息,即消费者和商家。在数据库进行分库分表的操作中,若以消费者为依据进行分库,则同一家商家的订单数据将被分散存储在不同的数据库中;反之,若以商家为标准进行分库,则同一消费者购买的所有订单数据也会分散在不同的数据库中。如此一来,进行关联查询时,不可避免地需要跨库进行数据连接操作,这无疑会增加较高的成本。而且上面的场景只能满足一方的需求,那如何是好呢?
此时,Canal崭露头角,在电商设计领域,商家与顾客实际上被划分为两个独立的服务对象。我们能够为这两类服务分别构建不同的数据库集群。具体来说,我们可以对用户订单库和商家订单库进行分库处理,其中用户订单库作为主库。当用户在订单系统中完成下单操作后,相关数据便被存入用户订单库。随后,通过Canal监听数据库的日志,数据得以同步至商家订单库。此外,用户订单库按照用户ID进行分库,而商家订单库则依据商家ID进行分库,从而完美解决了这一问题。
2、架构设计原理
在充分掌握 Canal 的主要应用领域之后,我们借助 canal 的官方资料,深入剖析其核心架构的设计思想,进而揭开 Canal 深藏不露的奥秘所在。
首先我们简单看一下 MySQL 的主从同步原理:
在这里插入图片描述
从上面的图中可以看成主从复制主要分成三个步骤:
依托于MySQL这一数据同步架构,Canal的设计宗旨主要集中于实现数据的同步功能,也就是数据的复制功能,观察上述图表后,自然而然地联想到了以下的设计方案:
原理相对比较简单:
接下来我们来看一下 的整体组成部分:
说明:
模块:
这些部件我暂未计划深入探究,因为就目前情况来看,我自己对它们也尚无明确认识,然而,这些正是我将来需要着重学习和研究的领域。
3、在 IDEA 中运行 Demo
在 Linux 系统上部署 canal 非常便捷,用户只需参照官方指南逐条执行即可,此处不再赘述。本节重点在于指导大家如何在开发工具中执行 canal 的示例程序,以便在深入钻研源代码时遇到困难时能够进行调试。
在学习过程中,建议先参照官方文档安装一次 canal,这样做对于深入理解 Canal 的核心组件具有极其重要的促进作用。
首先,我们需要在canal的源代码中查找官方所提供的示例代码,这些代码被放置在特定的包中,具体位置如图所示:
遗憾的是,尽管 canal 提供的示例代码中涵盖了客户端的代码,但并未包含服务端的部分,因此我们不得不将注意力转向其单元测试,具体如图所示:
随后,我依据官方提供的若干指导,结合个人的领会,撰写了以下测试脚本,并在 IDEA 开发平台中对 Canal 的示例项目进行了执行。这段代码已经过验证,可以放心使用。
1、Canal Demo
packagecom.alibaba.otter.canal的服务器端模块;
importcom.alibaba.otter.canal实例的核心部分,CanalInstance类;
importcom.alibaba.otter.canal实例生成器核心类;
importcom.alibaba.otter.canal的实例管理器中,包含了一个名为CanalInstanceWithManager的类;
importcom.alibaba.otter.canal实例管理器中的模型类,名为Canal。
importcom.alibaba.otter.canal实例管理器中的模型类,名为CanalParameter;
importcom.alibaba.otter.canal.server模块中的嵌入式CanalServerWithEmbedded类;
importcom.alibaba.otter.canal.server模块下的CanalServerWithNetty类;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class CanalServerTestMain {
protected static final定义字符串常量ZK_CLUSTER_ADDRESS,用以存储ZooKeeper集群的地址信息。"127.0.0.1:2181";
protected static final String DESTINATION = "example";
protected static final String DETECTING_SQL = [id_453773306];
protected static final String MYSQL_ADDRESS = "127.0.0.1";
protected static final String USERNAME = "canal";
protected static final String PASSWORD = "canal";
protected static final String FILTER = ".\\*\\\\\\\\..\\*";
/** 默认 500s 后关闭 */
protected static final long RUN_TIME = 120 * 1000;
private final将头部数据缓冲区分配为:ByteBuffer header = ByteBuffer.allocate(4);
private定义了一个名为CanalServerWithNetty的变量,该变量代表了一个基于Netty的网络服务器实例。
public static void main(String[] args) {
在CanalServerTestMain类中创建了一个名为test的对象。new CanalServerTestMain();
try {
test.setUp();
系统输出显示:"start");
} catch (Throwable e) {
捕捉到异常后,执行打印堆栈跟踪信息的操作。
} finally {
System.out.println("sleep");
try {
执行Thread类的sleep方法,参数为RUN_TIME;暂停程序执行。
} catch (Throwable ee) {
}
test.tearDown();
System.out.println("end");
}
}
public void setUp() {
在嵌入式服务器中集成的Canal服务器对象为:CanalServerWithEmbedded embeddedServer。new CanalServerWithEmbedded();
嵌入式服务器配置了Canal实例生成器。new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
运河运河等于构建运河函数返回值;
return new创建带有管理器的运河实例,针对canal执行FILTER操作。
}
});
nettyServer对象被创建,其值为CanalServerWithNetty实例的调用结果;即nettyServer = CanalServerWithNetty.instance();
nettyServer赋值给其嵌入式服务器为embeddedServer。
nettyServer.setPort(11111);
nettyServer.start();
// 启动 instance
embeddedServer.start("example");
}
public void tearDown() {
nettyServer.stop();
}
private Canal buildCanal() {
Canal canal = new Canal();
canal.setId(1L);
运河设定目标名称为目的地。
canal.setDesc("test");
在代码中声明了一个名为CanalParameter的参数变量。new CanalParameter();
parameter设定了元模式为CanalParameter的MetaMode.MEMORY;
parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);——此操作将参数设置为了心跳模式。
parameter的索引模式被设置为CanalParameter.IndexMode.MEMORY模式;同时,parameter对象的setIndexMode方法被调用,其参数为CanalParameter类下的MEMORY索引模式常量。
parameter调用了setStorageMode方法,将存储模式设置为CanalParameter类中的StorageMode枚举类型,具体为MEMORY模式。
parameter调用了setMemoryStorageBufferSize方法,以设定内存存储缓冲区的大小。32 * 1024);
parameter的sourcingType属性被设置为CanalParameter中的MySQL类型。
parameter配置数据库地址列表为Arrays.asList(new创建一个Socket地址对象,指定MySQL地址为(MYSQL_ADDRESS,)。3306),
new InetSocketAddress(MYSQL_ADDRESS, 3306)));
parameter.设定数据库用户名为USERNAME;parameter.将数据库用户名设置为USERNAME;parameter.指定数据库用户名为USERNAME;parameter.对数据库用户名进行设置,值为USERNAME。
parameter.设定数据库密码为PASSWORD;,而parameter对象的DbPassword属性值将被更新。
parameter.setSlaveId(1234L);
设定参数的默认连接超时时间秒数。30);
parameter.设定连接字符集为,"UTF-8");
parameter.setConnectionCharsetNumber((数值参数)=>{return数值参数.toString().replace(/[^0-9]/g,'').replace(/(\d)(?=\d{4})/g,'$1,');}));byte) 33);
parameter调用了setReceiveBufferSize方法,将接收缓冲区的大小进行了设定。8 * 1024);
8 * 1024);
参数对象启用检测功能。false);
参数设置检测间隔为每秒,设定为。10);
parameter.设定检测重试次数为,3);
parameter.setDetectingSQL(DETECTING_SQL)的操作已被执行,以启用SQL检测功能。
canal对象调用了setCanalParameter方法,并将parameter作为参数传入。
return canal;
}
}
2、Canal Demo
package在com.alibaba.otter.canal.example包下;
import java.net.InetSocketAddress;
import java.util.List;
importcom.alibaba.otter.canal.client模块下的CanalConnectors类;
importcom.alibaba.otter.canal模块中的客户端连接器类;
import导入com.alibaba.otter.canal.common.utils包中的AddressUtils类。
importcom.alibaba.otter.canal协议包中的CanalEntry类;
importcom.alibaba.otter.canal协议中的消息类;
import在阿里巴巴集团的Canal项目中,专用的协议入口模块中定义了Column类,该类属于com.alibaba.otter.canal.protocol命名空间。
importcom.alibaba.otter.canal协议包中的CanalEntry类定义了事件类型枚举。
public class SimpleCanalClientExample {
public static void main(String[] args) {
// 创建链接
创建了一个名为CanalConnector的连接器实例,该实例是通过调用CanalConnectors类中的newSingleConnector方法生成的。new将地址解析为IP,并指定端口号,以构建一个适用于网络通信的Socket地址对象,其中IP地址通过AddressUtils类获取。
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
执行连接器连接操作;连接器进行连接;调用连接器连接方法;启动连接器连接流程;执行连接器连接命令。
连接器注册监听事件。".*\\..*");
执行连接器的回滚操作;
int totalEmptyCount = 3000;
while (emptyCount < totalEmptyCount) {
获取一条消息,不进行确认,使用大小为batchSize的批处理,通过connector对象执行,并将结果赋值给变量Message message。// 获取指定数量的数据
long batchId = message.getId();
int获取消息条目数量,并将结果赋值给变量size。
if (batchId == -1 || size == 0) {
计数器数值增加一。
系统输出显示:"empty count : " + emptyCount);
try {
执行延时操作,暂停程序运行。1000);
} catch (InterruptedException e) {
}
} else {
emptyCount 被设定为0;
输出信息:“批次ID为%s,数据量为%s的批次信息”\n,其中batchId和size分别代表具体的批次标识和数据大小。
执行打印操作,展示message对象中的条目内容。
}
执行了批量ID为batchId的acknowledgment操作,通过connector对象。// 提交确认
执行回滚操作,针对批次编号为batchId的数据,以应对处理过程中的失败情况。
}
System.out.println("empty too many times, exit");
} finally {
与connector断开连接操作;
}
}
private static void printEntry(List entrys) {
for在处理CanalEntry.Entry类型的entry对象时,针对entrys集合中的每一个元素,进行以下操作。
if若entry的EntryType属性值为TRANSACTIONBEGIN或TRANSACTIONEND,则执行以下操作。
continue;
}
CanalEntry的RowChange属性,名为rowChage。null;
try {
rowChage等于CanalEntry.RowChange对象,该对象是通过调用parseFrom方法从entry对象的StoreValue属性中解析出来的。
} catch (Exception e) {
throw new RuntimeException(错误代码##,eromanga-event解析器存在故障,具体数据如下: + entry.toString(),
e);
}
CanalEntry的事件类型eventType等于rowChage获取的事件类型。
禁止输出通过String.format()方法格式化后的字符串。记录binlog信息,位置为[%s:%s],涉及名称为[%s,%s],事件类型为:%s。,
获取条目头部信息中的日志文件名,以及日志文件的偏移量。
获取条目头部信息中的模式名称,以及条目头部信息中的表名称,。
对特定的事件类型进行限制,确保其符合规定的 eventType。
for在处理(CanalEntry.RowData rowData : rowChage.getRowDatasList())的过程中,需确保每个rowData对象均正确地从rowChage对象中获取其数据列表。
if若事件类型等于删除操作,则{
执行打印操作,输出rowData对象中获取的前置列列表。
} else if若事件类型等于插入类型,则{
执行输出操作,展示行数据中的后续列列表。
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List columns) {
for (Column column : columns) {
打印输出列的名称加逗号,即:System.out.println(column.getName()+,)" : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
运行 的效果如下图所示:
在数据库中变更一条数据intellij idea 数据库关系图,以便产生新的日志,其输出结果如下:
在IDEA平台上成功构建并执行演示项目,标志着我们迈出了探索canal的第一步;接下来,我们将参照官方发布的文档资料,以此为蓝图,逐步深入探究canal的运作机制,从而为实际操作提供更有效的指导。
本文的介绍先告一段落,接下来,Canal 系列将正式开启连载之旅,敬请各位读者持续关注。
创作艰辛,若我的文字对你有所启发,不妨为这篇文章点赞,这将极大地激励我继续创作出更多高质量的内容。
诚挚邀请您加入我的知识星球intellij idea 数据库关系图,在此我们可以共同交流源码,深入探讨系统架构,揭示亿级订单处理背后的架构设计及实战经验,共同构建一个高水准的技术交流平台,为广大星友提供优质的高质量问答服务。扫描下方二维码,即可轻松加入。
如有侵权请联系删除!
Copyright © 2023 江苏优软数字科技有限公司 All Rights Reserved.正版sublime text、Codejock、IntelliJ IDEA、sketch、Mestrenova、DNAstar服务提供商
13262879759
微信二维码