赞
踩
由于 flink / kafka 的版本不断更新,创建项目的时候就应当考虑清楚这几个依赖库的版本问题,尽可能地与实际场景保持一致,比如服务器上部署的 kafka 是哪个版本,flink 是哪个版本,从而确定我们需要开发的是哪个版本,并且在真正的开发工作开始之前,应当先测试一下保证 kafka 的版本 、 flink 的版本一致,至少大版本一致,不存在冲突问题,不要为以后的部署埋坑。
比如 flink 选择的是 1.12.7
这个版本,我们前去 maven 仓库查看 flink-connect-kafka 的版本。首先访问 链接1
https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka 找到对应的 1.12.7
这个版本,也就是根据 flink 的版本去寻找 flink-connect-kafka 的版本,记作 链接2
即 https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.12.7。
进入 链接2
对应的地址后,可以发现提供的 maven 地址如下:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.12.7</version>
</dependency>
这个地方已经明确地指出, kafka_2.12
指的是对应的是使用 scala
的版本是 2.12
编写的 kakfa ,也就是对应的是 scala 2.12 的版本。为了确保无误,请确保安装的 kafka
也是这个版本。
类似地
,如果是其他版本的 flink
也要找到对应的flink-connector-kafka
版本,确保 kafka 的版本的 scala 是一致的。
为了规范,我们把版本号写在前面,然后再引用这些依赖。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.smileyan.demo</groupId> <artifactId>flink-kafka</artifactId> <version>1.0-SNAPSHOT</version> <packaging>pom</packaging> <properties> <java.version>8</java.version> <!-- flink 的版本 --> <flink.version>1.12.7</flink.version> <!-- scala 的版本(也就是 kafka 的源码的版本)--> <scala.binary.version>2.12</scala.binary.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version> <slf4j.version>2.0.7</slf4j.version> </properties> <profiles> <profile> <id>local</id> <activation> <activeByDefault>true</activeByDefault> </activation> <properties> <flink.scope>compile</flink.scope> </properties> </profile> <profile> <id>prod</id> <activation> <activeByDefault>false</activeByDefault> </activation> <properties> <flink.scope>provided</flink.scope> </properties> </profile> </profiles> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>${maven-shade-plugin.version}</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <artifactSet> <excludes> <exclude>com.google.code.findbugs:jsr305</exclude> <exclude>org.slf4j:*</exclude> <exclude>log4j:*</exclude> </excludes> </artifactSet> <filters> <filter> <!-- Do not copy the signatures in the META-INF folder. Otherwise, this might cause SecurityExceptions when using the JAR. --> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> </plugins> </build> </project>
根据实际需要,调整 flink 的版本以及 scala 的版本。一定要确保最终我们在 maven 仓库中能找到对应的版本。
再次强调
:一定要找到对应的版本的示例。新版本的 flink 不再支持 new FlinkKafkaProducer 以及 new FlinkKafkaConsumer 这类操作,所以一定要结合实际情况进行调整。
消费者
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
java.util.regex.Pattern.compile("test-topic-[0-9]"),
new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer);
生产者
DataStream<String> stream = ... Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() { @Override public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) { return new ProducerRecord<>( "my-topic", // target topic element.getBytes(StandardCharsets.UTF_8)); // record contents } }; FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>( "my-topic", // target topic serializationSchema, // serialization schema properties, // producer config FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance stream.addSink(myProducer);
消费者
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
生产者
DataStream<String> stream = ...
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
)
.build();
stream.sinkTo(sink);
flink 的 job 开发完成以后,需要打包上传到服务器端运行,实际上 flink-java
flink-stream
等等依赖包在服务器端的 flink 都提供了这些依赖,所以打包的时候可以去除这些依赖,以减小打包后的 jar 文件的大小。
所以如上面的 pom.xml 文件所示,我们把很多依赖的 scope 设置为 provided
,但是带来的新问题就是运行 flink job 的main方法时会出现报错提示这些依赖找不到:
这时我们需要进行配置,步骤如下:
再次运行就不会报错了。
flink-kafka 的项目创建本身应当是一件很容易的事情,但是为了避免为以后的开发埋雷,一定要规范地编写依赖,并结合实际情况对版本进行调整,并非一切都应当用最新版本的。
Smileyan
2023-03-25 00:29
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。