当前位置:   article > 正文

Flink (三) 处理数据的流程_flink处理数据教程

flink处理数据教程

一、Flink处理数据的流程

flink 和之前的Spark一样,整个技术无非分为三个过程:数据的读取,数据的计算,计算完数据的输出

1、数据的读取

1.1 怎么读取

flink 中的数据的来源可以通过 StreamExecutionEnvironment.addSource(sourceFunction)添加数据源
sourceFunction可以使用flink中自带的,用户也可以自定义。
自定义的时候可以实现SourceFunction接口,也可以通过实现ParallelSourceFunction或者继承RichParallelSourceFunction

1.2 从哪里读取

可读取本地或者hdfs中的文件、kafka中生产者产生的数据、或者使用linux中的nc产生数据当数据源等等

1.2.1 File-based 本地或者hdfs中读取

本地文件:
val dataStream:DataStream[String] = environment.readTextFile(“d:/data/a.txt”)
hdfs:
val text = environment.readTextFile(“hdfs:hadoop10:8020/flink/flink-words”)

hdfs具体实现:
1.添加Hadoop依赖

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>2.10.0</version>
 </dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-scala_2.11</artifactId>
   <version>1.10.0</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.读取hdfs中的文件

 //1.创建执行环境
  val environment = StreamExecutionEnvironment.getExecutionEnvironment
 //2.获取数据源
 val text = environment.readTextFile("hdfs://flink.baizhiedu.com:8020/flink/flink-words")
 //3.对获取到的数据进行转换
 val result = text.flatMap(line => line.split("\\s+"))
   .map(word => (word, 1))
   .keyBy(0)
   .sum(1)
 //4.打印结果
 result.print()
 //5.执行job
 environment.execute("myFlinkJob")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
1.2.2 Socket-based 使用linux中的nc产生数据当数据源

val text = environment.socketTextStream(“hadoop10”,9999)

1.2.3 以kafka作为数据源

大致步骤:1.添加kafka的依赖,2.设置kafka相关信息。3.通过对应的API完成数据读取。4.提供反序列化支持
前置环境:启动kafka,由于依赖于zookeeper,就先启动zookeeper在启动kafka

1.启动zookeeper
 bin/zkServer.sh start
2.启动kafka
 bin/kafka-server-start.sh config/server.properties
3.创建主题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flinktest
4.查看主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
5.发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flinktest
6.消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flinktest--from-beginning
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1.引入maven依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.编写代码
SimpleStringSchema只会反序列化value

object QuickStart {
  def main(args: Array[String]): Unit = {
    //1.创建执行环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop10:9092")
    var text = environment
      .addSource(new FlinkKafkaConsumer[String]("flinktest", new SimpleStringSchema(), properties));
    //3.对获取到的数据进行转换
    val result = text.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    //4.打印结果
    result.print()
    //5.执行job
    environment.execute("myFlinkJob")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

KafkaDeserializationSchema
通过实现这个接口,可以反序列化key、value、partition、offset等

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object QuickStart {
  def main(args: Array[String]): Unit = {
    //1.创建执行环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop10:9092")
    var text = environment
      .addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("flinktest", new MyKafkaDeserializationSchema(), properties));
    //3.对获取到的数据进行转换
    val result = text.flatMap(line =>line._2.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    //4.打印结果
    result.print()
    //5.执行job
    environment.execute("myFlinkJob")
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
//编写自定义

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._
/**
  * 泛型分别是key/value/partition/offset的类型
  */
class MyKafkaDeserializationSchema extends KafkaDeserializationSchema[(String,String,Int,Long)]{
  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false;
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if(consumerRecord.key()!=null){
      (new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }else{
      (null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
    }
  }
  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    createTypeInformation[(String, String, Int, Long)];
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

JSONKeyValueDeserializationSchema
这个是flink-kafka提供的类,可以直接使用,在使用的时候要求kafka中topic的key、value都必须是json。也可以在使用的过程中,指定是否读取元数据(topic、partition、offset等)

import java.util.Properties
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
import org.apache.flink.api.scala._

object JSONKeyValueDeserializationSchema {
  def main(args: Array[String]): Unit = {
    //1.创建执行环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    //设置并行度,通过打印执行计划查看并行度是否起作用
    var properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop10:9092")
    //2.获取数据源
    val text = environment
        .addSource(new FlinkKafkaConsumer[ObjectNode]("flinktest",new JSONKeyValueDeserializationSchema(false),properties));
    //先查看一下内容整体
    //text.map(t=>t.toString).print()
    text.map(t=>(t.get("value").get("id").asInt(),t.get("value").get("name").asText())).print()
    //5.执行job
    environment.execute("myFlinkJob")
  }
}

//传入数据类型:{"id":101,"name":"xiaohei"}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2、数据的计算

还是使用算子计算:
在这里插入图片描述

 //输入数据为a b  c
//    val value: DataStream[String] = dataStream.flatMap(_.split("\\s+"))
//    value.print() //打印出来就是a b c三个元素
    val dataStream2: DataStream[Array[String]] = dataStream.map(_.split("\\s+"))//
    dataStream2.print()//打印出来就是一个数组地址
    val value: DataStream[String] = dataStream2.map(e => e(0) + "***" + e(1))//从数组中获取对应位置的元素,然后拼接成字符串
    value.print()//a***b
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

keyedStream的理解

3、计算结果的输出

datasink:数据输出
flink支持的输出方式有:打印、文件(HDFS)、redis、kafka
例:输出到hdfs
1.首先 要导入依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.编写代码

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.scala._

object FileDataSinkFlinkConnectorFileSystem {
  def main(args: Array[String]): Unit = {
    //1.创建执行环境
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.获取数据源
    val text = environment.socketTextStream("hadoop10",9999)

    var streamFileSink = StreamingFileSink.forRowFormat(new Path("hdfs://hadoop10/flink-result"),
      new SimpleStringEncoder[(String,Int)]())
      .withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd"))
      .build();
    //3.对获取到的数据进行转换
    val result = text.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    //4.把数据写入到文件系统
    result.addSink(streamFileSink)
    //5.执行job
    environment.execute("myFlinkJob")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/864000
推荐阅读
相关标签
  

闽ICP备14008679号