使用 Binlog 和 Canal 从 MySQL 抽取数据_张吉Jerry的博客-CSDN博客_binlog canal


本站和网页 https://blog.csdn.net/zjerryj/article/details/77152226 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

使用 Binlog 和 Canal 从 MySQL 抽取数据_张吉Jerry的博客-CSDN博客_binlog canal
使用 Binlog 和 Canal 从 MySQL 抽取数据
张吉Jerry
已于 2022-01-26 08:44:59 修改
70226
收藏
57
分类专栏:
大数据
文章标签:
mysql
java
数据仓库
etl
于 2017-08-14 08:39:08 首次发布
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zjerryj/article/details/77152226
版权
大数据
专栏收录该内容
26 篇文章
2 订阅
订阅专栏
数据抽取是 ETL 流程的第一步。我们会将数据从 RDBMS 或日志服务器等外部系统抽取至数据仓库,进行清洗、转换、聚合等操作。在现代网站技术栈中,MySQL 是最常见的数据库管理系统,我们会从多个不同的 MySQL 实例中抽取数据,存入一个中心节点,或直接进入 Hive。市面上已有多种成熟的、基于 SQL 查询的抽取软件,如著名的开源项目 Apache Sqoop,然而这些工具并不支持实时的数据抽取。MySQL Binlog 则是一种实时的数据流,用于主从节点之间的数据复制,我们可以利用它来进行数据抽取。借助阿里巴巴开源的 Canal 项目,我们能够非常便捷地将 MySQL 中的数据抽取到任意目标存储中。
Canal 的组成部分
简单来说,Canal 会将自己伪装成 MySQL 从节点(Slave),并从主节点(Master)获取 Binlog,解析和贮存后供下游消费端使用。Canal 包含两个组成部分:服务端和客户端。服务端负责连接至不同的 MySQL 实例,并为每个实例维护一个事件消息队列;客户端则可以订阅这些队列中的数据变更事件,处理并存储到数据仓库中。下面我们来看如何快速搭建起一个 Canal 服务。
配置 MySQL 主节点
MySQL 默认没有开启 Binlog,因此我们需要对 my.cnf 文件做以下修改:
server-id = 1
log_bin = /path/to/mysql-bin.log
binlog_format = ROW
注意 binlog_format 必须设置为 ROW, 因为在 STATEMENT 或 MIXED 模式下, Binlog 只会记录和传输 SQL 语句(以减少日志大小),而不包含具体数据,我们也就无法保存了。
从节点通过一个专门的账号连接主节点,这个账号需要拥有全局的 REPLICATION 权限。我们可以使用 GRANT 命令创建这样的账号:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
启动 Canal 服务端
从 GitHub 项目发布页中下载 Canal 服务端代码(链接),配置文件在 conf 文件夹下,有以下目录结构:
canal.deployer/conf/canal.properties
canal.deployer/conf/instanceA/instance.properties
canal.deployer/conf/instanceB/instance.properties
conf/canal.properties 是主配置文件,如其中的 canal.port 用以指定服务端监听的端口。instanceA/instance.properties 则是各个实例的配置文件,主要的配置项有:
# slaveId 不能与 my.cnf 中的 server-id 项重复
canal.instance.mysql.slaveId = 1234
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.connectionCharset = UTF-8
# 订阅实例中所有的数据库和表
canal.instance.filter.regex = .*\\..*
执行 sh bin/startup.sh 命令开启服务端,在日志文件 logs/example/example.log 中可以看到以下输出:
Loading properties file from class path resource [canal.properties]
Loading properties file from class path resource [example/instance.properties]
start CannalInstance for 1-example
[destination = example , address = /127.0.0.1:3306 , EventParser] prepare to find start position just show master status
编写 Canal 客户端
从服务端消费变更消息时,我们需要创建一个 Canal 客户端,指定需要订阅的数据库和表,并开启轮询。
首先,在项目中添加 com.alibaba.otter:canal.client 依赖项,构建 CanalConnector 实例:
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
Thread.sleep(3000);
} else {
printEntries(message.getEntries());
connector.ack(batchId);
这段代码和连接消息系统很相似。变更事件会批量发送过来,待处理完毕后我们可以 ACK 这一批次,从而避免消息丢失。
// printEntries
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == EventType.INSERT) {
printColumns(rowData.getAfterCollumnList());
每一个 Entry 代表一组具有相同变更类型的数据列表,如 INSERT 类型、UPDATE、DELETE 等。每一行数据我们都可以获取到各个字段的信息:
// printColumns
String line = columns.stream()
.map(column -> column.getName() + "=" + column.getValue())
.collect(Collectors.joining(","));
System.out.println(line);
完整代码可以在 GitHub 中找到(链接)。
加载至数据仓库
关系型数据库与批量更新
若数据仓库是基于关系型数据库的,我们可以直接使用 REPLACE 语句将数据变更写入目标表。其中需要注意的是写入性能,在更新较频繁的场景下,我们通常会缓存一段时间的数据,并批量更新至数据库,如:
REPLACE INTO `user` (`id`, `name`, `age`, `updated`) VALUES
(1, 'Jerry', 30, '2017-08-12 16:00:00'),
(2, 'Mary', 28, '2017-08-12 17:00:00'),
(3, 'Tom', 36, '2017-08-12 18:00:00');
另一种方式是将数据变更写入按分隔符分割的文本文件,并用 LOAD DATA 语句载入数据库。这些文件也可以用在需要写入 Hive 的场景中。不管使用哪一种方法,请一定注意要对字符串类型的字段进行转义,避免导入时出错。
基于 Hive 的数据仓库
Hive 表保存在 HDFS 上,该文件系统不支持修改,因此我们需要一些额外工作来写入数据变更。常用的方式包括:JOIN、Hive 事务、或改用 HBase。
数据可以归类成基础数据和增量数据。如昨日的 user 表是基础数据,今日变更的行是增量数据。通过 FULL OUTER JOIN,我们可以将基础和增量数据合并成一张最新的数据表,并作为明天的基础数据:
SELECT
COALESCE(b.`id`, a.`id`) AS `id`
,COALESCE(b.`name`, a.`name`) AS `name`
,COALESCE(b.`age`, a.`age`) AS `age`
,COALESCE(b.`updated`, a.`updated`) AS `updated`
FROM dw_stage.`user` a
FULL OUTER JOIN (
-- 增量数据会包含重复数据,因此需要选择最新的那一条
SELECT `id`, `name`, `age`, `updated`
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated` DESC) AS `n`
FROM dw_stage.`user_delta`
) b
WHERE `n` = 1
) b
ON a.`id` = b.`id`;
Hive 0.13 引入了事务和 ACID 表,0.14 开始支持 INSERT、UPDATE、DELETE 语句,Hive 2.0.0 则又新增了 Streaming Mutation API,用以通过编程的方式批量更新 Hive 表中的记录。目前,ACID 表必须使用 ORC 文件格式进行存储,且须按主键进行分桶(Bucket)。Hive 会将变更记录保存在增量文件中,当 OrcInputFormat 读取数据时会自动定位到最新的那条记录。官方案例可以在这个链接中查看。
最后,我们可以使用 HBase 来实现表数据的更新,它是一种 KV 存储系统,同样基于 HDFS。HBase 的数据可以直接为 MapReduce 脚本使用,且 Hive 中可以创建外部映射表指向 HBase。更多信息请查看官方网站。
初始化数据
数据抽取通常是按需进行的,在新增一张表时,数据源中可能已经有大量原始记录了。常见的做法是手工将这批数据全量导入至目标表中,但我们也可以复用 Canal 这套机制来实现历史数据的抽取。
首先,我们在数据源库中创建一张辅助表:
CREATE TABLE `retl_buffer` (
id BIGINT AUTO_INCREMENT PRIMARY KEY
,table_name VARCHAR(255)
,pk_value VARCHAR(255)
);
当需要全量抽取 user 表时,我们执行以下语句,将所有 user.id 写入辅助表中:
INSERT INTO `retl_buffer` (`table_name`, `pk_value`)
SELECT 'user', `id` FROM `user`;
Canal 客户端在处理到 retl_buffer 表的数据变更时,可以从中解析出表名和主键的值,直接反查数据源,将数据写入目标表:
if ("retl_buffer".equals(entry.getHeader().getTableName())) {
String tableName = rowData.getAfterColumns(1).getValue();
String pkValue = rowData.getAfterColumns(2).getValue();
System.out.println("SELECT * FROM " + tableName + " WHERE id = " + pkValue);
这一方法在阿里巴巴的另一个开源软件 Otter 中使用。
Canal 高可用
Canal 服务端中的实例可以配置一个备用 MySQL,从而能够在双 Master 场景下自动选择正在工作的数据源。注意两台主库都需要打开 log_slave_updates 选项。Canal 会使用自己的心跳机制(定期更新辅助表的记录)来检测主库的存活。Canal 自身也有 HA 配置,配合 Zookeeper,我们可以开启多个 Canal 服务端,当某台服务器宕机时,客户端可以从 ZK 中获取新的服务端地址,继续进行消费。更多信息可以参考 Canal AdminGuide。
参考资料
https://github.com/alibaba/canal/wikihttps://github.com/alibaba/otter/wikihttps://www.phdata.io/4-strategies-for-updating-hive-tables/https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions
张吉Jerry
关注
关注
21
点赞
57
收藏
打赏
评论
使用 Binlog 和 Canal 从 MySQL 抽取数据
数据抽取是 ETL 流程的第一步,我们常会需要从多个不同的 MySQL 实例中抽取数据,存入一个中心节点,或直接进入 Hive。借助 Canal 项目,我们能够通过 MySQL Binlog 进行数据抽取。
复制链接
扫一扫
专栏目录
基于 Canal 和 Kafka 实现 MySQL 的 Binlog 近实时同步
yihuliunian的博客
04-04
226
转载自品略图书馆http://www.pinlue.com/article/2020/04/0103/5510101407559.html
前提
近段时间,业务系统架构基本完备,数据层面的建设比较薄弱,因为笔者目前工作重心在于搭建一个小型的数据平台。优先级比较高的一个任务就是需要近实时同步业务系统的数据(包括保存、更新或者软删除)到一个另一个数据源,持久化之前需要清洗数据并且构建一个相对合理...
Canal监听mysql的binlog日志实现数据同步
靖节先生的博客
08-08
3271
Canal监听mysql的binlog日志实现数据同步1. canal概述1.1 canal简介1.2 原理分析1.2.1 MySQL主备复制原理1.2.2 canal原理2. canal安装配置1.1 mysql环境准备1.2 canal下载1.3 canal配置启动3. 实现验证3.1 案例编码3.2 实现验证
1. canal概述
1.1 canal简介
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
早期阿里巴巴因为杭州
评论 5
您还未登录,请先
登录
后发表或查看评论
配置binlog并使用Canal实现Mysql定制同步数据的功能
七加一i的博客
11-07
295
binlog是一个二进制文件,它保存在磁盘中,是用来记录数据库表结构变更、表数据修改的二进制日志。其实除了数据复制外,它还可以实现数据恢复、增量备份等功能。首先需要确保mysql服务已经启用了binloglog_bin = /var/log/mysql/mysql-bin.log #指定binlog路径 binlog-format = ROW在配置文件中加入了log_bin配置项后,表示启用了binlog。
使用Canal实现MySQL的数据实时同步
热门推荐
Demon的博客
05-06
3万+
我们公司的架构大致如下:
途中会使用canal监听MySQL的binlog日志从而实现mysql的数据实时同步到redis和hdfs中。
第一步:开启MySQL的binlog日志
Mysql 的 binlog 日志作用是用来记录 mysql 内部增删等对 mysql 数据库有更新的内容的 记录(对数据库的改动),对数据库的查询 select 或 show 等不会被 binlog 日志记录...
Canal简介
qq_41794285的博客
11-29
257
简介
Canal:译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
数据库镜像
数据...
四、binlog与canal的藕断丝连
01-31
1626
一、Canal的介绍
名称:canal [kə’næl]
译意: 水道/管道/沟渠
语言: 纯java开发
定位: 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
二、binLog主备原理
mysql主备复制实现
从上面来看,复制分成三步:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log event...
缓存同步canal实现(订阅binlog)
qq_46624276的博客
09-02
514
对于实体类,Canal会将改变的数据注入Item,如果属性名和数据库字段名不一致,需要用注解标明。重启docker,/mysql/data目录下会出现mysql-bin.000001。查看主库的状态(如果从库字段position小于主库,那么就有新的数据要获取了)当canal监听到binlog发生变化,会通知canal客户端。2、基于canal实现缓存同步。1、基于mq实现缓存同步。
Canal安装和配置,实现监听binlog日志
qq_45932382的博客
09-26
633
canal监听binlog日志
canal实现mysql实时数据binlog同步
阿拉斯加大闸蟹的博客
01-09
2万+
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
基本说明
canal 1.1.1版本之后, 默认支持将canal serve...
Canal 采集MySQL binlog日志
qq_43528451的博客
04-25
2353
Canal 采集MySQL binlog日志
1、Canal 数据同步之MySQL binlog日志
​Canal是阿里巴巴开发数据实时同步框架,原理与OGG基本类似,都是捕获数据库日志数据,进行解析,将其发送到目标端(比如Kafka 消息队列)。新版Canal 1.1版本
针对MySQL数据库来说,日志数据:binlog日志,二进制日志。
binlog中存储四种类型日志数据:插入insert、更新update、删除delete和清空truncate。
默认情况下,MySQL数据库没有开启bin
如何通过canal读取mysql数据_mysql整合canal读取binlog日志
weixin_39927993的博客
01-19
542
这里将告诉您mysql整合canal读取binlog日志,具体实现方法:canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。1.mysql前期准备(开启binlog):对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.c...
canal 从指定binlog日志文件及定位中读取
胖虎儿的博客
10-12
8461
某些时候,因为canal-server 出现了问题或者因binlog设置删除机制,文件不存在了,我们需要手动调整其读取Binlog的位置
canal实现mysql数据同步
congge
01-27
1万+
canal实现mysql数据同步
解析/监控/订阅 mysql binlog
bocai_xiaodaidai的博客
04-09
2791
mysql binlog的应用场景有很多,如主从同步、数据恢复、数据备份等等,那binlog到底是个什么玩意?参考文章:mysql主从同步机制
binlog本质是一个二进制文件,那又如何解析和使用呢?现成的工具已经有很多了,如Canal、maxwell。
Canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Canal分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大。
Maxwell由zendesk开源,也是由java开发,解析出..
mysql数据抽取_MySQL层次结构数据抽取
weixin_28811345的博客
02-02
151
我更喜欢邻接表的方式。下面的示例使用一个非递归存储过程返回一个树/子树,然后我将其转换为一个XML DOM,但是您可以对resultset执行任何您喜欢的操作。记住,这是从PHP到MySQL的一次调用,而且邻接列表更易于管理。菲律宾比索header("Content-type: text/xml");$conn = new mysqli("localhost", "foo_dbo", "pass"...
MySQL数据库 使用binlog+canal或binlake进行数据库的复制
天涯芳草
05-19
932
前言
在进行冷热分离的时候,需要将数据实时的复制在历史数据库中,我们使用的是binlog+canal的思想,将每次数据库数据的变更转换成消息发出来,然后再操作这些消息达到数据复制的
在京东,实现同样功能的组件,叫binlake
接下来详细说下:
1.Binlog
mysql有多种日志,常见的有:
错误日志(ErrorLog)
更新日志(UpdateLog)
二进制日志(Binlog)
查询日志(QueryLog)
慢查询日志(SlowQ...
基于MySql BinLog的增量订阅和消费组件 Canal
kutianya518的博客
04-03
1203
1、Canal背景
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
2、Canal原理
2.1mysql主备复制实现
从上层来看,复制分成三步:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事..
Java实现读取canal中binlog数据
杨鑫newlife的专栏
07-17
2186
package we.com.canal;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import co...
canal订阅mysql的binlog日志+springboot详解
最新发布
qq_42670087的博客
12-05
309
订阅mysql的binlog日志,通过canal中间件将对应日志通过springboot整合的方式打印出来
【MySQL】监听MySQL的binlog日志工具分析:Canal
技术能量站
02-03
573
Canal是阿里巴巴旗下的一款开源项目,利用Java开发。主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费,目前主要支持MySQL。
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:编程工作室
设计师:CSDN官方博客
返回首页
张吉Jerry
CSDN认证博客专家
CSDN认证企业博客
码龄18年
暂无认证
69
原创
5万+
周排名
175万+
总排名
38万+
访问
等级
3257
积分
202
粉丝
108
获赞
22
评论
382
收藏
私信
关注
热门文章
Java 空指针异常的若干解决方案
85859
使用 Binlog 和 Canal 从 MySQL 抽取数据
70224
使用 Apache Flink 开发实时 ETL
16498
TensorFlow 模型如何对外提供服务
13506
RESTful API 中的错误处理
12349
分类专栏
大数据
26篇
编程
21篇
摘译
2篇
PHP
27篇
.NET
5篇
最新评论
Java 空指针异常的若干解决方案
XMcoder:
空指针判空用这种Optional.ofNullable方法也很好https://www.bmabk.com/index.php/post/4830.html
Java 空指针异常的若干解决方案
Adam`南帝·梁:
谢谢
Spark Streaming 中如何实现 Exactly-Once 语义
Icedzzz:
总结的太好了!
使用 Binlog 和 Canal 从 MySQL 抽取数据
大石_001:
使用canal+kafka+canal-adapter但是出现丢数据的情况 该如何解决呢
使用 Python 和 Thrift 连接 HBase
微电子学与固体电子学-俞驰:
[code=csharp]
(Python3.6) appleyuchi@Desktop:~$ pip install hbase
Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
ERROR: Could not find a version that satisfies the requirement hbase (from versions: none)
ERROR: No matching distribution found for hbase
[/code]
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
Python 类型检查实践
使用 Kubernetes 部署 Flink 应用
深入理解 Hive ACID 事务表
2020年1篇
2019年2篇
2018年9篇
2017年35篇
2010年1篇
2009年8篇
2008年27篇
目录
目录
分类专栏
大数据
26篇
编程
21篇
摘译
2篇
PHP
27篇
.NET
5篇
目录
评论 5
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
张吉Jerry
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值