Preface

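GitHub Copilot is an AI programming assistant developed by GitHub in collaboration with OpenAI, and one of the first of its kind to see wide adoption. It was originally powered by OpenAI's Codex model and now runs on newer large language models such as the GPT-4 family. Its core mission is to free developers from repetitive work so they can focus on creative work.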

This section shows how to use Copilot to generate a Flink job that reads data from a MySQL database and sends it to Kafka.

Preparation

Create the MySQL table user:

CREATE TABLE `user` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(45) DEFAULT NULL,
  `age` int DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

Create the Kafka topic quickstart-events:

yangyanping@yangyaningdeAir kafka_2.13-4.2.0 % bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.

Start a console consumer to listen for messages:

yangyanping@yangyaningdeAir bin % ./kafka-console-consumer.sh  --bootstrap-server localhost:9092  --topic quickstart-events

Hands-on

In IntelliJ IDEA, click Open GitHub Copilot Chat at the bottom of the window, as shown in the figure:

In the GitHub Copilot Chat window, enter: Write a Flink job that reads MySQL data and writes it to Kafka

The code generated by Copilot is as follows:

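Some classes referenced by the generated code cannot be resolved, so enter in the GitHub Copilot Chat window: Fix the failing class references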

The POM file after the fix:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.example</groupId>
	<artifactId>flink-kafka-example</artifactId>
	<version>1.0-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>Flink Kafka Example</name>

	<properties>
		<maven.compiler.source>11</maven.compiler.source>
		<maven.compiler.target>11</maven.compiler.target>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.19.3</flink.version>
		<scala.binary.version>2.12</scala.binary.version>
		<kafka.version>3.6.0</kafka.version>
		<log4j.version>2.24.3</log4j.version>
		<mysql.version>8.0.33</mysql.version>
	</properties>

	<dependencies>
		<!-- Flink Core -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- Flink CDC connector -->
		<dependency>
			<groupId>com.ververica</groupId>
			<artifactId>flink-connector-mysql-cdc</artifactId>
			<version>3.0.1</version>
		</dependency>

		<!-- MySQL driver -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql.version}</version>
		</dependency>
		<!-- Key fix: add the Flink Table API dependency (the module that contains ObjectPath) -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Key fix: add flink-connector-base, the module that contains RecordEmitter -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-base</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Flink Clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- Flink Kafka Connector -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>3.0.2-1.18</version>
		</dependency>

		<!-- Kafka Client -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>${kafka.version}</version>
		</dependency>

		<!-- JSON Support -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- Lombok (optional) -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.30</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Maven Compiler Plugin -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.11.0</version>
				<configuration>
					<source>11</source>
					<target>11</target>
				</configuration>
			</plugin>

			<!-- Maven Shade Plugin: build a fat jar -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.2.4</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<createDependencyReducedPom>false</createDependencyReducedPom>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.yyp.MySQLToKafkaExample</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>
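One thing worth checking in this POM: flink.version is 1.19.3, while flink-connector-kafka is pinned to 3.0.2-1.18. Flink's externalized connectors append the Flink minor version they were built against to the connector version, so a -1.18 suffix suggests the artifact was compiled for Flink 1.18. The following is a minimal sketch of a hypothetical helper (ConnectorVersionCheck, flinkMinor and targetsFlink are illustrative names only) that makes the comparison explicit, assuming the "<connector version>-<Flink minor>" naming pattern:

```java
// Hypothetical helper: checks whether a Flink connector version such as
// "3.0.2-1.18" targets the same Flink minor release as <flink.version>.
public class ConnectorVersionCheck {

    /** Returns the "major.minor" part of a Flink version, e.g. "1.19.3" -> "1.19". */
    static String flinkMinor(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected Flink version: " + flinkVersion);
        }
        return parts[0] + "." + parts[1];
    }

    /** Assumes the connector version ends with "-<Flink minor>", e.g. "3.0.2-1.18". */
    static boolean targetsFlink(String connectorVersion, String flinkVersion) {
        int dash = connectorVersion.lastIndexOf('-');
        if (dash < 0) {
            return false; // no Flink suffix, so we cannot tell
        }
        String suffix = connectorVersion.substring(dash + 1);
        return suffix.equals(flinkMinor(flinkVersion));
    }

    public static void main(String[] args) {
        String flinkVersion = "1.19.3";       // <flink.version> in the POM
        String kafkaConnector = "3.0.2-1.18"; // flink-connector-kafka version in the POM
        System.out.println(kafkaConnector + " targets Flink " + flinkVersion + "? "
                + targetsFlink(kafkaConnector, flinkVersion));
    }
}
```

Running main prints "3.0.2-1.18 targets Flink 1.19.3? false". A mismatch like this may still work for a simple demo, but choosing a connector release whose suffix matches flink.version avoids depending on that.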

The MySQLToKafkaExample class:

package com.yyp;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class MySQLToKafkaExample {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5000); // take a checkpoint every 5 seconds
        env.setParallelism(1); // 1 for this demo; tune it to the data volume in production

        // 2. Build the MySQL CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("Yangyanping@1981")
                .databaseList("ad") // database to monitor
                .tableList("ad.user") // tables to monitor; names must include the database prefix
                .startupOptions(StartupOptions.initial()) // snapshot existing rows, then read the binlog
                .deserializer(new JsonDebeziumDeserializationSchema()) // emit each change as a JSON string
                .build();

        // 3. Create a data stream from the source
        DataStreamSource<String> streamSource = env.fromSource(
                mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL CDC Source"
        );

        // 4. Configure the Kafka sink
        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas

        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // Kafka cluster address
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("quickstart-events") // target Kafka topic
                        .setValueSerializationSchema(new SimpleStringSchema()) // write the JSON string as-is
                        .build())
                .setKafkaProducerConfig(producerProps)
                .build();

        // 5. Print each change to stdout and write it to Kafka
        streamSource.print();
        streamSource.sinkTo(kafkaSink);

        // 6. Start the job
        env.execute("Flink MySQL to Kafka Job");
    }
}
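Because the source uses StartupOptions.initial(), it first snapshots the rows already in ad.user and then switches to reading the binlog. In the Debezium JSON produced by JsonDebeziumDeserializationSchema, the op field tells these events apart: snapshot reads use "r", while inserts, updates and deletes use "c", "u" and "d". The sketch below models those four codes with a hypothetical enum (ChangeOp and Op are illustrative names, not part of Flink CDC or Debezium) and records which of the before/after images each kind of event carries:

```java
import java.util.Arrays;

// Hypothetical model of the Debezium "op" codes seen in the CDC JSON messages.
public class ChangeOp {

    enum Op {
        READ("r", false, true),   // row read during the initial snapshot: only "after" is set
        CREATE("c", false, true), // INSERT: only "after" is set
        UPDATE("u", true, true),  // UPDATE: both "before" and "after" are set
        DELETE("d", true, false); // DELETE: only "before" is set

        final String code;
        final boolean hasBefore;
        final boolean hasAfter;

        Op(String code, boolean hasBefore, boolean hasAfter) {
            this.code = code;
            this.hasBefore = hasBefore;
            this.hasAfter = hasAfter;
        }

        // Looks up the constant for an "op" value such as "u".
        static Op fromCode(String code) {
            return Arrays.stream(values())
                    .filter(op -> op.code.equals(code))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Unknown op code: " + code));
        }
    }

    public static void main(String[] args) {
        for (String code : new String[] {"r", "c", "u", "d"}) {
            Op op = Op.fromCode(code);
            System.out.println(code + " -> " + op
                    + " (before: " + op.hasBefore + ", after: " + op.hasAfter + ")");
        }
    }
}
```

The two messages captured in the test below both have "op":"u" and carry both images, which matches the UPDATE entry.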

Testing

Run the MySQLToKafkaExample class, then modify data in the user table, as shown in the figure:

Messages received by the Kafka console consumer:

yangyanping@yangyaningdeAir bin % ./kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic quickstart-events

{"before":{"id":2,"name":"yangyanping","age":18,"create_time":1772236800000,"update_time":1772236800000},"after":{"id":2,"name":"yangyanping","age":20,"create_time":1772236800000,"update_time":1772236800000},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1772370908000,"snapshot":"false","db":"ad","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"binlog.000020","pos":4749,"row":0,"thread":9,"query":null},"op":"u","ts_ms":1772370908412,"transaction":null}
{"before":{"id":3,"name":"xiyangyang","age":18,"create_time":1772236800000,"update_time":1772236800000},"after":{"id":3,"name":"xiyangyang","age":30,"create_time":1772236800000,"update_time":1772236800000},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1772370908000,"snapshot":"false","db":"ad","sequence":null,"table":"user","server_id":1,"gtid":null,"file":"binlog.000020","pos":4908,"row":0,"thread":9,"query":null},"op":"u","ts_ms":1772370908412,"transaction":null}
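The timestamps in these messages are plain numbers. With Debezium's default temporal handling, a MySQL DATETIME column such as create_time is encoded as milliseconds since the epoch with no time zone applied, so reading it back as UTC recovers the stored wall-clock value, while source.ts_ms is the time the change was made in the database. A minimal sketch using only java.time, assuming that encoding (which matches the sample values above); DebeziumDatetime and fromEpochMillis are illustrative names:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class DebeziumDatetime {

    /**
     * Converts a DATETIME value as it appears in the change event (epoch millis,
     * no time zone applied) back into a LocalDateTime by reading it as UTC.
     */
    static LocalDateTime fromEpochMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long createTime = 1772236800000L; // "create_time" from the sample message
        long tsMs = 1772370908000L;       // "source.ts_ms" from the sample message
        System.out.println("create_time = " + fromEpochMillis(createTime));
        System.out.println("source.ts_ms = " + Instant.ofEpochMilli(tsMs));
    }
}
```

Running main prints create_time = 2026-02-28T00:00 and source.ts_ms = 2026-03-01T13:15:08Z.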

That's all for this simple example!
