赞
踩
因为工作的需求,我在几天里面学习了Flink和Kafka,今天写出来Flink整合Kafka实现单词的统计
对于kafka的简介,我这里就不多说了,大家自己百度一下,应该都清楚的,我这里就介绍一些Kafka的简单命令,因为项目会用到
因为我是mac电脑,在安装Kafka的时候,比较方便 brew install Kafka 一键安装 Kafka,zookeeper,解压和环境配置
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
1.进入kafka的相关目录下面
cd /usr/local/Cellar/kafka/2.2.0/
2.创建一个topic
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka-first-topic
3.查看topic
./bin/kafka-topics --list --zookeeper localhost:2181
4.启动生产者
./bin/kafka-console-producer --broker-list localhost:9092 --topic kafka-first-topic
1.进入kafka的相关目录下面
cd /usr/local/Cellar/kafka/2.2.0/
2.启动消费者
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-first-topic --from-beginning
首先创建一个Kafka的topic flink-windows
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
import java.util.Properties; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.Collector; public class FlinkCostKafka { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); DataStream<Tuple2<String, Integer>> counts = null; //创建链接 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); properties.setProperty("zookeeper.connect", "127.0.0.1:2181"); properties.setProperty("group.id", "flink-windows"); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("flink-windows", new SimpleStringSchema(), properties); DataStream<String> stream = env.addSource(myConsumer); counts= stream.flatMap(new LineSplitter()).keyBy(0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(2))).sum(1); counts.print().setParallelism(1); env.execute("FlinkCostKafka"); } public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } }
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerTest implements Runnable { private final KafkaProducer<String, String> producer; private final String topic; public KafkaProducerTest(String topicName) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); this.producer = new KafkaProducer<String, String>(props); this.topic = topicName; } @Override public void run() { int messageNo = 1; try { for(;;) { String messageStr="hello,this is "+messageNo+" messages"; producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr)); //生产了100条就打印 if(messageNo%100==0){ System.out.println("发送的信息:" + messageStr); } //生产1000条就退出 if(messageNo%1000==0){ System.out.println("成功发送了"+messageNo+"条"); break; } messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } public static void main(String args[]) { KafkaProducerTest test = new KafkaProducerTest("flink-windows"); Thread thread = new Thread(test); thread.start(); } }
完整的项目代码已经传到我的github,供大家学习,编写不易,希望大家给一个star
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。