当前位置:   article > 正文

Flink端到端的精确一次_flink精确一次

flink精确一次


参考来自liang老师的文章: 文章链接

Flink端到端的精确一次

步骤

实验一:

linux:

1.hadoop,zookeeper和kafka启动

2.创建flink需要的存档点

 checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/yxh");
  • 1

更具代码的路径创建对应的目录

hdfs dfs -mkdir /yxh
  • 1

3.创建对应需要的kafka主题

kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 1 --partitions 1 --topic topic_1

kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 1 --partitions 1 --topic ws
  • 1
  • 2
  • 3

4.需要两个窗口界面:
1).窗口1:

kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_1
  • 1

2).窗口2:

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
  • 1

win:

运行KafkaEOSDemo 代码

结果:

Untitled

实验二:

只需要在实验一的基础上修改kafkf消费主题命令就行

linux

kafka-console-producer.sh --broker-list hadoop102:9092 --topic ws
  • 1

win

运行KafkaEOSDemo2 代码

结果:

Untitled

代码

KafkaEOSDemo

package com.lfl.bigwork.exactly_once;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.time.Duration;

public class KafkaEOSDemo {
    public static void main(String[] args) throws Exception {
        // 获取流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置Hadoop的用户名和配置文件路径
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        System.setProperty("HADOOP_CONF_DIR", "/opt/module/hadoop/etc/hadoop");

        // 启用检查点,设置为精准一次,hdfs
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/yxh");

        // 设置检查点清理方式为取消任务时保留
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 构建KafkaSource,设置Kafka服务器地址、消费组ID、主题、反序列化器和起始偏移量
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setGroupId("test")
                .setTopics("topic_1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        // 从KafkaSource中读取数据,设置水印策略和源名称
        DataStreamSource<String> kafkaDataStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");

        // 构建KafkaSink,设置Kafka服务器地址、序列化器、交付保证、事务ID前缀和事务超时时间
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 精准一次,开启2pc
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("prefix-")
                // 精准一次,必须设置事务超时时间: 大于checkpoint间隔,小于15分
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();

        // 将数据流写入KafkaSink
        kafkaDataStreamSource.sinkTo(kafkaSink);

        // 启动流处理任务
        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

KafkaEOSDemo2

package com.lfl.bigwork.exactly_once;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.time.Duration;

/**
 * kafka 消费者 消费ws主题数据
 */
public class KafkaEOSDemo2 {
    public static void main(String[] args) throws Exception {
        // 获取流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 构建KafkaSource,设置Kafka服务器地址、消费组ID、主题、反序列化器和起始偏移量
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setGroupId("test")
                .setTopics("ws")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                // 作为下游的消费者,要设置事务的隔离级别为: 读已提交
                .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                .build();

        // 从KafkaSource中读取数据,设置水印策略和源名称,然后打印
        env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
                .print();

        // 启动流处理任务
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lfl</groupId>
    <artifactId>FlinkStudy</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
    </properties>

<dependencies>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-datagen</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.31</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.2-1.17</version>
        <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.12</artifactId>
        <version>1.17.2</version>
        <scope>test</scope>
    </dependency>

</dependencies>

    <repositories>
        <repository>
            <id>apache-snapshots</id>
            <name>apache snapshots</name>
<!--            <url>https://repository.apache.org/content/repositories/snapshots/</url>-->
            <url>https://maven.aliyun.org/repositories/apache-snapshots/</url>
        </repository>
    </repositories>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

文档

KafkaEOSDemo

1. 创建环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  • 1
  • 2

这行代码创建了一个Flink执行环境,它是所有Flink程序的开始。

2. 启用检查点

env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

  • 1
  • 2

这行代码启用了检查点,并设置了检查点的模式为精准一次。检查点间隔为5000毫秒。

3. 设置检查点存储路径

checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/yxh");

  • 1
  • 2

这行代码设置了检查点的存储路径,这里我们选择了HDFS作为存储介质。

4. 从Kafka读取数据

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 1
    .setGroupId("test") // 2
    .setTopics("topic_1") // 3
    .setValueOnlyDeserializer(new SimpleStringSchema()) // 4
    .setStartingOffsets(OffsetsInitializer.latest()) // 5
    .build(); // 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. 调用setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)设置Kafka的服务器地址,也就是Kafka broker的地址。
  2. 调用setGroupId(test)设置消费者组ID,Kafka使用消费者组来区分不同的消费者。
  3. 调用setTopics(topic_1)设置要订阅的Kafka主题。
  4. 调用setValueOnlyDeserializer(new SimpleStringSchema())设置反序列化器,这里使用的是简单字符串反序列化器。
  5. 调用setStartingOffsets(OffsetsInitializer.latest())设置起始偏移量,这里设置为从最新的数据开始消费。
  6. 调用build()构建KafkaSource实例。

5. 将数据写入到另一个Kafka主题

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 1
    .setRecordSerializer(
        KafkaRecordSerializationSchema.<String>builder()
            .setTopic("ws") // 2
            .setValueSerializationSchema(new SimpleStringSchema()) // 3
            .build()
    ) // 4
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 5
    .setTransactionalIdPrefix("prefix-") // 6
    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "") // 7
    .build(); // 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. 同KafkaSource的setBootstrapServers
  2. setTopic(ws):设置要写入的Kafka主题。
  3. setValueSerializationSchema(new SimpleStringSchema()):设置序列化器,这里我们使用的是简单字符串序列化器。
  4. setRecordSerializer(...):设置记录序列化器,用于将Flink的数据记录序列化为Kafka可以接受的格式。
  5. setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE):设置交付保证,这里我们设置为精确一次,也就是说每条记录将被精确地写入一次。
  6. setTransactionalIdPrefix(prefix-):设置事务ID前缀,这是用于Kafka事务的标识。
  7. setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + ):设置事务超时时间,这是Kafka事务的一个重要参数,它决定了一个事务可以保持打开状态的最长时间。间。
  8. build():构建KafkaSink实例。

KafkaEOSDemo2

1. 创建环境

同KafkaEOSDemo。

2. 从Kafka读取数据

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092") // 1
    .setGroupId("test") // 2
    .setTopics("ws") // 3
    .setValueOnlyDeserializer(new SimpleStringSchema()) // 4
    .setStartingOffsets(OffsetsInitializer.latest()) // 5
    .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") // 6
    .build(); // 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092"):设置Kafka的服务器地址,也就是Kafka broker的地址。
  2. setGroupId("test"):设置消费者组ID,Kafka使用消费者组来区分不同的消费者。
  3. setTopics("ws"):设置要订阅的Kafka主题。
  4. setValueOnlyDeserializer(new SimpleStringSchema()):设置反序列化器,这里我们使用的是简单字符串反序列化器。
  5. setStartingOffsets(OffsetsInitializer.latest()):设置起始偏移量,这里我们设置为从最新的数据开始消费。
  6. setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"):设置事务的隔离级别为读已提交,这是Kafka事务的一个重要配置。读已提交的隔离级别表示消费者在读取数据时,只能读取到已经被提交的事务。
  7. build():构建KafkaSource实例。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/891366
推荐阅读
相关标签
  

闽ICP备14008679号