当前位置:   article > 正文

最全【极数系列】Flink集成KafkaSink &; 实时输出数据(11)(3),2024年最新头条面试题_sink输出算子——写kafka到控制台消费(scala版本)

sink输出算子——写kafka到控制台消费(scala版本)

img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

    )
    .setDeliveryGuarantee(DeliveryGuarantee.AT\_LEAST\_ONCE)
    .build();
  • 1
  • 2
  • 3

stream.sinkTo(sink);

以下属性在构建 KafkaSink 时是必须指定的:
Bootstrap servers, setBootstrapServers(String)
消息序列化器(Serializer), setRecordSerializer(KafkaRecordSerializationSchema)
如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String)


## 04 序列化器


1. 构建时需要提供 `KafkaRecordSerializationSchema` 来将输入数据转换为 Kafka 的 `ProducerRecord`。Flink 提供了 schema 构建器 以提供一些通用的组件,例如消息键(key)/消息体(value)序列化、topic 选择、消息分区,同样也可以通过实现对应的接口来进行更丰富的控制。
2. 其中消息体(value)序列化方法和 topic 的选择方法是必须指定的,此外也可以通过 `setKafkaKeySerializer(Serializer)` 或 `setKafkaValueSerializer(Serializer)` 来使用 Kafka 提供而非 Flink 提供的序列化器



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

KafkaRecordSerializationSchema.builder()
.setTopicSelector((element) -> {})
.setValueSerializationSchema(new SimpleStringSchema())
.setKeySerializationSchema(new SimpleStringSchema())
.setPartitioner(new FlinkFixedPartitioner())
.build();


05 容错恢复



  • 1
  • 2
  • 3
  • 4
  • 5

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee)。对于 DeliveryGuarantee.AT_LEAST_ONCEDeliveryGuarantee.EXACTLY_ONCE,Flink checkpoint 必须启用。默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。 以下是对不同语义保证的解释:


* `DeliveryGuarantee.NONE` 不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
* `DeliveryGuarantee.AT_LEAST_ONCE`: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
* `DeliveryGuarantee.EXACTLY_ONCE`: 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 `isolation.level`),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。


## 05 指标监控


Kafka sink 会在不同的[范围(Scope)]( )中汇报下列指标。




| 范围 | 指标 | 用户变量 | 描述 | 类型 |
| --- | --- | --- | --- | --- |
| 算子 | currentSendTime | n/a | 发送最近一条数据的耗时。该指标反映最后一条数据的瞬时值。 | Gauge |


## 06 项目源码实战


### 6.1 包结构


![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/04a49613acae4612b4930f0527e9d4ee.jpeg)


### 6.2 pom.xml依赖



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
<?xml version="1.0" encoding="UTF-8"?>


4.0.0

<groupId>com.xsy</groupId>
<artifactId>aurora_flink_connector_kafka</artifactId>
<version>1.0-SNAPSHOT</version>

<!--属性设置-->
<properties>
    <!--java_JDK版本-->
    <java.version>11</java.version>
    <!--maven打包插件-->
    <maven.plugin.version>3.8.1</maven.plugin.version>
    <!--编译编码UTF-8-->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--输出报告编码UTF-8-->
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <!--json数据格式处理工具-->
    <fastjson.version>1.2.75</fastjson.version>
    <!--log4j版本-->
    <log4j.version>2.17.1</log4j.version>
    <!--flink版本-->
    <flink.version>1.18.0</flink.version>
    <!--scala版本-->
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<!--通用依赖-->
<dependencies>

    <!-- json -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>


    <!--================================集成外部依赖==========================================-->
    <!--集成日志框架 start-->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
    </dependency>

    <!--集成日志框架 end-->

    <!--kafka依赖 start-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.0.2-1.18</version>
    </dependency>
    <!--kafka依赖 end-->

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.18.0</version>
    </dependency>
</dependencies>

<!--编译打包-->
<build>
    <finalName>${project.name}</finalName>
    <!--资源文件打包-->
    <resources>
        <resource>
            <directory>src/main/resources</directory>
        </resource>
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.xml</include>
            </includes>
        </resource>
    </resources>

    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>org.google.code.flindbugs:jar305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <excluder>org.apache.logging.log4j:*</excluder>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.aurora.KafkaStreamingJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>

    <!--插件统一管理-->
    <pluginManagement>
        <plugins>
            <!--maven打包插件-->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring.boot.version}</version>
                <configuration>
                    <fork>true</fork>
                    <finalName>${project.build.finalName}</finalName>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!--编译打包插件-->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.plugin.version}</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                    <compilerArgs>
                        <arg>-parameters</arg>
                    </compilerArgs>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
</build>

<!--配置Maven项目中需要使用的远程仓库-->
<repositories>
    <repository>
        <id>aliyun-repos</id>
        <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

<!--用来配置maven插件的远程仓库-->
<pluginRepositories>
    <pluginRepository>
        <id>aliyun-plugin</id>
        <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </pluginRepository>
</pluginRepositories>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208

### 6.3 配置文件


(1)application.properties



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

#kafka集群地址
kafka.bootstrapServers=localhost:9092
#kafka主题
kafka.topic=topic_a
#kafka消费者组
kafka.group=aurora_group


(2)log4j2.properties



  • 1
  • 2
  • 3
  • 4
  • 5

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender

img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

[外链图片转存中…(img-6xWbYwNA-1715800802363)]
[外链图片转存中…(img-bXEBvHjq-1715800802364)]
[外链图片转存中…(img-teS1HhAs-1715800802364)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/816197
推荐阅读
相关标签
  

闽ICP备14008679号