1.前置要求
flink cdc底层就是通过监控mysql的binlog日志,实时捕获到一个表或多个表的变更;所以必须开启mysql的binlog日志。
1.1 打开mysql配置文件
mysql配置文件默认位于/etc/目录下,直接用过以下命令开启
sudo vim /etc/my.cnf
1.2 修改配置文件
##启动binlog,该参数的值会作为binlog的文件名 log-bin=mysql-bimysql ##binlog类型 binlog_format=row ##启用binlog的数据库,需根据实际情况作出修改,一个库占一行 binlog-do-db=库名 binlog-do-db=库名
1.3 重启mysql服务
systemctl restart mysqld
2.环境依赖
<dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.17.0</version> </dependency>
如果cdc版本用了2.4.0出现异常的可以退回2.3.0
3.代码实现
public class TestApp { public static void main(String[] args) { //1.获取执行环境 Configuration conf = new Configuration(); //设置web端口 conf.setInteger("rest.port",10000); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); //2.通过flink cdc 读取mysql中的维度数据并创建流 MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("主机号") .port(3306) .username("用户名") .password("密码") //设置mysql数据库 .databaseList("数据库名") //设置mysql表(多个用,分隔) .tableList("表1,表2") //设置cdc启动方式 .startupOptions(StartupOptions.initial()) //设置反序列化器 .deserializer(new JsonDebeziumDeserializationSchema()) .build(); DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "cdc-source"); //3.数据同步到kafka KafkaSink<String> kafkaSink = KafkaSink.<String>builder() //指定Kafka的连接地址 .setBootstrapServers("主机:端口号") //指定序列化器 .setRecordSerializer( KafkaRecordSerializationSchema.<String>builder() .setTopic("Topic") .setValueSerializationSchema(new SimpleStringSchema()) .build() ) //写入kafka的一致性级别 .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) //如果是精确一次,必须设置事务的前缀 .setTransactionalIdPrefix("zhike-") //如果是精确一次必须设置事务超时时间 .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "300000") .build(); streamSource.sinkTo(kafkaSink); //4.执行任务 try { env.execute("ods_cdc"); } catch (Exception e) { throw new RuntimeException(e); } } }