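Maxwell is an open-source MySQL change data capture (CDC) tool written in Java and released by the US company Zendesk. It monitors data changes in a MySQL database in real time (insert, update, and delete operations) and sends the changed data as JSON to streaming platforms such as Kafka, Kinesis, RabbitMQ, Redis, and Google Cloud Pub/Sub, or to files and other destinations.
Maxwell project website: https://maxwells-daemon.io/
Maxwell GitHub repository: https://github.com/zendesk/maxwell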
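    mysql> update test set name = 'wang111' where id=1;
    Query OK, 1 row affected (0.01 sec)

The original SQL statement is converted into the following JSON:

    {
        "database": "wangtingdb",
        "table": "test",
        "type": "update",
        "ts": 1676444034,
        "xid": 2569,
        "commit": true,
        "data": {
            "id": 1,
            "name": "wang111"
        },
        "old": {
            "name": "wang"
        }
    }

Field descriptions:
database # database the changed row belongs to
table # table the changed row belongs to
type # type of change (insert, update, delete)
ts # timestamp of the change, in seconds (e.g. 1676443644 -> 2023-02-15 14:47:24 UTC+8)
xid # transaction id
commit # transaction commit flag; can be used to reassemble transactions
data # row data after the change (for a delete, the row that was removed)
old # for update events, the values before the change; contains only the changed fields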
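To make the field descriptions concrete, here is a minimal Python sketch that parses the update message from the example and derives two things from it: the full row as it looked before the update, and a readable form of `ts`. The helper names `before_image` and `event_time` are hypothetical and are not part of Maxwell; the fixed UTC+8 offset is an assumption that matches the timestamps quoted in this article.

```python
import json
from datetime import datetime, timedelta, timezone

# The update message from the example, as a single JSON line.
raw = (
    '{"database":"wangtingdb","table":"test","type":"update",'
    '"ts":1676444034,"xid":2569,"commit":true,'
    '"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}'
)


def before_image(event: dict) -> dict:
    """Rebuild the full row as it was before an update.

    "old" holds only the changed columns, so it is overlaid on "data".
    """
    return {**event["data"], **event.get("old", {})}


def event_time(event: dict, utc_offset_hours: int = 8) -> str:
    """Format the second-resolution "ts" field at a fixed UTC offset (assumed +8)."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(event["ts"], tz).strftime("%Y-%m-%d %H:%M:%S")


event = json.loads(raw)
print(before_image(event))  # {'id': 1, 'name': 'wang'}
print(event_time(event))    # 2023-02-15 14:53:54
```

Overlaying `old` on `data` works because `old` carries only the columns that changed, so every other column is taken from the current row.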
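Common use cases for Maxwell include data synchronization for data warehouse ETL, cache maintenance, collecting table-level DML metrics, incremental updates to search engines, data partition migration, and binlog-based rollback plans for database cutovers.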
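As an illustration of the cache maintenance use case, the sketch below applies Maxwell row events to an in-memory dictionary. It is only a sketch under assumptions: `apply_event` is a hypothetical helper, the cache is a plain Python dict rather than a real cache service, and `id` is assumed to identify a row (the `test` table used in this article declares no primary key). The sample messages have the same shape as the insert, update, and delete messages produced for the `test` table later in this article.

```python
import json


def apply_event(cache: dict, event: dict, key_column: str = "id") -> None:
    """Apply one Maxwell row event to a cache keyed by (database, table, key)."""
    row = event["data"]
    key = (event["database"], event["table"], row[key_column])
    if event["type"] in ("insert", "update"):
        cache[key] = row          # store the latest version of the row
    elif event["type"] == "delete":
        cache.pop(key, None)      # drop the row if it is cached
    # Any other type is ignored in this sketch.


messages = [
    '{"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}',
    '{"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}',
    '{"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}',
    '{"database":"wangtingdb","table":"test","type":"delete","ts":1676444127,"xid":2777,"commit":true,"data":{"id":2,"name":"wang2"}}',
]

cache = {}
for line in messages:
    apply_event(cache, json.loads(line))

print(cache)  # {('wangtingdb', 'test', 1): {'id': 1, 'name': 'wang111'}}
```

One caveat of this simple design: if an update changes the key column itself, the entry under the old key would also have to be removed, which the sketch does not handle.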
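Maxwell's working principle is simple: it disguises itself as a MySQL slave and follows the MySQL master-slave replication protocol to sync data from the master.
It reads the MySQL binary log (binlog) in real time, extracts the changed data, and sends it as JSON to Kafka or another stream processing platform (Kafka is not the only output option).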
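MySQL binary log

    [wangting@hdt-dmcp-ops05 mysql]$ pwd
    /var/lib/mysql
    [wangting@hdt-dmcp-ops05 mysql]$ sudo ls -l | grep mysql-bin
    -rw-r----- 1 mysql mysql 3040 Feb 15 15:05 mysql-bin.000001
    -rw-r----- 1 mysql mysql   19 Feb 15 13:59 mysql-bin.index

MySQL master-slave replication
MySQL master-slave replication sets up a database environment identical to the master database; this copy is called the slave database.
Use cases for master-slave replication:

How MySQL master-slave replication works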
[Note]:
Maxwell 1.30.0 and later no longer support JDK 1.8; the last release that runs on JDK 1.8 is 1.29.2.
    # Download the release package
    [wangting@hdt-dmcp-ops05 software]$ wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
    # Unpack it
    [wangting@hdt-dmcp-ops05 software]$ tar -xf maxwell-1.29.2.tar.gz -C /opt/module/
    [wangting@hdt-dmcp-ops05 software]$ mv /opt/module/maxwell-1.29.2 /opt/module/maxwell
    # Directory layout
    [wangting@hdt-dmcp-ops05 maxwell]$ ll
    total 84
    drwxrwxr-x 2 wangting wangting  4096 Feb 15 13:54 bin
    -rw-r--r-- 1 wangting wangting 25133 Jan 25  2021 config.md
    -rw-r--r-- 1 wangting wangting 11970 Jan 25  2021 config.properties.example
    -rw-r--r-- 1 wangting wangting 10259 Apr 23  2020 kinesis-producer-library.properties.example
    drwxr-xr-x 3 wangting wangting 12288 Jan 27  2021 lib
    -rw-r--r-- 1 wangting wangting   548 Apr 23  2020 LICENSE
    -rw-r--r-- 1 wangting wangting   470 Jan 25  2021 log4j2.xml
    -rw-r--r-- 1 wangting wangting  3328 Jan 27  2021 quickstart.md
    -rw-r--r-- 1 wangting wangting  1429 Jan 27  2021 README.md

    [wangting@hdt-dmcp-ops05 maxwell]$ sudo vim /etc/my.cnf

    [mysqld]
    server-id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog-do-db=wangtingdb
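server-id=1
- Database server ID
log-bin=mysql-bin
- Enables the binlog; the value is used as the prefix of the binlog file names
binlog_format=row
- Binlog format; Maxwell requires row
binlog-do-db=wangtingdb
- Database for which the binlog is enabled; adjust this to your environment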
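Before restarting MySQL, the settings described above can be sanity-checked. The following is a small sketch using Python's standard `configparser`; the function name `check_binlog_settings` is hypothetical, and it only checks the options this article configures. It assumes a simple file like the snippet shown here: real `my.cnf` files that use directives such as `!includedir` may not parse with `configparser`.

```python
import configparser


def check_binlog_settings(cnf_text: str) -> list:
    """Return a list of problems found in the [mysqld] section; empty means OK."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL accepts '-' and '_' interchangeably in option names, so normalize.
    opts = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}
    problems = []
    if not opts.get("server_id"):
        problems.append("server-id is not set")
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if (opts.get("binlog_format") or "").lower() != "row":
        problems.append("binlog_format must be row")
    return problems


sample = """\
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=wangtingdb
"""

print(check_binlog_settings(sample))  # []
```

Passing a configuration with `binlog_format=statement` instead would return `['binlog_format must be row']`.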
    [wangting@hdt-dmcp-ops05 maxwell]$ sudo systemctl restart mysqld

    [wangting@hdt-dmcp-ops05 maxwell]$ mysql -uroot -h172.20.12.179 -p
    # Create the database
    mysql> create database maxwell character set utf8mb4;
    Query OK, 1 row affected (0.00 sec)
    # Create the Maxwell user
    mysql> create user 'maxwell'@'%' identified by 'maxwell';
    Query OK, 0 rows affected (0.00 sec)
    # Grant the required privileges
    mysql> grant all on maxwell.* to 'maxwell'@'%';
    Query OK, 0 rows affected (0.00 sec)
    mysql> grant select, replication client, replication slave on *.* to 'maxwell'@'%';
    Query OK, 0 rows affected (0.01 sec)
    # Reload the privileges
    mysql> flush privileges;
    Query OK, 0 rows affected (0.00 sec)
    mysql> quit;
    Bye

    [wangting@hdt-dmcp-ops05 maxwell]$ mysql -umaxwell -pmaxwell -e "show databases;"
    +--------------------+
    | Database           |
    +--------------------+
    | information_schema |
    | wangtingdb         |
    | maxwell            |
    | mysql              |
    | performance_schema |
    | sys                |
    +--------------------+

    [wangting@hdt-dmcp-ops05 maxwell]$ mv config.properties.example config.properties
    [wangting@hdt-dmcp-ops05 maxwell]$ vim config.properties
    # Destination for Maxwell output; options: stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
    producer=kafka
    # Target Kafka cluster addresses
    kafka.bootstrap.servers=hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
    # Target Kafka topic; can be static (e.g. maxwell) or dynamic (e.g. %{database}_%{table})
    kafka_topic=maxwell

    # MySQL settings
    host=hdt-dmcp-ops05
    user=maxwell
    password=maxwell
    jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

Note: if Maxwell sends data to a Kafka cluster, the Kafka cluster must be started first.
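The comment on `kafka_topic` mentions dynamic topic names such as `%{database}_%{table}`. The sketch below shows what such a template expands to for a given event. It only mirrors the behavior described in that comment and is not Maxwell's own implementation; `render_topic` is a hypothetical helper, and only the two placeholders named in the comment are handled.

```python
import re


def render_topic(template: str, event: dict) -> str:
    """Expand %{database} and %{table} placeholders in a kafka_topic template."""
    values = {"database": event["database"], "table": event["table"]}
    # Unknown placeholders are left untouched rather than guessed at.
    return re.sub(
        r"%\{(\w+)\}",
        lambda m: values.get(m.group(1), m.group(0)),
        template,
    )


event = {"database": "wangtingdb", "table": "test"}
print(render_topic("maxwell", event))               # maxwell
print(render_topic("%{database}_%{table}", event))  # wangtingdb_test
```

With a static topic every table's changes land in one topic; with a dynamic one each table gets its own topic, which makes per-table consumers simpler at the cost of more topics to manage.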
    # Start Maxwell
    [wangting@hdt-dmcp-ops05 maxwell]$ /opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon

    # Stop Maxwell
    [wangting@hdt-dmcp-ops05 maxwell]$ ps -ef | grep maxwell | grep -v grep | awk '{print $2}' | xargs kill -9

    [wangting@hdt-dmcp-ops05 bin]$ vim mymaxwell

    #!/bin/bash
    MAXWELL_HOME=/opt/module/maxwell

    # Return status = number of running Maxwell processes (0 means not running)
    status_maxwell(){
        result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
        return $result
    }

    start_maxwell(){
        status_maxwell
        if [[ $? -lt 1 ]]; then
            echo "Starting Maxwell"
            $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
        else
            echo "Maxwell is already running"
        fi
    }

    stop_maxwell(){
        status_maxwell
        if [[ $? -gt 0 ]]; then
            echo "Stopping Maxwell"
            ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
        else
            echo "Maxwell is not running"
        fi
    }

    case $1 in
        start )
            start_maxwell
        ;;
        stop )
            stop_maxwell
        ;;
        restart )
            stop_maxwell
            sleep 1
            start_maxwell
        ;;
    esac

    [wangting@hdt-dmcp-ops05 bin]$ chmod +x mymaxwell
    [wangting@hdt-dmcp-ops05 bin]$ mymaxwell stop
    Stopping Maxwell
    [wangting@hdt-dmcp-ops05 bin]$ mymaxwell start
    Starting Maxwell

    [wangting@hdt-dmcp-ops05 maxwell]$ mysql -uroot -p123456
    mysql> use wangtingdb;
    Database changed
    mysql> create table test(id int,name varchar(20));
    Query OK, 0 rows affected (0.02 sec)
    mysql> insert into test value(1,"wang");
    Query OK, 1 row affected (0.01 sec)
    mysql> insert into test value(2,"wang2");
    Query OK, 1 row affected (0.01 sec)
    mysql>

    [wangting@hdt-dmcp-ops05 maxwell]$ tail -f logs/MaxwellDaemon.out
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

    14:39:07,612 INFO AppInfoParser - Kafka version : 1.0.0
    14:39:07,612 INFO AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
    14:39:07,626 INFO Maxwell - Maxwell v1.29.2 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-bin.000001:977], lastHeartbeat=0]
    14:39:07,756 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000001:977], lastHeartbeat=0])
    14:39:07,809 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:977
    14:39:07,819 INFO BinaryLogClient - Connected to hdt-dmcp-ops05:3306 at mysql-bin.000001/977 (sid:6379, cid:36)
    14:39:07,819 INFO BinlogConnectorReplicator - Binlog connected.
    14:44:48,333 INFO AbstractSchemaStore - storing schema @Position[BinlogPosition[mysql-bin.000001:1042], lastHeartbeat=0] after applying "create table test(id int,name varchar(20))" to wangtingdb, new schema id is 2

    [wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
    {"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
    {"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}

The messages formatted for readability:
    {
        "database": "wangtingdb",
        "table": "test",
        "type": "insert",
        "ts": 1676443636,
        "xid": 1687,
        "commit": true,
        "data": {
            "id": 1,
            "name": "wang"
        }
    }

    {
        "database": "wangtingdb",
        "table": "test",
        "type": "insert",
        "ts": 1676443644,
        "xid": 1709,
        "commit": true,
        "data": {
            "id": 2,
            "name": "wang2"
        }
    }

    mysql> update test set name = 'wang111' where id=1;
    Query OK, 1 row affected (0.01 sec)
    Rows matched: 1 Changed: 1 Warnings: 0

Check the change in Kafka
    [wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
    {"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
    {"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
    # The following update message was added
    {"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}

    # Formatted:
    {
        "database": "wangtingdb",
        "table": "test",
        "type": "update",
        "ts": 1676444034,
        "xid": 2569,
        "commit": true,
        "data": {
            "id": 1,
            "name": "wang111"
        },
        "old": {
            "name": "wang"
        }
    }

    mysql> delete from test where id = 2;
    Query OK, 1 row affected (0.00 sec)

Check the change in Kafka
    [wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
    {"database":"wangtingdb","table":"test","type":"insert","ts":1676443636,"xid":1687,"commit":true,"data":{"id":1,"name":"wang"}}
    {"database":"wangtingdb","table":"test","type":"insert","ts":1676443644,"xid":1709,"commit":true,"data":{"id":2,"name":"wang2"}}
    {"database":"wangtingdb","table":"test","type":"update","ts":1676444034,"xid":2569,"commit":true,"data":{"id":1,"name":"wang111"},"old":{"name":"wang"}}
    # The following delete message was added
    {"database":"wangtingdb","table":"test","type":"delete","ts":1676444127,"xid":2777,"commit":true,"data":{"id":2,"name":"wang2"}}

    # Formatted:
    {
        "database": "wangtingdb",
        "table": "test",
        "type": "delete",
        "ts": 1676444127,
        "xid": 2777,
        "commit": true,
        "data": {
            "id": 2,
            "name": "wang2"
        }
    }

Using the maxwell-bootstrap command
Maxwell provides a bootstrap command for full synchronization of historical data (a Maxwell instance must still be running for it to work).
    mysql> select * from test;
    +------+---------+
    | id   | name    |
    +------+---------+
    |    1 | wang111 |
    +------+---------+
    1 row in set (0.00 sec)
    mysql> insert into test value(2,"wang222");
    Query OK, 1 row affected (0.00 sec)
    mysql> insert into test value(3,"wang333");
    Query OK, 1 row affected (0.00 sec)
    mysql> insert into test value(4,"wang444");
    Query OK, 1 row affected (0.01 sec)
    mysql> select * from test;
    +------+---------+
    | id   | name    |
    +------+---------+
    |    1 | wang111 |
    |    2 | wang222 |
    |    3 | wang333 |
    |    4 | wang444 |
    +------+---------+
    4 rows in set (0.00 sec)

    [wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
    # No messages yet; the consumer is waiting for new ones

    # Check that the Maxwell service is running
    [wangting@hdt-dmcp-ops05 bin]$ ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep
    wangting 22431 1 0 15:06 pts/2 00:00:08 /opt/module/java/bin/java -Dfile.encoding=UTF-8 -Dlog4j.shutdownCallbackRegistry=com.djdch.log4j.StaticShutdownCallbackRegistry -cp :/opt/module/maxwell/bin/../lib/*:/opt/module/maxwell/bin/../lib/kafka-clients/kafka-clients-1.0.0.jar com.zendesk.maxwell.Maxwell --config /opt/module/maxwell/config.properties --daemon
    [wangting@hdt-dmcp-ops05 bin]$ cd /opt/module/maxwell/
    [wangting@hdt-dmcp-ops05 maxwell]$ bin/maxwell-bootstrap --database wangtingdb --table test --config config.properties
    connecting to jdbc:mysql://hdt-dmcp-ops05:3306/maxwell?allowPublicKeyRetrieval=true&connectTimeout=5000&serverTimezone=Asia%2FShanghai&zeroDateTimeBehavior=convertToNull&useSSL=false

    [wangting@hdt-dmcp-ops01 ~]$ kafka-console-consumer.sh --topic maxwell --bootstrap-server hdt-dmcp-ops01:9092,hdt-dmcp-ops02:9092,hdt-dmcp-ops03:9092
    {"database":"wangtingdb","table":"test","type":"bootstrap-start","ts":1676444947,"data":{}}
    {"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":1,"name":"wang111"}}
    {"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":2,"name":"wang222"}}
    {"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":3,"name":"wang333"}}
    {"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":4,"name":"wang444"}}
    {"database":"wangtingdb","table":"test","type":"bootstrap-complete","ts":1676444947,"data":{}}

[Note]:
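- The table holds 4 rows, but the bootstrap produces 6 messages
- The first message (type bootstrap-start) and the last message (type bootstrap-complete) mark the start and end of the bootstrap and carry no data; only the bootstrap-insert messages in between contain rows
- All records emitted by one bootstrap run share the same ts, which is the time the bootstrap started: 1676444947 -> 2023-02-15 15:09:07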
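A consumer therefore needs to tell the marker messages apart from the row messages. The sketch below does this for the six messages produced by the bootstrap run in this article; `collect_bootstrap_rows` is a hypothetical helper, not part of Maxwell, and it assumes messages arrive in order with one bootstrap at a time.

```python
import json


def collect_bootstrap_rows(lines):
    """Return the rows found between bootstrap-start and bootstrap-complete."""
    rows = []
    in_bootstrap = False
    for line in lines:
        event = json.loads(line)
        kind = event["type"]
        if kind == "bootstrap-start":
            in_bootstrap = True       # marker only, carries no row data
        elif kind == "bootstrap-complete":
            in_bootstrap = False      # marker only, carries no row data
        elif kind == "bootstrap-insert" and in_bootstrap:
            rows.append(event["data"])
    return rows


messages = [
    '{"database":"wangtingdb","table":"test","type":"bootstrap-start","ts":1676444947,"data":{}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":1,"name":"wang111"}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":2,"name":"wang222"}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":3,"name":"wang333"}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-insert","ts":1676444947,"data":{"id":4,"name":"wang444"}}',
    '{"database":"wangtingdb","table":"test","type":"bootstrap-complete","ts":1676444947,"data":{}}',
]

rows = collect_bootstrap_rows(messages)
print(len(messages), len(rows))  # 6 4
print(rows[0])                   # {'id': 1, 'name': 'wang111'}
```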