当前位置:   article > 正文

Java代码:使用flink消费kafka消息代码示例_在flink中消费kafka中的日志流,并应用窗口函数来监控用户请求的频率。

在flink中消费kafka中的日志流,并应用窗口函数来监控用户请求的频率。

目录结构

在这里插入图片描述

kafka发送消息

代码示例

package org.davidcampos.kafka.producer;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.davidcampos.kafka.commons.Commons;

import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {
    private static final Logger logger = LogManager.getLogger(KafkaProducerExample.class);

    public static void main(final String... args) {
        // Create topic
        createTopic();

        String[] words = new String[]{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"};
        Random ran = new Random(System.currentTimeMillis());

        final Producer<String, String> producer = createProducer();
        int EXAMPLE_PRODUCER_INTERVAL = System.getenv("EXAMPLE_PRODUCER_INTERVAL") != null ?
                Integer.parseInt(System.getenv("EXAMPLE_PRODUCER_INTERVAL")) : 100;

        try {
            while (true) {
                String word = words[ran.nextInt(words.length)];
                String uuid = UUID.randomUUID().toString();

                ProducerRecord<String, String> record = new ProducerRecord<>(Commons.EXAMPLE_KAFKA_TOPIC, uuid, word);
                RecordMetadata metadata = producer.send(record).get();

                logger.info("Sent ({}, {}) to topic {} @ {}.", uuid, word, Commons.EXAMPLE_KAFKA_TOPIC, metadata.timestamp());

                Thread.sleep(EXAMPLE_PRODUCER_INTERVAL);
            }
        } catch (InterruptedException | ExecutionException e) {
            logger.error("An error occurred.", e);
        } finally {
            producer.flush();
            producer.close();
        }
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Commons.EXAMPLE_KAFKA_SERVER);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerExample");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    private static void createTopic() {
        int sessionTimeoutMs = 10 * 1000;
        int connectionTimeoutMs = 8 * 1000;

        ZkClient zkClient = new ZkClient(
                Commons.EXAMPLE_ZOOKEEPER_SERVER,
                sessionTimeoutMs,
                connectionTimeoutMs,
                ZKStringSerializer$.MODULE$);

        boolean isSecureKafkaCluster = false;
        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(Commons.EXAMPLE_ZOOKEEPER_SERVER), isSecureKafkaCluster);

        int partitions = 1;
        int replication = 1;

        // Add topic configuration here
        Properties topicConfig = new Properties();
        if (!AdminUtils.topicExists(zkUtils, Commons.EXAMPLE_KAFKA_TOPIC)) {
            AdminUtils.createTopic(zkUtils, Commons.EXAMPLE_KAFKA_TOPIC, partitions, replication, topicConfig, RackAwareMode.Safe$.MODULE$);
            logger.info("Topic {} created.", Commons.EXAMPLE_KAFKA_TOPIC);
        } else {
            logger.info("Topic {} already exists.", Commons.EXAMPLE_KAFKA_TOPIC);
        }

        zkClient.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

代码解释

这段代码是一个简单的 Kafka 生产者示例。它使用 Apache Kafka 客户端库来将消息发送到 Kafka 主题。

代码解释如下:

  1. 在类的开头,定义了一个名为 “KafkaProducerExample” 的公共类,并创建了一个静态的日志记录器 “logger”,用于记录日志信息。
  2. main 方法是程序的入口点。首先调用 createTopic() 方法创建 Kafka 主题。
  3. 创建一个包含一些单词的字符串数组 words,以及一个随机数生成器 ran
  4. 调用 createProducer() 方法创建一个 Kafka 生产者对象 producer
  5. 定义变量 EXAMPLE_PRODUCER_INTERVAL,它表示消息发送的时间间隔。它从环境变量 EXAMPLE_PRODUCER_INTERVAL 中获取,如果未定义,则默认为 100 毫秒。
  6. 在一个无限循环中,随机选择一个单词和一个 UUID,并使用选定的单词和 UUID 创建一个 ProducerRecord 对象 record。然后通过 producer.send(record) 方法将消息发送到 Kafka 主题。
  7. 通过 producer.send(record).get() 获取发送消息的元数据 metadata,并将元数据信息记录到日志中。
  8. 使用 Thread.sleep(EXAMPLE_PRODUCER_INTERVAL) 方法控制消息发送的时间间隔。
  9. 如果发生异常,通过捕获 InterruptedExceptionExecutionException 异常来处理,并记录错误信息到日志中。
  10. 在最后的 finally 块中,调用 producer.flush() 方法确保所有未发送的消息被刷新到 Kafka,然后关闭生产者。
  11. createProducer() 方法用于创建 Kafka 生产者对象。它设置了一些 Kafka 生产者的属性,例如 BOOTSTRAP_SERVERS_CONFIG 表示 Kafka 服务器地址,CLIENT_ID_CONFIG 表示客户端ID,KEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG 表示键和值的序列化器。
  12. createTopic() 方法用于创建 Kafka 主题。它使用 ZooKeeper 连接到 Kafka 集群,并使用 ZkUtilsAdminUtils 类来创建主题。首先,它创建一个 ZkClient 对象 zkClient,连接到 ZooKeeper 服务器。然后,使用 ZkUtils 对象 zkUtils 创建主题,并指定主题的分区和副本数。如果主题不存在,则创建主题,否则输出日志信息表示主题已经存在。

总体而言,该示例代码使用 Kafka 客户端库创建一个 Kafka 生产者,并循环发送随机选择的单词和 UUID 到 Kafka 主题。同时,它还提供了一个方法来创建 Kafka 主题。

kafka flink消费消息

代码示例

package org.davidcampos.kafka.consumer;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.davidcampos.kafka.commons.Commons;

import java.util.Properties;

public class KafkaFlinkConsumerExample {
    private static final Logger logger = LogManager.getLogger(KafkaFlinkConsumerExample.class);

    public static void main(final String... args) {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Properties
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Commons.EXAMPLE_KAFKA_SERVER);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "FlinkConsumerGroup");

        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(Commons.EXAMPLE_KAFKA_TOPIC, new SimpleStringSchema(), props));


        // Split up the lines in pairs (2-tuples) containing: (word,1)
        messageStream.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0)
                .sum(1)
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            logger.error("An error occurred.", e);
        }
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

