Like Spark before it, Flink boils down to three stages: reading data, computing on the data, and writing out the results.
In Flink, a data source is added with StreamExecutionEnvironment.addSource(sourceFunction).
The sourceFunction can be one of Flink's built-in sources, or one you define yourself.
To define your own, implement the SourceFunction interface, implement ParallelSourceFunction, or extend RichParallelSourceFunction (a sketch follows below).
Sources can read local files or files in HDFS, data produced into Kafka, data generated with nc on Linux, and so on.
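To illustrate the custom-source option mentioned above, here is a minimal sketch, assuming a made-up class name MySource and made-up record contents; it implements the two methods of SourceFunction and emits one string per second until the job cancels it:

import org.apache.flink.streaming.api.functions.source.SourceFunction

//A minimal custom source: emits "line-0", "line-1", ... once per second until cancel() is called
class MySource extends SourceFunction[String] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    var i = 0
    while (running) {
      ctx.collect(s"line-$i") //hand one record to the downstream operators
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

//Usage: environment.addSource(new MySource)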
Local file:
val dataStream: DataStream[String] = environment.readTextFile("d:/data/a.txt")
HDFS:
val text = environment.readTextFile("hdfs://hadoop10:8020/flink/flink-words")
Concrete steps for HDFS:
1. Add the Hadoop dependency
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
2. Read the file from HDFS
//1. Create the execution environment
val environment = StreamExecutionEnvironment.getExecutionEnvironment
//2. Get the data source
val text = environment.readTextFile("hdfs://flink.baizhiedu.com:8020/flink/flink-words")
//3. Transform the data
val result = text.flatMap(line => line.split("\\s+"))
  .map(word => (word, 1))
  .keyBy(0)
  .sum(1)
//4. Print the result
result.print()
//5. Execute the job
environment.execute("myFlinkJob")
Socket source (for example, data generated with nc on Linux):
val text = environment.socketTextStream("hadoop10", 9999)
Kafka source, roughly: 1. add the Kafka connector dependency; 2. set the Kafka connection properties; 3. read the data through the corresponding API; 4. provide deserialization support.
Prerequisites: Kafka must be running; because it depends on ZooKeeper, start ZooKeeper first and then Kafka.
1. Start ZooKeeper
bin/zkServer.sh start
2. Start Kafka
bin/kafka-server-start.sh config/server.properties
3. Create a topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flinktest
4. List topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
5. Produce messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flinktest
6. Consume messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flinktest --from-beginning
1. Add the Maven dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
2. Write the code
SimpleStringSchema only deserializes the message value.
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object QuickStart {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Get the data source: a Kafka consumer that only deserializes the value
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop10:9092")
    val text = environment
      .addSource(new FlinkKafkaConsumer[String]("flinktest", new SimpleStringSchema(), properties))
    //3. Transform the data
    val result = text.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    //4. Print the result
    result.print()
    //5. Execute the job
    environment.execute("myFlinkJob")
  }
}
KafkaDeserializationSchema
Implementing this interface lets you deserialize the key, value, partition, offset, and other metadata.
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object QuickStart {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Get the data source: each record is a (key, value, partition, offset) tuple
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop10:9092")
    val text = environment
      .addSource(new FlinkKafkaConsumer[(String, String, Int, Long)]("flinktest", new MyKafkaDeserializationSchema(), properties))
    //3. Transform the data (the value is the second tuple field)
    val result = text.flatMap(line => line._2.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    //4. Print the result
    result.print()
    //5. Execute the job
    environment.execute("myFlinkJob")
  }
}
//The custom deserialization schema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/**
 * The type parameters are the types of key / value / partition / offset.
 */
class MyKafkaDeserializationSchema extends KafkaDeserializationSchema[(String, String, Int, Long)] {

  override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false

  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
    if (consumerRecord.key() != null) {
      (new String(consumerRecord.key()), new String(consumerRecord.value()), consumerRecord.partition(), consumerRecord.offset())
    } else {
      (null, new String(consumerRecord.value()), consumerRecord.partition(), consumerRecord.offset())
    }
  }

  override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
    createTypeInformation[(String, String, Int, Long)]
  }
}
JSONKeyValueDeserializationSchema
This class is provided by the flink-kafka connector and can be used directly. It requires both the key and the value of the Kafka topic to be JSON. You can also choose whether to include the metadata (topic, partition, offset, etc.) when reading.
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

//The object is named differently from the imported schema class so the two do not clash
object JSONKeyValueDemo {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    //Set the parallelism here if needed; print the execution plan to check whether it takes effect
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop10:9092")
    //2. Get the data source; `false` means metadata (topic, partition, offset) is not included
    val text = environment
      .addSource(new FlinkKafkaConsumer[ObjectNode]("flinktest", new JSONKeyValueDeserializationSchema(false), properties))
    //Inspect the raw records first if needed
    //text.map(t => t.toString).print()
    text.map(t => (t.get("value").get("id").asInt(), t.get("value").get("name").asText())).print()
    //5. Execute the job
    environment.execute("myFlinkJob")
  }
}
//Sample input value: {"id":101,"name":"xiaohei"}
Computing with the operators again (map vs flatMap):
//Input: a b c
//val value: DataStream[String] = dataStream.flatMap(_.split("\\s+"))
//value.print() //prints three separate elements: a, b, c
val dataStream2: DataStream[Array[String]] = dataStream.map(_.split("\\s+"))
dataStream2.print() //prints an array reference (Array's default toString)
val value: DataStream[String] = dataStream2.map(e => e(0) + "***" + e(1)) //pick elements from the array by index and concatenate them into a string
value.print() //a***b
Understanding KeyedStream
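The original text does not expand on this heading; as a rough sketch (the sample pairs below are made up), keyBy turns a DataStream into a KeyedStream, and operators such as sum or reduce then keep separate state for each key:

import org.apache.flink.streaming.api.scala._

object KeyedStreamDemo {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    //A small in-memory stream of (word, count) pairs, just for illustration
    val pairs: DataStream[(String, Int)] = environment.fromElements(("a", 1), ("b", 1), ("a", 1))

    //keyBy(0) partitions the stream by the first tuple field and returns a KeyedStream;
    //sum(1) is then maintained independently per key, so "a" accumulates to 2
    pairs.keyBy(0)
      .sum(1)
      .print()

    environment.execute("keyedStreamDemo")
  }
}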
DataSink: data output
Output targets supported by Flink include print, files (HDFS), Redis, Kafka, and more.
Example: writing to HDFS
1. First, add the dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
2. Write the code
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.scala._

object FileDataSinkFlinkConnectorFileSystem {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Get the data source
    val text = environment.socketTextStream("hadoop10", 9999)
    //Row-format sink that buckets output files by date
    val streamFileSink = StreamingFileSink
      .forRowFormat(new Path("hdfs://hadoop10/flink-result"), new SimpleStringEncoder[(String, Int)]())
      .withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd"))
      .build()
    //3. Transform the data
    val result = text.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
    //4. Write the data to the file system
    result.addSink(streamFileSink)
    //5. Execute the job
    environment.execute("myFlinkJob")
  }
}
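The sink list above also mentions Kafka. As a minimal sketch, assuming the same hadoop10:9092 broker from the earlier examples and a made-up output topic flink-out, the same flink-connector-kafka dependency provides FlinkKafkaProducer for writing a stream back to Kafka:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaDataSink {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Get the data source
    val text = environment.socketTextStream("hadoop10", 9999)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop10:9092")

    //3. Word count as before, then render each result tuple as a plain string
    val result = text.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1))
      .keyBy(0)
      .sum(1)
      .map(t => t._1 + ":" + t._2)

    //4. Each record's string becomes the Kafka message value (the topic name is an assumption)
    result.addSink(new FlinkKafkaProducer[String]("flink-out", new SimpleStringSchema(), properties))

    //5. Execute the job
    environment.execute("kafkaSinkJob")
  }
}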