canal快速开始

服务端

服务端ip:192.168.1.101

  1. 下载 canal 到 /opt/softwares（下面的 wget 和解压命令都在该目录下执行）

    1
    wget https://github.com/alibaba/canal/releases/download/canal-1.0.26-preview-2/canal.deployer-1.0.26-SNAPSHOT.tar.gz
  1. 解压缩

    1
    mkdir /opt/canal && tar -zxvf canal.deployer-1.0.26-SNAPSHOT.tar.gz -C /opt/canal/
  2. 配置mysql

    1
    2
    3
    4
    5
    6
    vim /etc/my.cnf

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server_id=1
  3. 创建用户

    1
    2
    3
    4
    -- Account canal uses to connect to MySQL.
    CREATE USER canal IDENTIFIED BY 'canal';
    -- Read + replication privileges on every schema so canal can pull the binlog.
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- The instance config in this guide points canal's table-meta store (tsdb) at the
    -- canal_tsdb schema using this same account, so it also needs DDL/DML rights there.
    -- MySQL accepts a schema-level grant before the schema itself is created.
    GRANT ALL PRIVILEGES ON canal_tsdb.* TO 'canal'@'%';
    -- Broader alternative (not recommended outside a sandbox):
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;
  4. 创建数据库

    1
    CREATE DATABASE IF NOT EXISTS canal_tsdb default charset utf8 COLLATE utf8_general_ci;
  5. 配置canal

    1
    cp /opt/canal/conf/example/instance.properties /opt/canal/conf/example/instance.properties.bak
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    vim /opt/canal/conf/example/instance.properties

    #################################################
    ## mysql serverId
    # NOTE(review): presumably must not clash with the MySQL server_id (1 in my.cnf above);
    # confirm how this canal version treats the value 0.
    canal.instance.mysql.slaveId=0
    # position info
    # MySQL is addressed via 127.0.0.1, i.e. on the same host as the canal server.
    # NOTE(review): journal.name / position look like values taken from the author's own
    # server - presumably replace them with your own binlog file and offset
    # (e.g. from SHOW MASTER STATUS); verify before copying.
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.master.journal.name=mysql-bin.000001
    canal.instance.master.position=12172
    canal.instance.master.timestamp=


    # table meta tsdb info
    # The two H2 settings stay commented out; the tsdb url points at the MySQL
    # database canal_tsdb created in the previous step, using the canal account.
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
    #canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    canal.instance.tsdb.dbUsername=canal
    canal.instance.tsdb.dbPassword=canal


    # No standby server is configured in this guide.
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    # username/password
    # Same credentials as the MySQL user created in the "create user" step.
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.defaultDatabaseName=test
    canal.instance.connectionCharset=UTF-8
    # table regex
    # Matches "<anything>.<anything>" - presumably schema.table, i.e. every table; confirm.
    canal.instance.filter.regex=.*\\..*
    # table black regex
    # Left empty: nothing is excluded.
    canal.instance.filter.black.regex=
    #################################################

    修改canal.properties

    1
    2
    3
    4
    5
    6
    vim /opt/canal/conf/canal.properties

    #canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
    canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml

    # 找到 tsdb 的配置：注释掉 h2-tsdb.xml 那一行，启用 mysql-tsdb.xml 那一行

  6. 启动canal

    1
    sh /opt/canal/bin/startup.sh

客户端

  1. 引入依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.23</version>
    </dependency>
  2. 客户端启动类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    package canal.test;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;

    import java.net.InetSocketAddress;
    import java.util.List;

    /**
     * Minimal canal client used by this quick start.
     *
     * <p>Connects to the {@code example} instance on the canal server, polls binlog
     * entries in batches, prints every row change to standard output and acknowledges
     * each batch. The client exits after {@link #MAX_EMPTY_POLLS} consecutive empty
     * polls or when its thread is interrupted.
     *
     * @author tt
     */
    public class SimpleCanalClientExample {

        /** Host of the canal server set up in the server part of this guide. */
        private static final String CANAL_HOST = "192.168.1.101";

        /** Port the canal server is reached on. */
        private static final int CANAL_PORT = 11111;

        /** Name of the canal instance (the {@code conf/example} directory). */
        private static final String DESTINATION = "example";

        /** Maximum number of entries requested per poll. */
        private static final int BATCH_SIZE = 1000;

        /** Number of consecutive empty polls after which the client gives up. */
        private static final int MAX_EMPTY_POLLS = 120;

        /** Pause between two polls when the previous one returned nothing. */
        private static final long EMPTY_POLL_SLEEP_MILLIS = 1000L;

        /**
         * Runs the polling loop until the empty-poll limit is reached or the thread is
         * interrupted; the connector is always disconnected on the way out.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Open the connection to the canal server (empty username / password).
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(CANAL_HOST, CANAL_PORT), DESTINATION, "", "");
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                // The previous condition was OR-ed with a flag that was always true, so the
                // loop could never end and the exit message below was unreachable.
                while (emptyCount < MAX_EMPTY_POLLS) {
                    // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    List<CanalEntry.Entry> entries = message.getEntries();
                    if (batchId == -1 || entries.isEmpty()) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(EMPTY_POLL_SLEEP_MILLIS);
                        } catch (InterruptedException e) {
                            // Restore the interrupt status and stop polling instead of
                            // silently swallowing the request to shut down.
                            Thread.currentThread().interrupt();
                            System.out.println("interrupted, exit");
                            return;
                        }
                    } else {
                        emptyCount = 0;
                        printEntry(entries);
                    }

                    // Confirm the batch. If processing can fail, call
                    // connector.rollback(batchId) instead so the batch is delivered again.
                    connector.ack(batchId);
                }

                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }

        /**
         * Prints the binlog position, schema/table and row data of every entry,
         * skipping transaction begin/end markers.
         *
         * @param entries entries of one fetched batch
         * @throws RuntimeException if an entry's payload cannot be parsed as a row change
         */
        private static void printEntry(List<CanalEntry.Entry> entries) {
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }

                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException(
                            "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == CanalEntry.EventType.DELETE) {
                        // A deleted row only has a "before" image.
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == CanalEntry.EventType.INSERT) {
                        // An inserted row only has an "after" image.
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }

        /**
         * Prints name, value and updated flag of each column, one column per line.
         *
         * @param columns columns of one row image
         */
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
            }
        }

    }

快速开始到这里就结束了。