你好,欢迎进入江苏优软数字科技有限公司官网!

诚信、勤奋、创新、卓越

友好定价、专业客服支持、正版软件一站式服务提供

13262879759

工作日:9:00-22:00

数据异构重器之 Canal 初探

发布时间:2023-07-03

浏览次数:0

点击上方“中间件兴趣圈”,选择“设为明星”

做一个积极向上的人,越努力,越幸运!

源码解析Canal系列开始了,一个全新的系列,可以讲解canal本身的实现原理,也是作者源码阅读方法的展示。

1、应用场景

提到Canal,大家应该都会想到它是一个用于解析MySQL日志、将MySQL数据库中的数据同步到其他存储介质等的工具。

也就是Canal的一个非常常见的使用场景:数据异构,一种更高层次的数据读写分离架构设计技术。

随着业务的不断发展,当企业发展到一定阶段,发现单一的关系型数据库已经无法支撑业务快速发展带来的数据不断积累的压力,于是一种设计框架就诞生了:分数据库和子表。 分库分表确实是一个非常好的缓解单库数据库压力的方案,但是却导致了另一个困境,就是对关联查询不友好,甚至跨库JOIN更是如此。

举例如下: 比如在一个订单系统中,一般有两类用户需要查询订单,一类是顾客,一类是店铺。 一个店铺的订单数据会分布在不同的数据库中。 如果使用()来划分数据库,那么同一个用户订购的所有订单数据将会分布在不同的数据库中。 这样关联查询就必须跨数据库Join,成本会很低。 而里面的场景只能满足一方的需求,那怎么办呢?

Canal此时首次亮相。 在电子商务设计中,虽然商店和顾客会被拆分成两个不同的服务,但我们可以为这两个不同的服务构建不同的数据库集群。 我们可以使用用户订单库,商户订单数据库分数据库,用户订单数据库为主数据库。 当用户在订单系统下订单时,数据进入用户订单数据库,然后可以通过canal挖掘数据库的日志,然后将数据同步到商店订单库,同时用户订单库根据用户ID分库intellij idea 数据库关系图,店铺订单库根据店铺ID分库,完美解决了问题。

2. 架构设计原则

了解了Canal的基本使用场景后,我们通过Canal官方文档探索其核心架构设计理念,从而打开并步入Canal的神秘世界。

首先我们简单了解一下MySQL的主从同步原理:

intellij idea 数据库关系图_数据的异构性_异构数据整合

在此插入图片描述

从上图可以看出,主从复制主要分为三步:

基于MySQL的这些数据同步机制,Canal的设计目标主要是实现数据同步,即数据复制。 从里面的图来看,很自然的想到了下面的设计:

数据的异构性_intellij idea 数据库关系图_异构数据整合

原理比较简单:

我们先看一下整体的组成部分:

异构数据整合_数据的异构性_intellij idea 数据库关系图

阐明:

模块:

我暂时不打算深入研究这种组件,因为现阶段我自己也不了解,但这是我后续学习和研究的重点。

3.在IDEA中运行Demo

Linux环境下安装canal比较简单。 您可以按照官方指南一步步进行。 这里我就不再重复介绍了。 本节的主要目的是在开发工具中运行Canal的demo,方便后续研究源码。 过程中遇到困难时可以进行调试。

温馨提示:在学习过程中,您可以先按照官方文档安装Canal,这对于了解Canal的核心组件非常有帮助。

首先从canal源码中寻找官方的Demo。 示例代码在包中,如右图所示:

数据的异构性_异构数据整合_intellij idea 数据库关系图

不过有点遗憾的是canal提供的示例代码只包含端相关的代码,并没有包含-side(),所以我们重点关注它的单元测试intellij idea 数据库关系图,如右图所示:

异构数据整合_intellij idea 数据库关系图_数据的异构性

然后根据官方的一些提示和我自己的理解,编写了如下测试代码,在IDEA开发工具中运行Canal相关的Demo。 下面的代码已经测试过,可以直接使用。

1. 运河演示

package com.alibaba.otter.canal.server;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class CanalServerTestMain {
    protected static final String ZK_CLUSTER_ADDRESS      = "127.0.0.1:2181";
    protected static final String DESTINATION   = "example";
    protected static final String DETECTING_SQL = "select 1";
    protected static final String MYSQL_ADDRESS = "127.0.0.1";
    protected static final String USERNAME      = "canal";
    protected static final String PASSWORD      = "canal";
    protected static final String FILTER        = ".\\\\*\\\\\\\\\\\\\\\\..\\\\*";
    /** 默认 500s 后关闭 */
    protected static final long RUN_TIME = 120 * 1000;
    private final ByteBuffer header        = ByteBuffer.allocate(4);
    private CanalServerWithNetty nettyServer;
    public static void main(String[] args) {
        CanalServerTestMain test = new CanalServerTestMain();
        try {
            test.setUp();
            System.out.println("start");
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            System.out.println("sleep");
            try {
                Thread.sleep(RUN_TIME);
            } catch (Throwable ee) {
            }
            test.tearDown();
            System.out.println("end");
        }
    }
    public void setUp() {
        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
        embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
            public CanalInstance generate(String destination) {
                Canal canal = buildCanal();
                return new CanalInstanceWithManager(canal, FILTER);
            }
        });
        nettyServer = CanalServerWithNetty.instance();
        nettyServer.setEmbeddedServer(embeddedServer);
        nettyServer.setPort(11111);
        nettyServer.start();
        // 启动 instance
        embeddedServer.start("example");
    }
    public void tearDown() {
        nettyServer.stop();
    }
    private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(DESTINATION);
        canal.setDesc("test");
        CanalParameter parameter = new CanalParameter();
        //parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));
        parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);
        parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);
        parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);
        parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);
        parameter.setMemoryStorageBufferSize(32 * 1024);
        parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
                new InetSocketAddress(MYSQL_ADDRESS, 3306)));
        parameter.setDbUsername(USERNAME);
        parameter.setDbPassword(PASSWORD);
        parameter.setSlaveId(1234L);
        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("UTF-8");
        parameter.setConnectionCharsetNumber((byte33);
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);
        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);
        canal.setCanalParameter(parameter);
        return canal;
    }
}

2. 运河演示

package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
public class SimpleCanalClientExample {
    public static void main(String[] args) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example""""");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\\\..*");
            connector.rollback();
            int totalEmptyCount = 3000;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \\n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }
    private static void printEntry(List entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
    private static void printColumn(List columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

手术疗效如右图所示:

数据的异构性_intellij idea 数据库关系图_异构数据整合

改变数据库中的一条数据,方便形成新的日志,输出结果如下:

数据的异构性_异构数据整合_intellij idea 数据库关系图

能够在IDEA中构建并运行demo是我们进入canal的第一步。 未来我们将按照官方文档中的内容为纲领,尝试逐步解锁canal的实现原理,从而更好地指导实践。

本文就先到此为止,运河系列即将开始连载,敬请期待。

原创不易,如果对您有帮助,请点击【寻找】这篇文章,这将是我写出更多优质文章的最强劲动力。

欢迎加入我的知识星球,一起交流源码,讲解结构,解密亿级订单的结构设计和实践经验,构建优质的技术交流圈,为广大用户提供优质的问答服务广大明星朋友。 长按以下二维码即可加入。

异构数据整合_数据的异构性_intellij idea 数据库关系图

如有侵权请联系删除!

13262879759

微信二维码