1. Start Hadoop, ZooKeeper, and Kafka.
2. Create the checkpoint location that Flink needs.
checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/yxh");
Create the matching directory for the path used in the code:
hdfs dfs -mkdir /yxh
3. Create the Kafka topics that are needed.
kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 1 --partitions 1 --topic topic_1
kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 1 --partitions 1 --topic ws
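Optionally, you can confirm that both topics exist before continuing:
kafka-topics.sh --bootstrap-server hadoop102:9092 --list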
4. Two terminal windows are needed:
1) Window 1:
kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_1
2) Window 2:
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
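Because the sink writes with Kafka transactions, the console consumer above also shows records from transactions that are not yet committed. If you want the console to reflect only committed (exactly-once) output, you can add the isolation-level option (assuming your Kafka version supports it):
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws --isolation-level read_committed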
Run KafkaEOSDemo.
The code is listed below.
Building on experiment 1, only the topic in the Kafka console producer command needs to change:
kafka-console-producer.sh --broker-list hadoop102:9092 --topic ws
Run KafkaEOSDemo2.
The code is listed below.
KafkaEOSDemo.java:

package com.lfl.bigwork.exactly_once;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; // needed by the sink builder below; missing in the original listing
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.time.Duration;

public class KafkaEOSDemo {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the Hadoop user name and configuration directory
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        System.setProperty("HADOOP_CONF_DIR", "/opt/module/hadoop/etc/hadoop");

        // Enable checkpointing in exactly-once mode, stored on HDFS
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/yxh");
        // Retain externalized checkpoints when the job is cancelled
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Build the KafkaSource: broker addresses, consumer group ID, topic, deserializer and starting offsets
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setGroupId("test")
                .setTopics("topic_1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        // Read from the KafkaSource with a watermark strategy and a source name
        DataStreamSource<String> kafkaDataStreamSource = env.fromSource(
                kafkaSource,
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
                "kafkasource");

        // Build the KafkaSink: broker addresses, serializer, delivery guarantee, transactional ID prefix and transaction timeout
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // Exactly-once enables two-phase commit (2PC)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // Exactly-once requires a transactional ID prefix
                .setTransactionalIdPrefix("prefix-")
                // Exactly-once requires a transaction timeout: larger than the checkpoint interval, no more than 15 minutes
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();

        // Write the data stream to the KafkaSink
        kafkaDataStreamSource.sinkTo(kafkaSink);

        // Start the streaming job
        env.execute();
    }
}
KafkaEOSDemo2.java:

package com.lfl.bigwork.exactly_once;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.time.Duration;

/**
 * Kafka consumer that reads data from the ws topic.
 */
public class KafkaEOSDemo2 {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the KafkaSource: broker addresses, consumer group ID, topic, deserializer and starting offsets
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setGroupId("test")
                .setTopics("ws")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                // As the downstream consumer, set the transaction isolation level to read committed
                .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                .build();

        // Read from the KafkaSource with a watermark strategy and a source name, then print
        env.fromSource(kafkaSource,
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
                        "kafkasource")
                .print();

        // Start the streaming job
        env.execute();
    }
}
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lfl</groupId>
    <artifactId>FlinkStudy</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-datagen</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.31</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.2-1.17</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.12</artifactId>
            <version>1.17.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>apache-snapshots</id>
            <name>apache snapshots</name>
            <!-- <url>https://repository.apache.org/content/repositories/snapshots/</url> -->
            <url>https://maven.aliyun.org/repositories/apache-snapshots/</url>
        </repository>
    </repositories>
</project>
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
This line creates the Flink execution environment, the starting point of every Flink program.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
This line enables checkpointing in exactly-once mode with a checkpoint interval of 5000 ms.
checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/yxh");
This line sets the checkpoint storage location; here HDFS is used as the storage medium.
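Beyond the interval and the storage path, CheckpointConfig exposes a few other settings that are often tuned together with exactly-once checkpointing. A minimal sketch; the values below are illustrative and not taken from the original job:

// Optional checkpoint tuning (illustrative values)
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// leave at least 500 ms between the end of one checkpoint and the start of the next
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// fail a checkpoint that takes longer than one minute
checkpointConfig.setCheckpointTimeout(60 * 1000);
// allow only one checkpoint to be in flight at a time
checkpointConfig.setMaxConcurrentCheckpoints(1);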
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 1
.setGroupId("test") // 2
.setTopics("topic_1") // 3
.setValueOnlyDeserializer(new SimpleStringSchema()) // 4
.setStartingOffsets(OffsetsInitializer.latest()) // 5
.build(); // 6
setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092"): sets the Kafka server addresses, i.e. the Kafka broker addresses.
setGroupId("test"): sets the consumer group ID; Kafka uses consumer groups to distinguish different consumers.
setTopics("topic_1"): sets the Kafka topic to subscribe to.
setValueOnlyDeserializer(new SimpleStringSchema()): sets the deserializer; a simple string deserializer is used here.
setStartingOffsets(OffsetsInitializer.latest()): sets the starting offsets; here consumption starts from the latest data.
build(): builds the KafkaSource instance.
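OffsetsInitializer.latest() is only one of several starting-offset strategies. As a sketch, a variant of the source could resume from the group's committed offsets instead (OffsetResetStrategy comes from org.apache.kafka.clients.consumer; this variant is not part of the original experiment):

// Hypothetical variant: resume from committed group offsets, fall back to earliest if none exist
KafkaSource<String> replaySource = KafkaSource.<String>builder()
        .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
        .setGroupId("test")
        .setTopics("topic_1")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();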
Next, the KafkaSink is built:

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 1
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("ws") // 2
.setValueSerializationSchema(new SimpleStringSchema()) // 3
.build()
) // 4
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 5
.setTransactionalIdPrefix("prefix-") // 6
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "") // 7
.build(); // 8
setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092"): sets the Kafka server addresses, i.e. the Kafka broker addresses.
setTopic("ws"): sets the Kafka topic to write to.
setValueSerializationSchema(new SimpleStringSchema()): sets the serializer; a simple string serializer is used here.
setRecordSerializer(...): sets the record serializer, which turns Flink records into a format Kafka accepts.
setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE): sets the delivery guarantee to exactly-once, meaning every record is written exactly once.
setTransactionalIdPrefix("prefix-"): sets the transactional ID prefix used to identify Kafka transactions.
setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + ""): sets the transaction timeout, an important Kafka transaction parameter that determines how long a transaction may stay open; it must be larger than the checkpoint interval and no more than 15 minutes.
build(): builds the KafkaSink instance.
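For comparison, if end-to-end exactly-once is not required, the sink can be built with at-least-once delivery, in which case no transactional ID prefix or transaction timeout is needed. A minimal sketch, not part of the original experiment:

// Hypothetical at-least-once variant of the sink, for comparison only
KafkaSink<String> atLeastOnceSink = KafkaSink.<String>builder()
        .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
        .setRecordSerializer(
                KafkaRecordSerializationSchema.<String>builder()
                        .setTopic("ws")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
        )
        // at-least-once flushes on checkpoints but does not use Kafka transactions
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();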
The KafkaSource in KafkaEOSDemo2 is configured much like the one in KafkaEOSDemo:
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 1
.setGroupId("test") // 2
.setTopics("ws") // 3
.setValueOnlyDeserializer(new SimpleStringSchema()) // 4
.setStartingOffsets(OffsetsInitializer.latest()) // 5
.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") // 6
.build(); // 7
setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
:设置Kafka的服务器地址,也就是Kafka broker的地址。setGroupId("test")
:设置消费者组ID,Kafka使用消费者组来区分不同的消费者。setTopics("ws")
:设置要订阅的Kafka主题。setValueOnlyDeserializer(new SimpleStringSchema())
:设置反序列化器,这里我们使用的是简单字符串反序列化器。setStartingOffsets(OffsetsInitializer.latest())
:设置起始偏移量,这里我们设置为从最新的数据开始消费。setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
:设置事务的隔离级别为读已提交,这是Kafka事务的一个重要配置。读已提交的隔离级别表示消费者在读取数据时,只能读取到已经被提交的事务。build()
build(): builds the KafkaSource instance.
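To see why the isolation level matters, one possible follow-up (not part of the original experiment) is to run a second consumer with the default read_uncommitted isolation level; it also returns records from transactions that the Flink sink later aborts after a failure, so only the read_committed view is exactly-once. A minimal sketch:

// Hypothetical comparison source using the default isolation level (read_uncommitted)
KafkaSource<String> uncommittedSource = KafkaSource.<String>builder()
        .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
        .setGroupId("test_uncommitted")
        .setTopics("ws")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setStartingOffsets(OffsetsInitializer.latest())
        // read_uncommitted also returns records from open or aborted transactions
        .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
        .build();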