0">一.环境安装
1. ZooKeeper
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz --no-check-certificate
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz -C /usr/local/
cd /usr/local
ln -sL apache-zookeeper-3.7.0-bin zookeeper
cd /usr/local/apache-zookeeper-3.7.0-bin/conf
cp -rp zoo_sample.cfg zoo.cfg
vi zoo.cfg
- Contents:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
clientPort=2181
cd /usr/local/zookeeper/bin && ./zkServer.sh start
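A quick optional check that the server actually came up; zkServer.sh ships with a status subcommand:
./zkServer.sh status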
2. Kafka
wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -zxvf kafka_2.13-2.7.0.tgz -C /usr/local
cd /usr/local
ln -sL kafka_2.13-2.7.0 kafka
cd kafka/config
vi server.properties
- Contents:
listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/kafka/kafka-logs
host.name=127.0.0.1
zookeeper.connect=localhost:2181
cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon config/server.properties
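A quick smoke test that the broker is reachable on 9092 (the topic name smoke_test is just an arbitrary example):
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic smoke_test
./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181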
3. Debezium MySQL connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.1.Final/debezium-connector-mysql-1.6.1.Final-plugin.tar.gz
mkdir /usr/local/kafka/plugins
tar -zxvf debezium-connector-mysql-1.6.1.Final-plugin.tar.gz -C /usr/local/kafka/plugins
vi /usr/local/kafka/config/connect-standalone.properties
bootstrap.servers=127.0.0.1:9092
plugin.path=/usr/local/kafka/plugins
# listeners=http://localhost:7778   (the Connect REST port defaults to 8083)
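Connect scans plugin.path on startup; the Debezium tarball should extract into a debezium-connector-mysql/ directory, which can be verified before starting the worker:
ls /usr/local/kafka/plugins/debezium-connector-mysql/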
II. Reading the MySQL binlog into Kafka with a standalone connector
- 1. Grant privileges on the source database
GRANT SELECT, SHOW VIEW, RELOAD, SHOW DATABASES, REPLICATION CLIENT, REPLICATION SLAVE, LOCK TABLES ON *.* TO kafka@'%' IDENTIFIED BY 'kafka';
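Note: MySQL 8.0 no longer accepts GRANT ... IDENTIFIED BY, so on 8.0 the user has to be created first; an equivalent two-step form:
CREATE USER 'kafka'@'%' IDENTIFIED BY 'kafka';
GRANT SELECT, SHOW VIEW, RELOAD, SHOW DATABASES, REPLICATION CLIENT, REPLICATION SLAVE, LOCK TABLES ON *.* TO 'kafka'@'%';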
- 2. Prepare the connector configuration file mysql.properties
name=mysql_connect_source
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=192.168.234.xx
database.port=3306
database.user=kafka
database.password=kafka
database.server.id=234008001
database.server.name=kafka_db
# database.whitelist is the legacy name; Debezium 1.6 also accepts database.include.list
database.whitelist=<your database name>
table.include.list=schema.table1,schema.table2
database.history.kafka.bootstrap.servers=127.0.0.1:9092
database.history.kafka.topic=kafka_db.debezium
time.precision.mode=connect
include.schema.changes=true
errors.tolerance=all
# include the original SQL statement in each change event (the MySQL server must also have binlog_rows_query_log_events enabled)
include.query=true
# default is initial (snapshot all existing rows); schema_only (capture only from the current binlog position); schema_only_recovery (rebuild after the history topic was deleted by mistake)
snapshot.mode=schema_only
database.history.skip.unparseable.ddl=true
- 3. Start the standalone Connect worker
cd /usr/local/kafka && nohup ./bin/connect-standalone.sh config/connect-standalone.properties mysql.properties > nohup.out 2>&1 &
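Once the worker is up, the Connect REST API can confirm the connector registered (port 8083 by default, or 7778 if the listeners line above is uncommented):
curl -s http://127.0.0.1:8083/connectors
curl -s http://127.0.0.1:8083/connectors/mysql_connect_source/status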
- 4. Connect to ZooKeeper and list the topics
./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
- 5. Consume a topic from the command line
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic {database.server.name}.{one entry from table.include.list} --from-beginning
- 6. Delete topics
./bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topic1,topic2
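If the delete appears to be ignored, check that topic deletion is allowed on the broker (delete.topic.enable in server.properties, true by default in recent Kafka versions):
delete.topic.enable=true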
III. Managing the standalone connector via the REST API
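At runtime the standalone worker is managed through the same REST interface; a few common calls, a sketch assuming the default 8083 port and the connector name from section II:
# view configuration and status
curl -s http://127.0.0.1:8083/connectors/mysql_connect_source/config
curl -s http://127.0.0.1:8083/connectors/mysql_connect_source/status
# pause / resume
curl -s -X PUT http://127.0.0.1:8083/connectors/mysql_connect_source/pause
curl -s -X PUT http://127.0.0.1:8083/connectors/mysql_connect_source/resume
# restart the connector
curl -s -X POST http://127.0.0.1:8083/connectors/mysql_connect_source/restart
# remove the connector
curl -s -X DELETE http://127.0.0.1:8083/connectors/mysql_connect_source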
1. Create a table
- 1.1 Start a console consumer
cd /usr/local/kafka && ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic debezium --from-beginning
- 1.2 Create the table in the source database and insert a row (an assumed table definition is sketched after the insert below)
insert into dba_test_table(name) values('aa');
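The CREATE TABLE statement isn't shown; a minimal definition consistent with the columns that appear in the change events below (an assumption, not the original DDL) could be:
create table dba_test_table (
  id int not null auto_increment primary key,
  name varchar(32) default null,
  address varchar(32) default null
);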
- 1.3 A new per-table topic appears in Kafka
debezium.debezium.dba_test_table
2. Observe this topic
- 2.1 insert into dba_test_table(name) values('aa');
"payload": {
"before": null,
"after": {
"id": 1,
"name": "aa",
"address": null
},
"source": {
"version": "1.6.1.Final",
"connector": "mysql",
"name": "debezium",
"ts_ms": 1638348372000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "dba_test_table",
"server_id": 23400801,
"gtid": "66339ee5-3d10-11ec-a611-0242ac110002:1263042",
"file": "mysql-bin.000034",
"pos": 747079405,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1638348372161,
"transaction": null
}
- 2.2 update dba_test_table set name='bb', address='addr1' where id=1;
"payload": {
"before": {
"id": 1,
"name": "aa",
"address": null
},
"after": {
"id": 1,
"name": "bb",
"address": "addr1"
},
"source": {
"version": "1.6.1.Final",
"connector": "mysql",
"name": "debezium",
"ts_ms": 1638348630000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "dba_test_table",
"server_id": 23400801,
"gtid": "66339ee5-3d10-11ec-a611-0242ac110002:1307889",
"file": "mysql-bin.000034",
"pos": 818840233,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1638348630034,
"transaction": null
}
- 2.3 delete from dba_test_table where id=1;
"payload": {
"before": {
"id": 1,
"name": "bb",
"address": "addr1"
},
"after": null,
"source": {
"version": "1.6.1.Final",
"connector": "mysql",
"name": "debezium",
"ts_ms": 1638348731000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "dba_test_table",
"server_id": 23400801,
"gtid": "66339ee5-3d10-11ec-a611-0242ac110002:1325649",
"file": "mysql-bin.000034",
"pos": 847104330,
"row": 0,
"thread": null,
"query": null
},
"op": "d",
"ts_ms": 1638348731068,
"transaction": null
}
- 2.4 modify (change column length): alter table dba_test_table modify `address` varchar(64) DEFAULT NULL; no output in Kafka
- 2.5 modify (change default value): alter table dba_test_table modify `address` varchar(64) DEFAULT ''; no output in Kafka
- 2.6 add (new column with a default): alter table dba_test_table add `age` tinyint DEFAULT '18'; no output in Kafka
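These DDL statements produce no row-change events on the per-table topic; with include.schema.changes=true, Debezium emits them as schema-change events to the topic named after database.server.name instead (the topic consumed in 1.1), so they can be observed with the same console consumer:
cd /usr/local/kafka && ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic debezium --from-beginning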