网络编程
位置:首页>> 网络编程>> 数据库>> MySQL特定表全量、增量数据同步到消息队列-解决方案

MySQL特定表全量、增量数据同步到消息队列-解决方案

作者:李雷  发布时间:2024-01-24 04:36:47 

标签:MySQL,特定表,全量,增量,数据同步
目录
  • 1、原始需求

  • 2、解决方案

  • 3、canal介绍、安装

    • canal的工作原理

    • 架构

    • 安装

  • 4、验证

    1、原始需求

    既要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改、删除也要对应。

    数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。

    应用场景:数据ETL同步、降低业务服务器压力。

    2、解决方案

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    3、canal介绍、安装

    canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

    工作原理:mysql主备复制实现

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    从上层来看,复制分成三步:

    1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);

    2. slave将master的binary log events拷贝到它的中继日志(relay log);

    3. slave重做中继日志中的事件,将改变反映它自己的数据。

    canal的工作原理

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    原理相对比较简单:

    1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

    2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)

    3. canal解析binary log对象(原始为byte流)

    架构

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    说明:

    • server代表一个canal运行实例,对应于一个jvm

    • instance对应于一个数据队列 (1个server对应1..n个instance)

    instance模块:

    • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)

    • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)

    • eventStore (数据存储)

    • metaManager (增量订阅&消费信息管理器)

    安装

    1、mysql、kafka环境准备

    2、canal下载:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

    3、解压:tar -zxvf canal.deployer-1.1.3.tar.gz

    4、对目录conf里文件参数配置

    对canal.properties配置:

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    进入conf/example里,对instance.properties配置:

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    5、启动:bin/startup.sh

    6、日志查看:

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    4、验证

    1、开发对应的kafka消费者


    package org.kafka;

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    /**
    *
    * Title: KafkaConsumerTest
    * Description:
    *  kafka消费者 demo
    * Version:1.0.0
    * @author pancm
    * @date 2018年1月26日
    */
    public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
       private ConsumerRecords<String, String> msgList;
       private final String topic;
       private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "192.168.7.193:9092");
           props.put("group.id", GROUPID);
           props.put("enable.auto.commit", "true");
           props.put("auto.commit.interval.ms", "1000");
           props.put("session.timeout.ms", "30000");
           props.put("auto.offset.reset", "latest");
           props.put("key.deserializer", StringDeserializer.class.getName());
           props.put("value.deserializer", StringDeserializer.class.getName());
           this.consumer = new KafkaConsumer<String, String>(props);
           this.topic = topicName;
           this.consumer.subscribe(Arrays.asList(topic));
       }

    @Override
       public void run() {
           int messageNo = 1;
           System.out.println("---------开始消费---------");
           try {
               for (; ; ) {
                   msgList = consumer.poll(1000);
                   if (null != msgList && msgList.count() > 0) {
                       for (ConsumerRecord<String, String> record : msgList) {
                           //消费100条就打印 ,但打印的数据不一定是这个规律的

    System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());

    //                            String v = decodeUnicode(record.value());

    //                            System.out.println(v);

    //当消费了1000条就退出
                           if (messageNo % 1000 == 0) {
                               break;
                           }
                           messageNo++;
                       }
                   } else {
                       Thread.sleep(11);
                   }
               }
           } catch (InterruptedException e) {
               e.printStackTrace();
           } finally {
               consumer.close();
           }
       }

    public static void main(String args[]) {
           KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
           Thread thread1 = new Thread(test1);
           thread1.start();
       }

    /*
        * 中文转unicode编码
        */
       public static String gbEncoding(final String gbString) {
           char[] utfBytes = gbString.toCharArray();
           String unicodeBytes = "";
           for (int i = 0; i < utfBytes.length; i++) {
               String hexB = Integer.toHexString(utfBytes[i]);
               if (hexB.length() <= 2) {
                   hexB = "00" + hexB;
               }
               unicodeBytes = unicodeBytes + "\\u" + hexB;
           }
           return unicodeBytes;
       }

    /*
        * unicode编码转中文
        */
       public static String decodeUnicode(final String dataStr) {
           int start = 0;
           int end = 0;
           final StringBuffer buffer = new StringBuffer();
           while (start > -1) {
               end = dataStr.indexOf("\\u", start + 2);
               String charStr = "";
               if (end == -1) {
                   charStr = dataStr.substring(start + 2, dataStr.length());
               } else {
                   charStr = dataStr.substring(start + 2, end);
               }
               char letter = (char) Integer.parseInt(charStr, 16); // 16进制parse整形字符串。
               buffer.append(new Character(letter).toString());
               start = end;
           }
           return buffer.toString();

    }
    }

    2、对表bak1进行增加数据


    CREATE TABLE `bak1` (
     `vin` varchar(20) NOT NULL,
     `p1` double DEFAULT NULL,
     `p2` double DEFAULT NULL,
     `p3` double DEFAULT NULL,
     `p4` double DEFAULT NULL,
     `p5` double DEFAULT NULL,
     `p6` double DEFAULT NULL,
     `p7` double DEFAULT NULL,
     `p8` double DEFAULT NULL,
     `p9` double DEFAULT NULL,
     `p0` double DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

    show create table bak1;

    insert into bak1 select '李雷abcv',
     `p1` ,
     `p2` ,
     `p3` ,
     `p4` ,
     `p5` ,
     `p6` ,
     `p7` ,
     `p8` ,
     `p9` ,
     `p0`  from moci limit 10

    3、查看输出结果:

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    来源:https://www.cnblogs.com/lilei2blog/p/15608206.html

    0
    投稿

    猜你喜欢

    手机版 网络编程 asp之家 www.aspxhome.com