Synchronize the full and incremental data of a specific MySQL table to the message queue - Solution

1. Original requirement

The original full data as well as the incremental data of specific tables in specific MySQL databases must be synchronized in real time, and the corresponding updates and deletes must be synchronized as well.

The synchronization must be non-intrusive: business code must not be changed, and it must not put significant performance pressure on the business side.

Application scenarios: data ETL synchronization and reducing the load on business servers.

2. Solution

Overall flow: MySQL binlog → Canal → Kafka → downstream consumers. Canal parses the binlog of the source table and pushes every row change into a Kafka topic, from which any consumer can read.

3. Canal introduction and installation

Canal is an open-source project from Alibaba, written in pure Java. Based on incremental parsing of database logs, it provides incremental data subscription and consumption; it currently mainly supports MySQL (MariaDB is also supported).

Working principle: based on MySQL master-slave replication

From a high-level perspective, replication consists of three steps (see the example after this list):

  1. The master records changes in its binary log (these records are called binary log events and can be viewed with SHOW BINLOG EVENTS).
  2. The slave copies the master's binary log events to its relay log.
  3. The slave replays the events in the relay log, applying the changes to its own data.
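
For illustration, these events can be inspected directly on the master; the binlog file name below is just an example taken from SHOW BINARY LOGS:

SHOW BINARY LOGS;
SHOW BINLOG EVENTS IN 'mysql-bin.000001' LIMIT 10;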

How canal works

The principle is relatively simple:

  1. Canal emulates the interaction protocol of a MySQL slave: it pretends to be a MySQL slave and sends the dump protocol to the MySQL master.
  2. The MySQL master receives the dump request and starts pushing the binary log to the slave (canal).
  3. Canal parses the binary log (originally a byte stream) into event objects. You can confirm the prerequisites on the master as shown below.
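
Because canal consumes the binlog exactly as a replica would, it is worth confirming on the master that ROW-format binary logging is enabled before going further:

SHOW VARIABLES LIKE 'log_bin';        -- must be ON
SHOW VARIABLES LIKE 'binlog_format';  -- must be ROW for canal
SHOW MASTER STATUS;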

Architecture

Notes:

  • A server represents one running canal instance, corresponding to one JVM
  • An instance corresponds to one data queue (one server hosts 1..n instances)

instance modules:

  • eventParser (data source access; emulates the slave protocol to interact with the master and parses it)
  • eventSink (connector between parser and store; filters, processes, and distributes data)
  • eventStore (data storage)
  • metaManager (manages incremental subscription & consumption metadata)

Installation

1. MySQL and Kafka environment preparation
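
Canal needs a MySQL account with replication privileges on the source instance. A minimal sketch of the typical preparation (the user name, password, and server_id below are illustrative; adjust to your environment):

# my.cnf on the source MySQL (excerpt)
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

-- Create a replication account for canal
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;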

2. Download canal: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. Unzip: tar -zxvf canal.deployer-1.1.3.tar.gz

4. Configure the parameter files in the conf directory

Configure canal.properties:
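
A minimal sketch, assuming canal 1.1.3 delivering directly to Kafka; the broker address below matches the consumer code later in this article but is otherwise illustrative:

# conf/canal.properties (excerpt)
canal.serverMode = kafka
canal.mq.servers = 192.168.7.193:9092
# send each batch of row changes as flat JSON
canal.mq.flatMessage = true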

Enter conf/example and configure instance.properties:
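
A minimal sketch of instance.properties, assuming the table bak1 from the verification section lives in a database named test (an assumption; adjust the filter regex), with the topic matching the one the consumer subscribes to:

# conf/example/instance.properties (excerpt; address and credentials are illustrative)
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# only subscribe to the table we care about (db\\.table regex)
canal.instance.filter.regex=test\\.bak1
# Kafka topic the consumer in the verification section subscribes to
canal.mq.topic=sample-data
canal.mq.partition=0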

5. Start: bin/startup.sh

6. Log viewing:
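
Both the server log and the instance log can be tailed to confirm that canal started and connected to MySQL (paths are relative to the unpacked deployer directory):

tail -f logs/canal/canal.log
tail -f logs/example/example.log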

4. Verification

1. Develop the corresponding kafka consumer

package org.kafka;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;


/**
 *
 * Title: KafkaConsumerTest
 * Description:
 * kafka consumer demo
 * Version:1.0.0
 * @author pancm
 * @date January 26, 2018 */
public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------Start consumption---------");
        try {
            for (; ; ) {
                msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        // Print each record as it is consumed
                        System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
                        // String v = decodeUnicode(record.value());
                        // System.out.println(v);

                        // Stop after 1000 messages have been consumed
                        if (messageNo % 1000 == 0) {
                            break;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(11);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }


    /**
     * Convert Chinese characters to their unicode escape form.
     */
    public static String gbEncoding(final String gbString) {
        char[] utfBytes = gbString.toCharArray();
        String unicodeBytes = "";
        for (int i = 0; i < utfBytes.length; i++) {
            String hexB = Integer.toHexString(utfBytes[i]);
            if (hexB.length() <= 2) {
                hexB = "00" + hexB;
            }
            unicodeBytes = unicodeBytes + "\\u" + hexB;
        }
        return unicodeBytes;
    }

    /**
     * Convert unicode escapes back to Chinese characters.
     * Note: assumes the input consists of \\uXXXX escapes starting at index 0.
     */
    public static String decodeUnicode(final String dataStr) {
        int start = 0;
        int end = 0;
        final StringBuffer buffer = new StringBuffer();
        while (start > -1) {
            end = dataStr.indexOf("\\u", start + 2);
            String charStr = "";
            if (end == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } else {
                charStr = dataStr.substring(start + 2, end);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // Parse hexadecimal integer string.
            buffer.append(letter);
            start = end;
        }
        return buffer.toString();

    }
}

2. Add data to table bak1

CREATE TABLE `bak1` (
  `vin` varchar(20) NOT NULL,
  `p1` double DEFAULT NULL,
  `p2` double DEFAULT NULL,
  `p3` double DEFAULT NULL,
  `p4` double DEFAULT NULL,
  `p5` double DEFAULT NULL,
  `p6` double DEFAULT NULL,
  `p7` double DEFAULT NULL,
  `p8` double DEFAULT NULL,
  `p9` double DEFAULT NULL,
  `p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

show create table bak1;

insert into bak1 select '李雷abcv',
  `p1`,
  `p2`,
  `p3`,
  `p4`,
  `p5`,
  `p6`,
  `p7`,
  `p8`,
  `p9`,
  `p0` from moci limit 10;
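
Since the requirement also covers updates and deletes, those paths are worth verifying too; a sketch (the value written to p1 is arbitrary):

update bak1 set p1 = 1.01 where vin = '李雷abcv';
delete from bak1 where vin = '李雷abcv';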

3. View the output results:
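
With canal 1.1.x in flat-message mode, each Kafka record is one JSON document describing a batch of row changes. A roughly-shaped, abridged illustration for an INSERT on bak1 (all values are made up and the column list is shortened):

{
  "data": [{"vin": "李雷abcv", "p1": "0.5", "p2": "0.5"}],
  "database": "test",
  "es": 1556805200000,
  "id": 1,
  "isDdl": false,
  "mysqlType": {"vin": "varchar(20)", "p1": "double", "p2": "double"},
  "old": null,
  "sql": "",
  "table": "bak1",
  "ts": 1556805201000,
  "type": "INSERT"
}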

This concludes this article about synchronizing the full and incremental data of a specific MySQL table to a message queue.
