Flink in Practice, Part 8: Using Copilot to Build a BinLog Listener
Preface

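GitHub Copilot is an AI programming assistant developed by GitHub in collaboration with OpenAI. It originally ran on OpenAI's Codex model and now runs on newer large language models. Its core aim is to free developers from repetitive work so they can focus on creative work.
This section shows how to use Copilot to generate an example that reads data from a MySQL database and sends it to Kafka.
Preparation
Create the MySQL table user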
CREATE TABLE `user` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(45) DEFAULT NULL,
`age` int DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Create the Kafka topic: quickstart-events
yangyanping@yangyaningdeAir kafka_2.13-4.2.0 % bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
Start a console consumer to listen for messages
yangyanping@yangyaningdeAir bin % ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events
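Hands-on
Open GitHub Copilot Chat from the bottom of the IntelliJ IDEA window, as shown in the screenshot:

In the GitHub Copilot Chat window, enter: write a Flink job that reads MySQL data and writes it to Kafka

Copilot generates the following code:

In the GitHub Copilot Chat window, enter: fix the unresolved class reference problem

The POM file after the fix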
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-kafka-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Flink Kafka Example</name>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.19.3</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>3.6.0</kafka.version>
        <log4j.version>2.24.3</log4j.version>
        <mysql.version>8.0.33</mysql.version>
    </properties>
    <dependencies>
        <!-- Flink Core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink CDC connector -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.0.1</version>
        </dependency>
        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- Key fix: add the Flink Table API dependencies (the module that contains ObjectPath) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Key fix: add the connector-base dependency that contains RecordEmitter -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Kafka Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>
        <!-- Kafka Client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- JSON Support -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Lombok (optional) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Maven Compiler Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
            <!-- Maven Shade Plugin: builds a fat jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Must match the class that defines main() -->
                                    <mainClass>com.yyp.MySQLToKafkaExample</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The MySQLToKafkaExample class is as follows:
package com.yyp;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class MySQLToKafkaExample {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5 seconds
        env.setParallelism(1); // 1 for this demo; tune to the data volume in production

        // 2. Build the MySQL source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("Yangyanping@1981")
                .databaseList("ad") // database to monitor
                .tableList("ad.user") // table to monitor; must carry the database name as a prefix
                .startupOptions(StartupOptions.initial()) // startup mode: snapshot first, then binlog
                .deserializer(new JsonDebeziumDeserializationSchema()) // deserialize each event to a JSON string
                .build();

        // 3. Create the data stream from the source
        DataStreamSource<String> streamSource = env.fromSource(
                mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL CDC Source"
        );

        // 4. Configure the Kafka sink
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // Kafka cluster address
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("quickstart-events") // target Kafka topic
                        .setValueSerializationSchema(new SimpleStringSchema()) // write the value as a plain string
                        .build())
                .setKafkaProducerConfig(new Properties() {{
                    put(ProducerConfig.ACKS_CONFIG, "all");
                }})
                .build();

        // 5. Print each event for debugging, then write the stream to Kafka
        streamSource.print();
        streamSource.sinkTo(kafkaSink);

        // 6. Start the job
        env.execute("Flink MySQL to Kafka Job");
    }
}
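Two details of the job configuration are easy to get wrong: the table list needs the database prefix ("ad.user"), and the producer settings are passed as an anonymous Properties subclass. The following is a minimal sketch of how both could be prepared with plain helper methods. The class name CdcJobSettings and its methods are hypothetical and not part of the Copilot-generated code; the only assumption about Kafka is that "acks" is the string key behind ProducerConfig.ACKS_CONFIG.

```java
import java.util.Arrays;
import java.util.Properties;

// Hypothetical helper class; names are illustrative and not part of the original job.
final class CdcJobSettings {

    // Prefix each table with its database name, the form tableList(...) expects, e.g. "ad.user".
    static String[] qualifiedTables(String database, String... tables) {
        return Arrays.stream(tables)
                .map(table -> database + "." + table)
                .toArray(String[]::new);
    }

    // Build the producer settings as a plain Properties object instead of an anonymous subclass.
    static Properties producerConfig() {
        Properties props = new Properties();
        props.setProperty("acks", "all"); // "acks" is the key behind ProducerConfig.ACKS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(qualifiedTables("ad", "user"))); // prints [ad.user]
        System.out.println(producerConfig()); // prints {acks=all}
    }
}
```

In the job, the results would be passed as .tableList(CdcJobSettings.qualifiedTables("ad", "user")) and .setKafkaProducerConfig(CdcJobSettings.producerConfig()).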
Testing
Run the MySQLToKafkaExample class, then modify data in the user table, as shown in the screenshot:

Messages received by the Kafka console consumer:
yangyanping@yangyaningdeAir bin % ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events
{"before":{"id":2,"name":"yangyanping","age":18,"create_time":1772236800000,"update_time":1772236800000},"after":{"id":2,"name":"yangyanping","age":20,"create_time":1772236800000,"update_time":1772236800000},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1772370908000,"snapshot":"false","db":"ad","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"binlog.000020","pos":4749,"row":0,"thread":9,"query":null},"op":"u","ts_ms":1772370908412,"transaction":null}
{"before":{"id":3,"name":"xiyangyang","age":18,"create_time":1772236800000,"update_time":1772236800000},"after":{"id":3,"name":"xiyangyang","age":30,"create_time":1772236800000,"update_time":1772236800000},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1772370908000,"snapshot":"false","db":"ad","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"binlog.000020","pos":4908,"row":0,"thread":9,"query":null},"op":"u","ts_ms":1772370908412,"transaction":null}
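Each message is a Debezium-style change event: "before" and "after" hold the row images, "op" holds the operation code ("u" for the updates above), and the datetime columns arrive as epoch milliseconds. The sketch below shows one way a downstream consumer might read those fields using only the JDK. The class ChangeEventDecoder is hypothetical, the regex lookup is a shortcut for illustration (a real consumer should use a JSON parser), and reading the milliseconds as UTC is an assumption about how the connector encodes DATETIME values.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the original job.
final class ChangeEventDecoder {

    // Matches the "op" field, e.g. "op":"u". Good enough for a demo only.
    private static final Pattern OP = Pattern.compile("\"op\":\"([a-z])\"");

    // Returns the operation code, or null if the event has no "op" field.
    static String extractOp(String json) {
        Matcher m = OP.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Maps Debezium operation codes to readable names.
    static String describeOp(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "snapshot read";
            default:  return "unknown";
        }
    }

    // Assumption: the epoch-millisecond value carries no time zone, so it is read as UTC.
    static LocalDateTime toDateTime(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Shortened version of the first event shown above.
        String event = "{\"before\":{\"id\":2,\"age\":18},\"after\":{\"id\":2,\"age\":20},"
                + "\"op\":\"u\",\"ts_ms\":1772370908412}";
        System.out.println(describeOp(extractOp(event))); // prints update
        System.out.println(toDateTime(1772236800000L));   // create_time from the event above
    }
}
```

With this mapping, the create_time value 1772236800000 in the events above corresponds to 2026-02-28 00:00 when read as UTC.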
That wraps up this simple example!