
Flink Learning Journey: Integrating Flink with Kafka for Word Count


Preface

For work, I spent the last few days learning Flink and Kafka. This post walks through a word count that integrates Flink with Kafka.

Basic Kafka Commands

I won't introduce Kafka itself here, since a quick search covers that. I will only go over a few basic Kafka commands, because the project uses them.

Installing Kafka

I am on a Mac, so installing Kafka is easy: brew install kafka installs Kafka and ZooKeeper in one step, including unpacking and environment setup.

Starting the Environment

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

Producer

1. Change to the Kafka directory
cd /usr/local/Cellar/kafka/2.2.0/
2. Create a topic
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kafka-first-topic
3. List the topics
./bin/kafka-topics --list --zookeeper localhost:2181
4. Start the producer
./bin/kafka-console-producer --broker-list localhost:9092 --topic kafka-first-topic

Consumer

1. Change to the Kafka directory
cd /usr/local/Cellar/kafka/2.2.0/
2. Start the consumer
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka-first-topic --from-beginning

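Hands-On

Preparation

First, create a Kafka topic named flink-windows, using the same kafka-topics --create command as above with --topic flink-windows.

Connecting Flink to Kafka

Add the dependency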

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

The Flink Code

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class FlinkCostKafka {


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);


        // Kafka connection settings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        // The Kafka 0.8 consumer also needs the ZooKeeper address
        properties.setProperty("zookeeper.connect", "127.0.0.1:2181");
        properties.setProperty("group.id", "flink-windows");

        // Read each record of the flink-windows topic as a plain string
        FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("flink-windows", new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer);

        // Split lines into (word, 1) pairs, key by word, and sum per session window (2-second gap)
        DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
                .sum(1);

        counts.print().setParallelism(1);

        env.execute("FlinkCostKafka");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

}
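
The job above groups each word's records into processing-time session windows with a 2-second gap, so a count is emitted for a word only after that word has been quiet for the gap. The following is a minimal plain-Java sketch of that grouping for a single key, not Flink's implementation. The class name SessionWindowSketch, the method sessionCounts, and the sample arrival times are all made up for illustration, and the behavior when a pause is exactly equal to the gap is simplified.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): illustrates session-window grouping for one key.
public class SessionWindowSketch {

    /**
     * Returns the number of events in each session.
     * arrivalMillis must be sorted ascending; a new session starts
     * when the pause between two consecutive events exceeds gapMillis.
     */
    public static List<Integer> sessionCounts(long[] arrivalMillis, long gapMillis) {
        List<Integer> counts = new ArrayList<>();
        int current = 0;
        for (int i = 0; i < arrivalMillis.length; i++) {
            if (i > 0 && arrivalMillis[i] - arrivalMillis[i - 1] > gapMillis) {
                counts.add(current); // the previous session is closed
                current = 0;
            }
            current++;
        }
        if (current > 0) {
            counts.add(current); // close the last open session
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed arrival times (ms) of the word "hello"; the 3.8 s pause splits them into two sessions.
        long[] helloArrivals = {0, 500, 1200, 5000, 5300};
        System.out.println(sessionCounts(helloArrivals, 2000)); // prints [3, 2]
    }
}
```

In the real job this means the same word can be printed several times with different counts, once per session, rather than once with a running total.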

Writing Data to Kafka

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerTest implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final String topic;
    public KafkaProducerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<String, String>(props);
        this.topic = topicName;
    }

    @Override
    public void run() {
        int messageNo = 1;
        try {
            for (;;) {
                String messageStr = "hello,this is " + messageNo + " messages";
                producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
                // Log every 100th message
                if (messageNo % 100 == 0) {
                    System.out.println("Sent message: " + messageStr);
                }
                // Stop after 1000 messages
                if (messageNo % 1000 == 0) {
                    System.out.println("Successfully sent " + messageNo + " messages");
                    break;
                }
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    public static void main(String args[]) {
        KafkaProducerTest test = new KafkaProducerTest("flink-windows");
        Thread thread = new Thread(test);
        thread.start();
    }
}
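
To check the Flink output against something, it helps to know what totals the 1000 produced messages should add up to. The sketch below is plain Java with no Kafka or Flink involved: it rebuilds the same message strings and tokenizes them with the same rule as LineSplitter (lower-case, split on \\W+). The class name ExpectedCountsSketch and the method countWords are hypothetical. It assumes every message is counted exactly once; in the real job the totals for a word may be spread over several session windows, depending on timing.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: computes the word totals the 1000 producer messages should yield.
public class ExpectedCountsSketch {

    /** Tokenizes the producer's messages the same way LineSplitter does and sums per word. */
    public static Map<String, Integer> countWords(int messages) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int messageNo = 1; messageNo <= messages; messageNo++) {
            String line = "hello,this is " + messageNo + " messages";
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords(1000);
        System.out.println(counts.get("hello"));    // 1000
        System.out.println(counts.get("messages")); // 1000
        System.out.println(counts.get("42"));       // 1
        System.out.println(counts.size());          // 1004 distinct tokens: 4 words + 1000 numbers
    }
}
```

The four fixed words each appear once per message, while every message number appears exactly once, so most keys in the Flink output carry a count of 1.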

The complete project code is on my GitHub for anyone who wants to study it. Writing this up took some effort, so a star would be appreciated.
