赞
踩
package org.davidcampos.kafka.producer;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.davidcampos.kafka.commons.Commons;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
public class KafkaProducerExample {
private static final Logger logger = LogManager.getLogger(KafkaProducerExample.class);
public static void main(final String... args) {
// Create topic
createTopic();
String[] words = new String[]{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"};
Random ran = new Random(System.currentTimeMillis());
final Producer<String, String> producer = createProducer();
int EXAMPLE_PRODUCER_INTERVAL = System.getenv("EXAMPLE_PRODUCER_INTERVAL") != null ?
Integer.parseInt(System.getenv("EXAMPLE_PRODUCER_INTERVAL")) : 100;
try {
while (true) {
String word = words[ran.nextInt(words.length)];
String uuid = UUID.randomUUID().toString();
ProducerRecord<String, String> record = new ProducerRecord<>(Commons.EXAMPLE_KAFKA_TOPIC, uuid, word);
RecordMetadata metadata = producer.send(record).get();
logger.info("Sent ({}, {}) to topic {} @ {}.", uuid, word, Commons.EXAMPLE_KAFKA_TOPIC, metadata.timestamp());
Thread.sleep(EXAMPLE_PRODUCER_INTERVAL);
}
} catch (InterruptedException | ExecutionException e) {
logger.error("An error occurred.", e);
} finally {
producer.flush();
producer.close();
}
}
private static Producer<String, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Commons.EXAMPLE_KAFKA_SERVER);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaProducerExample");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
private static void createTopic() {
int sessionTimeoutMs = 10 * 1000;
int connectionTimeoutMs = 8 * 1000;
ZkClient zkClient = new ZkClient(
Commons.EXAMPLE_ZOOKEEPER_SERVER,
sessionTimeoutMs,
connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
boolean isSecureKafkaCluster = false;
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(Commons.EXAMPLE_ZOOKEEPER_SERVER), isSecureKafkaCluster);
int partitions = 1;
int replication = 1;
// Add topic configuration here
Properties topicConfig = new Properties();
if (!AdminUtils.topicExists(zkUtils, Commons.EXAMPLE_KAFKA_TOPIC)) {
AdminUtils.createTopic(zkUtils, Commons.EXAMPLE_KAFKA_TOPIC, partitions, replication, topicConfig, RackAwareMode.Safe$.MODULE$);
logger.info("Topic {} created.", Commons.EXAMPLE_KAFKA_TOPIC);
} else {
logger.info("Topic {} already exists.", Commons.EXAMPLE_KAFKA_TOPIC);
}
zkClient.close();
}
}
这段代码是一个简单的 Kafka 生产者示例。它使用 Apache Kafka 客户端库来将消息发送到 Kafka 主题。
代码解释如下:
main
方法是程序的入口点。首先调用 createTopic()
方法创建 Kafka 主题。words
,以及一个随机数生成器 ran
。createProducer()
方法创建一个 Kafka 生产者对象 producer
。EXAMPLE_PRODUCER_INTERVAL
,它表示消息发送的时间间隔。它从环境变量 EXAMPLE_PRODUCER_INTERVAL
中获取,如果未定义,则默认为 100 毫秒。ProducerRecord
对象 record
。然后通过 producer.send(record)
方法将消息发送到 Kafka 主题。producer.send(record).get()
获取发送消息的元数据 metadata
,并将元数据信息记录到日志中。Thread.sleep(EXAMPLE_PRODUCER_INTERVAL)
方法控制消息发送的时间间隔。InterruptedException
和 ExecutionException
异常来处理,并记录错误信息到日志中。finally
块中,调用 producer.flush()
方法确保所有未发送的消息被刷新到 Kafka,然后关闭生产者。createProducer()
方法用于创建 Kafka 生产者对象。它设置了一些 Kafka 生产者的属性,例如 BOOTSTRAP_SERVERS_CONFIG
表示 Kafka 服务器地址,CLIENT_ID_CONFIG
表示客户端ID,KEY_SERIALIZER_CLASS_CONFIG
和 VALUE_SERIALIZER_CLASS_CONFIG
表示键和值的序列化器。createTopic()
方法用于创建 Kafka 主题。它使用 ZooKeeper 连接到 Kafka 集群,并使用 ZkUtils
和 AdminUtils
类来创建主题。首先,它创建一个 ZkClient
对象 zkClient
,连接到 ZooKeeper 服务器。然后,使用 ZkUtils
对象 zkUtils
创建主题,并指定主题的分区和副本数。如果主题不存在,则创建主题,否则输出日志信息表示主题已经存在。总体而言,该示例代码使用 Kafka 客户端库创建一个 Kafka 生产者,并循环发送随机选择的单词和 UUID 到 Kafka 主题。同时,它还提供了一个方法来创建 Kafka 主题。
package org.davidcampos.kafka.consumer;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.davidcampos.kafka.commons.Commons;
import java.util.Properties;
public class KafkaFlinkConsumerExample {
private static final Logger logger = LogManager.getLogger(KafkaFlinkConsumerExample.class);
public static void main(final String... args) {
// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Properties
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Commons.EXAMPLE_KAFKA_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "FlinkConsumerGroup");
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(Commons.EXAMPLE_KAFKA_TOPIC, new SimpleStringSchema(), props));
// Split up the lines in pairs (2-tuples) containing: (word,1)
messageStream.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.sum(1)
.print();
try {
env.execute();
} catch (Exception e) {
logger.error("An error occurred.", e);
}
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
这段代码是一个简单的 Kafka Flink 消费者示例。它使用 Apache Flink 框架来消费 Kafka 中的消息,并对消息进行处理和统计。
代码解释如下:
main
方法是程序的入口点。它首先创建了一个 StreamExecutionEnvironment
对象 env
,用于设置 Flink 的执行环境。Properties
对象 props
,用于配置 Kafka 消费者的属性。其中,BOOTSTRAP_SERVERS_CONFIG
属性指定了 Kafka 服务器的地址,GROUP_ID_CONFIG
属性指定了消费者所属的消费者组。env.addSource
方法创建了一个 Kafka 消费者数据流 messageStream
,它从名为 Commons.EXAMPLE_KAFKA_TOPIC
的 Kafka 主题中消费消息。同时,使用 SimpleStringSchema
解析器对消息进行解码,并传入之前定义的 Kafka 消费者属性 props
。messageStream
进行处理,通过 flatMap
方法将每条消息拆分为单词,并将每个单词转换为 (word, 1)
的二元组。keyBy(0)
对二元组按照第一个元素进行分组。sum(1)
对每个分组的二元组进行求和,即对第二个元素进行累加。print()
方法将计算结果打印到标准输出。env.execute()
方法来启动 Flink 程序的执行。Tokenizer
是一个内部类,实现了 FlatMapFunction
接口,用于对消息进行拆分和处理。在 flatMap
方法中,它将接收到的消息字符串进行小写化和分割,并将每个单词生成一个 (word, 1)
的二元组,然后通过 out.collect
方法发射出去。总体而言,该示例代码使用 Flink 框架消费 Kafka 消息,并对消息进行单词拆分、计数和打印输出的简单处理。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.davidcampos</groupId>
<artifactId>kafka-spark-flink-example</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<!-- Logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<!-- Zookeeper -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<!--Spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>2.9.5</version>
</dependency>
<!--Flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.4.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>jar-with-dependencies</shadedClassifierName>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>org.davidcampos.kafka.cli.Main</Main-Class>
</manifestEntries>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
这段配置是一个 Log4j 2 的 XML 配置文件,用于定义日志的输出格式和级别。
代码解释如下:
<Configuration>
标签是配置文件的根元素,并设置了 status
属性为 “WARN”,表示日志的级别为警告及以上级别的日志信息将被输出。<Appenders>
标签用于定义日志的输出器(Appender)。<Appenders>
标签内部,定义了一个名为 “Console” 的输出器,它将日志输出到系统的标准输出。使用 name
属性指定输出器的名称,使用 target
属性指定输出目标为 “SYSTEM_OUT”。<PatternLayout>
标签来定义日志的输出格式。其中,pattern
属性指定了输出格式的模式。在这个例子中,“%d{HH:mm:ss.SSS}” 表示输出时间戳,“[%t]” 表示输出线程名,“%-5level” 表示输出日志级别(左对齐,最多占5个字符),“%logger{36}” 表示输出日志记录器名称(最多显示36个字符),“%msg%n” 表示输出日志消息和换行符。<Loggers>
标签用于定义日志记录器(Logger)。<Loggers>
标签内部,定义了一个名为 “Root” 的根日志记录器,它是所有日志记录器的父级。使用 level
属性指定日志级别为 “info”,表示只输出信息级别及以上的日志信息。<AppenderRef>
标签来引用之前定义的输出器。通过 ref
属性指定引用的输出器为 “Console”。综上所述,该配置文件定义了一个将日志输出到系统标准输出的输出器,并设置了输出的格式和级别。根日志记录器将信息级别及以上的日志消息输出到该输出器。
links:
https://github.com/davidcampos/kafka-spark-flink-example
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。