Flink operator functions follow a naming pattern:
map    ==> MapFunction
filter ==> FilterFunction
xxx    ==> XxxFunction
RichXxxFunction (the rich variant — important)
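The Rich variants add the open()/close() life-cycle methods and access to the runtime context on top of the plain function logic. A minimal sketch (not part of the original article; class name and logic are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Sketch of a Rich function: same map logic, plus a life cycle.
class MyRichMapFunction extends RichMapFunction[String, String] {

  // called once per parallel task, before any map() call
  override def open(parameters: Configuration): Unit = {
    println("open, subtask = " + getRuntimeContext.getIndexOfThisSubtask)
  }

  override def map(value: String): String = value.toUpperCase

  // called once per parallel task, after processing finishes
  override def close(): Unit = {
    println("close")
  }
}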
This time we look at Flink's Source and Sink. Flink supports reading from and writing to files, sockets, collections, and so on, and it also ships with many built-in connectors, for example Kafka, Hadoop, and Redis. These built-in connectors maintain exactly-once semantics, whereas with Spark Streaming the user has to maintain exactly-once semantics themselves when integrating Kafka, Redis, etc. Next, let's see how to write a custom Source in Flink; this article does not cover checkpointing and related topics.
No matter how complex a Flink program gets, it follows the same basic flow: environment → source → transformations → sink → execute.
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.fromCollection(List(
      Access(201912120010L, "ruozedata.com", 2000),
      Access(201912120011L, "ruozedata.com", 3000)
    )).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> Access(201912120010,ruozedata.com,2000)
1> Access(201912120011,ruozedata.com,3000)
/**
* Creates a DataStream that contains the given elements. The elements must all be of the
* same type.
*
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*/
def fromElements[T: TypeInformation](data: T*): DataStream[T] = {
fromCollection(data)
}
Note: if you supply the type parameter explicitly, it constrains the element type.
// e.g. with an explicit [Int], every element must be an Int
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.fromElements(1, 2L, 3D, 4F, "5").print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> 3.0
1> 4.0
3> 2
2> 1
2> 5
Side note:
JSON is not necessarily performant: with JSON every record must carry the field names, and for large volumes that is far too much overhead on the network. In production, plain text is usually used instead, but the field names, field delimiter, and so on must all be agreed on in advance.
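For example, a delimited plain-text line carries only the values; the field order is agreed on up front and the parsing happens in code. A small sketch, assuming the Access case class used throughout this article:

// JSON would repeat the field names in every record:
//   {"time":201912120010,"domain":"ruozedata.com","traffic":2000}
// Plain text carries only the values; the schema is agreed on beforehand:
val line = "201912120010,ruozedata.com,2000"
val splits = line.split(",")
val access = Access(splits(0).toLong, splits(1), splits(2).toLong)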
The official Flink docs provide the following three interfaces for implementing your own Source: SourceFunction, ParallelSourceFunction, and RichParallelSourceFunction.
/**
 * run() is invoked when the Source starts; cancel() is invoked when it stops.
 */
class MyNonParallelSource extends SourceFunction[Access] {

  private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val domains = List("ruozedata.com", "dongqiudi.com", "zhibo8.com")
    val random = new Random()
    while (isRunning) {
      val time = System.currentTimeMillis() + ""
      val domain = domains(random.nextInt(domains.length))
      val flow = random.nextInt(10000)
      1.to(10).map(x => ctx.collect(Access(time, domain, flow)))
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// Custom data generator; its parallelism cannot be set greater than 1
val ds = env.addSource(new MyNonParallelSource)
This is very useful for generating test data.
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class AccessSource extends SourceFunction[Access] {

  // switch
  var running = true

  override def run(sourceContext: SourceFunction.SourceContext[Access]): Unit = {
    val random = new Random()
    val domains = Array("ruozedata.com", "zhibo8.cc", "dongqiudi.com")

    while (running) {
      val timestamp = System.currentTimeMillis()
      1.to(10).map(x => {
        sourceContext.collect(Access(timestamp, domains(random.nextInt(domains.length)), random.nextInt(1000) + x))
      })
      // sleep 5s between batches
      Thread.sleep(5000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.addSource(new AccessSource).print()
    // env.fromElements(1, 2L, 3D, 4F, "5").print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> Access(1588567173212,dongqiudi.com,284)
4> Access(1588567173212,ruozedata.com,901)
2> Access(1588567173212,dongqiudi.com,861)
2> Access(1588567173212,dongqiudi.com,346)
2> Access(1588567173212,ruozedata.com,236)
3> Access(1588567173212,zhibo8.cc,678)
1> Access(1588567173212,dongqiudi.com,621)
1> Access(1588567173212,ruozedata.com,95)
3> Access(1588567173212,zhibo8.cc,375)
3> Access(1588567173212,dongqiudi.com,927)
1> Access(1588567178270,ruozedata.com,630)
.........
Test: a SourceFunction has no notion of parallelism; it cannot run with parallelism greater than 1.
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.addSource(new AccessSource).setParallelism(3).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:31)
    at org.apache.flink.streaming.api.scala.DataStream.setParallelism(DataStream.scala:130)
    at com.ruozedata.flink.flink02.SourceApp$.main(SourceApp.scala:12)
    at com.ruozedata.flink.flink02.SourceApp.main(SourceApp.scala)
Check the source code (step through it with the debugger) to confirm this.
So in a big-data scenario a source without parallelism is not usable.
class MyParallelSource extends ParallelSourceFunction[Access] {

  private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val domains = List("ruozedata.com", "dongqiudi.com", "zhibo8.com")
    val random = new Random()
    while (isRunning) {
      val time = System.currentTimeMillis() + ""
      val domain = domains(random.nextInt(domains.length))
      val flow = random.nextInt(10000)
      1.to(10).map(x => ctx.collect(Access(time, domain, flow)))
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// Custom data generator; its parallelism can be set greater than 1
val ds2 = env.addSource(new MyParallelSource)
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

import scala.util.Random

class AccessSource02 extends ParallelSourceFunction[Access] {

  var running = true

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val random = new Random()
    val domains = Array("ruozedata.com", "zhibo8.cc", "dongqiudi.com")

    while (running) {
      val timestamp = System.currentTimeMillis()
      1.to(10).map(x => {
        ctx.collect(Access(timestamp, domains(random.nextInt(domains.length)), random.nextInt(1000) + x))
      })
      // sleep a bit
      Thread.sleep(5000)
    }
  }
}

// Note: the code is the same as AccessSource; only the parent class changes.
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    env.addSource(new AccessSource02).setParallelism(3).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
2> Access(1588584321030,dongqiudi.com,279)
1> Access(1588584321031,zhibo8.cc,57)
2> Access(1588584321030,ruozedata.com,377)
3> Access(1588584321030,ruozedata.com,334)
3> Access(1588584321030,dongqiudi.com,995)
3> Access(1588584321031,ruozedata.com,314)
3> Access(1588584321031,dongqiudi.com,385)
3> Access(1588584321031,ruozedata.com,363)
3> Access(1588584321031,ruozedata.com,872)
4> Access(1588584321030,dongqiudi.com,885)
4> Access(1588584321030,dongqiudi.com,130)
4> Access(1588584321030,ruozedata.com,95)
3> Access(1588584321031,dongqiudi.com,454)
2> Access(1588584321031,dongqiudi.com,141)
1> Access(1588584321031,ruozedata.com,525)
1> Access(1588584321030,dongqiudi.com,693)

Note: each source instance emits 10 records every 5s, so with parallelism 3 that is 10 * 3 = 30 records every 5s.
/**
 * MySQL as a Flink Source
 */
class MySQLSource extends RichParallelSourceFunction[City] {

  private var conn: Connection = _
  private var state: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    val url = "jdbc:mysql://localhost:3306/g7"
    val user = "ruoze"
    val password = "ruozedata"
    conn = MySQLUtil.getConnection(url, user, password)
  }

  override def close(): Unit = {
    MySQLUtil.close(conn, state)
  }

  override def run(ctx: SourceFunction.SourceContext[City]): Unit = {
    val sql = "select * from city_info"
    state = conn.prepareStatement(sql)
    val rs = state.executeQuery()
    while (rs.next()) {
      val id = rs.getInt(1)
      val name = rs.getString(2)
      val area = rs.getString(3)
      ctx.collect(City(id, name, area))
    }
  }

  override def cancel(): Unit = {}
}

// Custom source reading from MySQL; parallelism greater than 1 is supported
val ds3 = env.addSource(new MySQLSource)
Note: the core logic is the same as above, but look at what the Rich variant adds:

/**
 * Base class for implementing a parallel data source. Upon execution, the runtime will
 * execute as many parallel instances of this function as configured parallelism
 * of the source.
 *
 * <p>The data source has access to context information (such as the number of parallel
 * instances of the source, and which parallel instance the current instance is)
 * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
 * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
 *
 * @param <OUT> The type of the records produced by this source.
 */
@Public
public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
        implements ParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;
}

1. It extends AbstractRichFunction, and AbstractRichFunction provides the life-cycle methods open() and close().
So whenever you need to deal with a file system, initialization, IO, MySQL, and so on, obtain the connection in open() and release it in close(), because open() and close() are executed once per task (there is an example in an earlier article), and each task is one parallel instance. That is why the Rich functions are used: their life cycle can be controlled.
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.util.Random

class AccessSource03 extends RichParallelSourceFunction[Access] {

  var running = true

  override def cancel(): Unit = {
    running = false
  }

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
  }

  override def close(): Unit = {
    super.close()
  }

  override def run(ctx: SourceFunction.SourceContext[Access]): Unit = {
    val random = new Random()
    val domains = Array("ruozedata.com", "zhibo8.cc", "dongqiudi.com")

    while (running) {
      val timestamp = System.currentTimeMillis()
      1.to(10).map(x => {
        ctx.collect(Access(timestamp, domains(random.nextInt(domains.length)), random.nextInt(1000) + x))
      })
      // sleep a bit
      Thread.sleep(5000)
    }
  }
}
That covers the most basic usage of a Source.
Now suppose we have a requirement: connect to MySQL. The official docs have no MySQL connector, so we either find one on GitHub or simply write our own Source. The requirement: read MySQL data into Flink. To talk to MySQL, the MySQL JDBC dependency has to be added to the pom.
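For reference, the dependency would look roughly like this (the version below is only an example; use whatever matches your MySQL server):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>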
mysql> select * from mysql.student;
+------+--------+----------+------+
| id | name | password | age |
+------+--------+----------+------+
| 1 | kairis | leo123 | 18 |
| 2 | tonny | 222 | 19 |
| 3 | hello | 333 | 20 |
+------+--------+----------+------+
3 rows in set (0.00 sec)
package com.ruozedata.flink.flink02

import java.sql.{Connection, PreparedStatement}

import com.ruozedata.flink.fink01.Domain.Student
import com.ruozedata.flink.utils.MySQLUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

/**
 * Reads data from MySQL
 */
class MySQLSource extends RichSourceFunction[Student] {

  var connection: Connection = _
  var pstmt: PreparedStatement = _

  // establish the connection in open()
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    connection = MySQLUtils.getConnection()
    pstmt = connection.prepareStatement("select * from student")
  }

  // release resources
  override def close(): Unit = {
    super.close()
    MySQLUtils.closeResource(connection, pstmt)
  }

  override def cancel(): Unit = {}

  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      val student = Student(rs.getInt("id"), rs.getString("name"), rs.getString("password"), rs.getInt("age"))
      ctx.collect(student)
    }
  }
}
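The MySQLSource above depends on a MySQLUtils helper that is not shown in this article. A minimal sketch of what it might look like, assuming a plain DriverManager connection with no pooling (URL, user, and password are placeholders):

package com.ruozedata.flink.utils

import java.sql.{Connection, DriverManager, PreparedStatement}

// Hypothetical helper matching the calls used by MySQLSource above.
object MySQLUtils {

  // Obtain a plain JDBC connection (URL/user/password are placeholders).
  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/mysql", "root", "root")
  }

  // Close the statement and the connection, ignoring nulls.
  def closeResource(connection: Connection, pstmt: PreparedStatement): Unit = {
    if (pstmt != null) pstmt.close()
    if (connection != null) connection.close()
  }
}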
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object SourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    // env.addSource(new AccessSource03).setParallelism(3).print()
    env.addSource(new MySQLSource).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> Student(3,hello,333,20)
2> Student(1,kairis,leo123,18)
3> Student(2,tonny,222,19)
Of course, this implementation is rather crude; there is a better way, shown further below.
Note:
In production, reading from MySQL does need parallelism.
You cannot establish the connection inside run(). Why? What triggers run()? The main processing logic triggers run(), so a connection opened there would be tied to the data processing itself.
This is similar to foreach vs. foreachPartition in Spark.
Instead, use the open() method to obtain the connection; open() is executed once per task.
env.addSource(new MySQLSource).setParallelism(3).print()
throws an error:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
Why? Because this MySQLSource extends RichSourceFunction, which is not a parallel source; to allow parallelism greater than 1 it would have to extend (Rich)ParallelSourceFunction.
Still, this hand-written JDBC approach is rather crude.
Using ScalikeJDBC is more elegant:
the connection pool can be configured in a configuration file.
class ScalikeJDBCMySQLSource extends RichSourceFunction[Student] {

  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
    println("~~~run~~~~")
    DBs.setupAll() // parse configuration file

    DB.readOnly { implicit session =>
      SQL("select * from student").map(rs => {
        val student = Student(rs.int("id"), rs.string("name"), rs.string("password"), rs.int("age"))
        ctx.collect(student)
      }).list().apply()
    }
  }

  override def cancel(): Unit = {}
}

Result:
~~~run~~~~
4> Student(1,kairis,leo123,18)
1> Student(2,tonny,222,19)
2> Student(3,hello,333,20)
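DBs.setupAll() reads the connection (and pool) settings from the application configuration file. A minimal sketch of such a file, assuming the standard scalikejdbc-config keys (all values are placeholders):

# src/main/resources/application.conf
db.default.driver = "com.mysql.jdbc.Driver"
db.default.url = "jdbc:mysql://localhost:3306/mysql"
db.default.user = "root"
db.default.password = "root"

# connection pool settings
db.default.poolInitialSize = 5
db.default.poolMaxSize = 10
db.default.connectionTimeoutMillis = 1000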
Note: different Kafka versions require different Flink connector APIs, but from 0.11 onwards the API is unified.
My Flink version is 1.9.0, so the pom I use is:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
Flink's Kafka consumer is called FlinkKafkaConsumer08 (or 09 for Kafka 0.9.0.x versions, etc., or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions). It provides access to one or more Kafka topics.
Source: Flink as the Kafka consumer
Example:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
.addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))
.print()
FlinkKafkaConsumer010 source code:
/**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
 *
 * @param topic
 *           The name of the topic that should be consumed.
 * @param valueDeserializer
 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
 * @param props
 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
 */
public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
    this(Collections.singletonList(topic), valueDeserializer, props);
}

The constructor takes three parameters:
1. topic
2. DeserializationSchema: the deserializer for the incoming data. Kafka serializes records when producing, so they must be deserialized when consuming.
3. Properties
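SimpleStringSchema is used below, but if you wanted to deserialize each Kafka message straight into an Access object, a custom DeserializationSchema might look roughly like this (a sketch only; the comma-separated format and field order are assumptions):

import java.nio.charset.StandardCharsets

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical schema turning "time,domain,traffic" messages into Access objects.
class AccessDeserializationSchema extends DeserializationSchema[Access] {

  override def deserialize(message: Array[Byte]): Access = {
    val splits = new String(message, StandardCharsets.UTF_8).split(",")
    Access(splits(0).toLong, splits(1), splits(2).toLong)
  }

  // The stream never ends from the schema's point of view.
  override def isEndOfStream(nextElement: Access): Boolean = false

  override def getProducedType: TypeInformation[Access] = TypeInformation.of(classOf[Access])
}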
Start the consumer first:

package com.ruozedata.flink.flink02

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object SourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    // env.addSource(new ScalikeJDBCMySQLSource).print()

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop001:9092") // list your brokers; not necessarily all of them
    // only required for Kafka 0.8
    // properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "ruozedata-flink-test")

    val consumer = new FlinkKafkaConsumer010[String]("ruozedata_offset", new SimpleStringSchema(), properties)
    env.addSource(consumer).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> c
4> f
4> d
4> e
4> a
4> d
Then produce some data to Kafka:

object DataGenerator {

  private val logger: Logger = LoggerFactory.getLogger(DataGenerator.getClass)

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop001:9092")
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    for (i <- 0 to 5) {
      Thread.sleep(100)
      // pick a random letter from a to f
      val word: String = String.valueOf((new Random().nextInt(6) + 'a').toChar)
      val part = i % 3 // which partition to send to, since the topic has three partitions
      logger.error("word : {}", word)
      val record = producer.send(new ProducerRecord[String, String]("ruozedata_offset", "", word))
    }

    producer.close()
    println("data generation finished..........")
  }
}
Kafka offsets are very easy to manage in Flink.
In Spark, offsets are managed per micro-batch; in Flink, records are consumed one at a time.
Controlling the start offset is just a matter of calling the right API, which is simple:
val myConsumer = new FlinkKafkaConsumer08[String](...)
myConsumer.setStartFromEarliest() // start from the earliest record possible
myConsumer.setStartFromLatest() // start from the latest record
myConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets() // the default behaviour
val stream = env.addSource(myConsumer)
This is from the official docs.
For now we only cover basic usage; state, checkpointing, fault tolerance, etc. will be covered in later articles.
package com.ruozedata.flink.flink02

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object SourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // for testing
    // env.addSource(new ScalikeJDBCMySQLSource).print()

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop001:9092") // list your brokers; not necessarily all of them
    // only required for Kafka 0.8
    // properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "ruozedata-flink-test")

    val consumer = new FlinkKafkaConsumer010[String]("ruozedata_offset", new SimpleStringSchema(), properties)
    // control the start offset
    consumer.setStartFromEarliest()
    env.addSource(consumer).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
4> c
4> f
4> d
4> e
4> a
4> d

See, this reads back the data we wrote into Kafka earlier. Offsets in Flink are simple to work with; the runtime manages them well underneath.
Question: can I specify the starting offset for a single partition, per partition?
That scenario is useful when you need to repair (reprocess) historical data.
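To answer the question above: yes. Besides the methods shown, the consumer also exposes setStartFromSpecificOffsets, which takes a starting offset per topic partition; partitions without a specified offset fall back to the group-offset behaviour. A small sketch (topic name, partition numbers, and offsets below are just examples):

import java.{util => ju}

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

// Example per-partition start offsets (values are illustrative)
val specificStartOffsets = new ju.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("ruozedata_offset", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("ruozedata_offset", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("ruozedata_offset", 2), 43L)

consumer.setStartFromSpecificOffsets(specificStartOffsets)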
Data sources come in three flavors: built-in sources, connectors, and custom sources.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/index.html#datastream-transformations
Most of the DataStream transformations are similar to Spark's; below we look at the ones Spark does not have.
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })

    stream.keyBy(_.domain).sum("traffic").print("sum")

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
sum:3> Access(201912120010,dongqiudi.com,1000)
sum:3> Access(201912120010,zhibo8.com,5000)
sum:3> Access(201912120010,ruozedata.com,2000)
sum:3> Access(201912120010,dongqiudi.com,7000)
sum:3> Access(201912120010,ruozedata.com,6000)

Original data:
201912120010,ruozedata.com,2000
201912120010,dongqiudi.com,6000
201912120010,zhibo8.com,5000
201912120010,ruozedata.com,4000
201912120010,dongqiudi.com,1000

Why does the result look like this? Because Flink updates the aggregate once for every incoming record. Note the difference from Spark: Spark aggregates per batch.
Let's extend this a bit:
package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })

    stream.keyBy(_.domain).reduce((x, y) => {
      Access(x.time, x.domain, (x.traffic + y.traffic + 100))
    }).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
3> Access(201912120010,zhibo8.com,5000)
3> Access(201912120010,ruozedata.com,4000)
3> Access(201912120010,dongqiudi.com,1000)
3> Access(201912120010,ruozedata.com,6100)
3> Access(201912120010,dongqiudi.com,7100)

Notes:
1. reduce is more flexible than sum: with reduce you can implement your own business logic. After keyBy, records with the same domain are grouped together.
2. Why is there no +100 on zhibo8.com? A result is emitted once per incoming record, and zhibo8.com has only a single record. reduce combines pairs of elements for the same key, so the first element of a key is emitted as-is; with only one element there is nothing to reduce.
Splitting a stream (breaking one stream into several):
This feature can come in handy. Here we use split; later we will switch to side outputs, because split is deprecated — but that does not matter for now (see the sketch after this example).

package com.ruozedata.flink.flink02

import com.ruozedata.flink.fink01.Domain.Access
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })

    // the running sums end up at 5000, 6000 and 7000
    val splitStream = stream.keyBy("domain").sum("traffic").split(x => {
      if (x.traffic > 6000) {
        Seq("大客户")   // "big customer"
      } else {
        Seq("一般客户") // "regular customer"
      }
    })

    splitStream.select("大客户").print("大客户")
    splitStream.select("一般客户").print("一般客户")

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
一般客户:3> Access(201912120010,dongqiudi.com,1000)
一般客户:3> Access(201912120010,zhibo8.com,5000)
一般客户:3> Access(201912120010,ruozedata.com,2000)
大客户:3> Access(201912120010,dongqiudi.com,7000)
一般客户:3> Access(201912120010,ruozedata.com,6000)

splitStream.select("大客户", "一般客户").print("ALL")

Result:
一般客户:3> Access(201912120010,dongqiudi.com,1000)
ALL:3> Access(201912120010,dongqiudi.com,1000)
一般客户:3> Access(201912120010,ruozedata.com,4000)
ALL:3> Access(201912120010,ruozedata.com,4000)
一般客户:3> Access(201912120010,zhibo8.com,5000)
ALL:3> Access(201912120010,zhibo8.com,5000)
一般客户:3> Access(201912120010,ruozedata.com,6000)
ALL:3> Access(201912120010,ruozedata.com,6000)
大客户:3> Access(201912120010,dongqiudi.com,7000)
ALL:3> Access(201912120010,dongqiudi.com,7000)
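Since split is deprecated, the same "big customer / regular customer" routing can be done with side outputs, as mentioned above. A rough sketch using a ProcessFunction and an OutputTag, assuming the same stream as in the example (tag name and threshold are illustrative):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val bigCustomerTag = OutputTag[Access]("big-customer")

val mainStream = stream.keyBy(_.domain).sum("traffic")
  .process(new ProcessFunction[Access, Access] {
    override def processElement(value: Access,
                                ctx: ProcessFunction[Access, Access]#Context,
                                out: Collector[Access]): Unit = {
      // route traffic > 6000 to the side output, everything else downstream
      if (value.traffic > 6000) ctx.output(bigCustomerTag, value) else out.collect(value)
    }
  })

mainStream.getSideOutput(bigCustomerTag).print("big customer")
mainStream.print("regular customer")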
Merging streams
Two streams can be merged, using union or connect:

object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })

    val stream1 = env.addSource(new AccessSource)
    val stream2 = env.addSource(new AccessSource)

    // stream1 and stream2 have the same data type
    stream1.union(stream2).map(x => {
      println("接收到的数据:" + x) // "接收到的数据" = "received data"
      x
    }).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result (the exact values are not important):
接收到的数据:Access(1588660509252,ruozedata.com,786)
4> Access(1588660509252,ruozedata.com,786)
接收到的数据:Access(1588660509252,ruozedata.com,807)
4> Access(1588660509252,ruozedata.com,807)
接收到的数据:Access(1588660509252,dongqiudi.com,883)
4> Access(1588660509252,dongqiudi.com,883)
接收到的数据:Access(1588660509255,zhibo8.cc,455)
4> Access(1588660509255,zhibo8.cc,455)
接收到的数据:Access(1588660509255,ruozedata.com,691)
4> Access(1588660509255,ruozedata.com,691)
接收到的数据:Access(1588660509252,ruozedata.com,59)
/**
 * Creates a new DataStream by merging DataStream outputs of
 * the same type with each other. The DataStreams merged using this operator
 * will be transformed simultaneously.
 */
def union(dataStreams: DataStream[T]*): DataStream[T] =
  asScalaStream(stream.union(dataStreams.map(_.javaStream): _*))

union requires "the same type with each other": the two streams must have the same data type.
In production, however, it is rare that the two streams you want to merge have exactly the same type.

/**
 * Creates a new ConnectedStreams by connecting
 * DataStream outputs of different type with each other. The
 * DataStreams connected using this operators can be used with CoFunctions.
 */
def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
  asScalaStream(stream.connect(dataStream.javaStream))

When the two streams have different data types, use connect.
For two streams of different data types, ConnectedStreams provides:

def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]

Note:
fun1: IN1 => R is the function applied to the first stream;
fun2: IN2 => R is the function applied to the second stream.
object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("data/access.log").map(x => {
      val splits = x.split(",")
      Access(splits(0).toLong, splits(1), splits(2).toLong)
    })

    val stream1 = env.addSource(new AccessSource)
    val stream2 = env.addSource(new AccessSource)
    val stream2New = stream2.map(x => ("J哥", x))

    stream1.connect(stream2New).map(x => x, y => y).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
3> Access(1588660731150,ruozedata.com,308)
3> Access(1588660731150,zhibo8.cc,868)
3> Access(1588660731150,dongqiudi.com,994)
4> Access(1588660731150,ruozedata.com,721)
4> Access(1588660731150,ruozedata.com,640)
4> Access(1588660731150,zhibo8.cc,381)
3> (J哥,Access(1588660731154,dongqiudi.com,639))
3> (J哥,Access(1588660731154,zhibo8.cc,282))
3> (J哥,Access(1588660731154,ruozedata.com,645))
2> Access(1588660731150,dongqiudi.com,379)
2> Access(1588660731150,ruozedata.com,406)
2> (J哥,Access(1588660731154,zhibo8.cc,249))
2> (J哥,Access(1588660731154,dongqiudi.com,973))
2> (J哥,Access(1588660731154,zhibo8.cc,97))
4> (J哥,Access(1588660731154,ruozedata.com,342))
4> (J哥,Access(1588660731154,ruozedata.com,72))
1> Access(1588660731150,dongqiudi.com,248)
1> Access(1588660731150,ruozedata.com,55)
1> (J哥,Access(1588660731154,ruozedata.com,185))
1> (J哥,Access(1588660731154,dongqiudi.com,174))
2> Access(1588660736197,dongqiudi.com,213)
2> Access(1588660736197,ruozedata.com,517)
2> Access(1588660736197,dongqiudi.com,374)
3> Access(1588660736197,zhibo8.cc,156)
1> Access(1588660736197,dongqiudi.com,62)
1> Access(1588660736197,zhibo8.cc,642)
4> Access(1588660736197,zhibo8.cc,552)
1> Access(1588660736197,ruozedata.com,604)
3> Access(1588660736197,zhibo8.cc,318)
connect:
  Connects two data streams retaining their types.
  The data types can be different.
  Works on exactly two data streams.

union:
  Union of two or more data streams.
  The data types must be the same.
  Works on two or more data streams.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#physical-partitioning
class RuozedataPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int = {
    println("partitions: " + numPartitions)
    // note: in Scala there is no need for equals; == is enough
    if (key == "ruozedata.com") {
      0
    } else if (key == "dongqiudi.com") {
      1
    } else {
      2
    }
  }
}

Note: the type parameter of Partitioner[String] is the type of the key. Partitioning requires key-value data.
/**
* Partitions a tuple DataStream on the specified key fields using a custom partitioner.
* This method takes the key position to partition on, and a partitioner that accepts the key
* type.
*
* Note: This method works only on single field keys.
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
asScalaStream(stream.partitionCustom(partitioner, field))
Note the field: Int parameter: it is the position of the key field in the tuple.
package com.ruozedata.flink.flink02

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object TranformationApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)

    env.addSource(new AccessSource)
      // partitioning requires a key-value format
      .map(x => (x.domain, x))
      .partitionCustom(new RuozedataPartitioner, 0)
      .map(x => {
        println("current thread id is: " + Thread.currentThread().getId + " , value is: " + x)
        x._2
      }).print()

    env.execute(this.getClass.getSimpleName)
  }
}

Result:
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
current thread id is: 60 , value is: (ruozedata.com,Access(1588661525735,ruozedata.com,985))
1> Access(1588661525735,ruozedata.com,985)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661525735,ruozedata.com,199))
1> Access(1588661525735,ruozedata.com,199)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661525735,zhibo8.cc,826))
3> Access(1588661525735,zhibo8.cc,826)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661525735,zhibo8.cc,421))
3> Access(1588661525735,zhibo8.cc,421)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661525735,ruozedata.com,900))
1> Access(1588661525735,ruozedata.com,900)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661525735,ruozedata.com,699))
1> Access(1588661525735,ruozedata.com,699)
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661525735,dongqiudi.com,840))
2> Access(1588661525735,dongqiudi.com,840)
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661525735,dongqiudi.com,115))
2> Access(1588661525735,dongqiudi.com,115)
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661525735,dongqiudi.com,8))
2> Access(1588661525735,dongqiudi.com,8)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661525735,zhibo8.cc,109))
3> Access(1588661525735,zhibo8.cc,109)
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
partitions: 3
current thread id is: 60 , value is: (ruozedata.com,Access(1588661530778,ruozedata.com,475))
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661530778,zhibo8.cc,189))
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661530778,dongqiudi.com,408))
3> Access(1588661530778,zhibo8.cc,189)
2> Access(1588661530778,dongqiudi.com,408)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661530778,zhibo8.cc,663))
1> Access(1588661530778,ruozedata.com,475)
3> Access(1588661530778,zhibo8.cc,663)
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661530778,zhibo8.cc,766))
current thread id is: 60 , value is: (ruozedata.com,Access(1588661530778,ruozedata.com,799))
1> Access(1588661530778,ruozedata.com,799)
current thread id is: 62 , value is: (dongqiudi.com,Access(1588661530778,dongqiudi.com,126))
2> Access(1588661530778,dongqiudi.com,126)
3> Access(1588661530778,zhibo8.cc,766)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661530778,ruozedata.com,110))
current thread id is: 63 , value is: (zhibo8.cc,Access(1588661530778,zhibo8.cc,361))
1> Access(1588661530778,ruozedata.com,110)
3> Access(1588661530778,zhibo8.cc,361)
current thread id is: 60 , value is: (ruozedata.com,Access(1588661530778,ruozedata.com,783))
1> Access(1588661530778,ruozedata.com,783)
Note the subtask-to-domain mapping:
1 -> ruozedata.com
2 -> dongqiudi.com
3 -> zhibo8.cc
The result is as expected. This is the basic usage of a custom partitioner: each thread (parallel subtask) handles a different slice of the data.