代码解释

这段代码是一个简单的 Kafka Flink 消费者示例。它使用 Apache Flink 框架来消费 Kafka 中的消息,并对消息进行处理和统计。

代码解释如下:

  1. 在类的开头,定义了一个名为 “KafkaFlinkConsumerExample” 的公共类,并创建了一个静态的日志记录器 “logger”,用于记录日志信息。
  2. main 方法是程序的入口点。它首先创建了一个 StreamExecutionEnvironment 对象 env,用于设置 Flink 的执行环境。
  3. 接下来,创建了一个 Properties 对象 props,用于配置 Kafka 消费者的属性。其中,BOOTSTRAP_SERVERS_CONFIG 属性指定了 Kafka 服务器的地址,GROUP_ID_CONFIG 属性指定了消费者所属的消费者组。
  4. 使用 env.addSource 方法创建了一个 Kafka 消费者数据流 messageStream,它从名为 Commons.EXAMPLE_KAFKA_TOPIC 的 Kafka 主题中消费消息。同时,使用 SimpleStringSchema 解析器对消息进行解码,并传入之前定义的 Kafka 消费者属性 props
  5. messageStream 进行处理,通过 flatMap 方法将每条消息拆分为单词,并将每个单词转换为 (word, 1) 的二元组。
  6. 使用 keyBy(0) 对二元组按照第一个元素进行分组。
  7. 使用 sum(1) 对每个分组的二元组进行求和,即对第二个元素进行累加。
  8. 最后,使用 print() 方法将计算结果打印到标准输出。
  9. 在执行处理逻辑后,通过 env.execute() 方法来启动 Flink 程序的执行。
  10. Tokenizer 是一个内部类,实现了 FlatMapFunction 接口,用于对消息进行拆分和处理。在 flatMap 方法中,它将接收到的消息字符串进行小写化和分割,并将每个单词生成一个 (word, 1) 的二元组,然后通过 out.collect 方法发射出去。

总体而言,该示例代码使用 Flink 框架消费 Kafka 消息,并对消息进行单词拆分、计数和打印输出的简单处理。

Maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.davidcampos</groupId>
    <artifactId>kafka-spark-flink-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <!-- Logging -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>

        <!-- Zookeeper -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.9.5</version>
        </dependency>

        <!--Flink-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.4.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <manifestEntries>
                                        <Main-Class>org.davidcampos.kafka.cli.Main</Main-Class>
                                    </manifestEntries>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138

log4j2.xml

配置示例

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

配置解释

这段配置是一个 Log4j 2 的 XML 配置文件,用于定义日志的输出格式和级别。

代码解释如下:

  1. 第一行指定了 XML 文档的版本和编码。
  2. <Configuration> 标签是配置文件的根元素,并设置了 status 属性为 “WARN”,表示日志的级别为警告及以上级别的日志信息将被输出。
  3. <Appenders> 标签用于定义日志的输出器(Appender)。
  4. <Appenders> 标签内部,定义了一个名为 “Console” 的输出器,它将日志输出到系统的标准输出。使用 name 属性指定输出器的名称,使用 target 属性指定输出目标为 “SYSTEM_OUT”。
  5. 在输出器的定义中,使用了 <PatternLayout> 标签来定义日志的输出格式。其中,pattern 属性指定了输出格式的模式。在这个例子中,“%d{HH:mm:ss.SSS}” 表示输出时间戳,“[%t]” 表示输出线程名,“%-5level” 表示输出日志级别(左对齐,最多占5个字符),“%logger{36}” 表示输出日志记录器名称(最多显示36个字符),“%msg%n” 表示输出日志消息和换行符。
  6. <Loggers> 标签用于定义日志记录器(Logger)。
  7. <Loggers> 标签内部,定义了一个名为 “Root” 的根日志记录器,它是所有日志记录器的父级。使用 level 属性指定日志级别为 “info”,表示只输出信息级别及以上的日志信息。
  8. 在根日志记录器的定义中,使用了 <AppenderRef> 标签来引用之前定义的输出器。通过 ref 属性指定引用的输出器为 “Console”。

综上所述,该配置文件定义了一个将日志输出到系统标准输出的输出器,并设置了输出的格式和级别。根日志记录器将信息级别及以上的日志消息输出到该输出器。


links:

https://github.com/davidcampos/kafka-spark-flink-example

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/638932
推荐阅读
相关标签
  

闽ICP备14008679号