Flink does not have an operator like Spark's foreach that lets users run arbitrary per-record output logic directly.
All output to external systems has to go through a Sink.
The final output step of a job is typically wired up like this:
stream.addSink(new MySink(xxxx))
Flink ships with Sinks for a number of common frameworks out of the box; for anything else, users have to implement their own Sink.
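The MySink(xxxx) above is only a placeholder. As a rough sketch of what such a user-defined Sink can look like (the class name and the string prefix parameter are hypothetical, purely for illustration, not a Flink API): extend RichSinkFunction, write each record in invoke(), and use open()/close() for resource management.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Hypothetical custom sink: prints every record with a prefix
class MySink(prefix: String) extends RichSinkFunction[String] {

  // open: called once per parallel instance, e.g. to open a connection
  override def open(parameters: Configuration): Unit = {}

  // invoke: called for every record; replace println with a write to the external system
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    println(prefix + value)
  }

  // close: release any resources acquired in open()
  override def close(): Unit = {}
}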
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
package com.streamapi

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // val stream = env.readTextFile("source/sensor.txt")
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node7-1:9092")
    properties.setProperty("group.id", "flink-test")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    // Read from the Kafka topic "test1"
    val stream = env.addSource(new FlinkKafkaConsumer011[String](
      "test1",
      new SimpleStringSchema(),
      properties
    ))

    // Parse each line, e.g. sensor_2,1600828094726,113.45370583283331
    val value = stream.map(line => {
      val splited = line.split(",")
      SensorReading(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)
    })

    val result = value.map(_.temperature.toString)

    // Write the temperature strings to the Kafka topic "test"
    result.addSink(new FlinkKafkaProducer011[String](
      "node7-1:9092",
      "test",
      new SimpleStringSchema()
    ))

    env.execute("KafkaSink")
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
package co.streamapi

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the input data
    val stream = env.readTextFile("source/sensor.txt")

    // Transform: parse each line into a SensorReading
    val value = stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(
        dataArray(0).trim,
        dataArray(1).trim.toLong,
        dataArray(2).trim.toDouble
      )
    })

    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

    // Sink
    value.addSink(new RedisSink(conf, new MyRedisMapper()))
    value.print("RedisSink")

    env.execute("RedisSink")
  }
}

class MyRedisMapper() extends RedisMapper[SensorReading] {
  // Define the Redis command used to save the data:
  // store sensor id and temperature in a hash, i.e. HSET key field value
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  }

  // The field written to Redis
  override def getKeyFromData(t: SensorReading): String = t.id

  // The value written to Redis
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
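To check what the Redis sink wrote, you can read the hash back with the Jedis client. This is a hedged sketch: it assumes Jedis is on the classpath (the Bahir connector pulls it in transitively) and that Redis runs on localhost:6379 as configured above; the object name RedisCheck is hypothetical.

import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._

object RedisCheck {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("localhost", 6379)
    // Read back the whole hash written by the HSET command above
    for ((id, temp) <- jedis.hgetAll("sensor_temperature").asScala) {
      println(s"$id -> $temp")
    }
    jedis.close()
  }
}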
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
package com.streamapi

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// Named EsSink so the object does not shadow the imported ElasticsearchSink class
object EsSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the input data
    val stream = env.readTextFile("source/sensor.txt")

    // Transform: parse each line into a SensorReading
    val value = stream.map(data => {
      val dataArray = data.split(",")
      SensorReading(
        dataArray(0).trim,
        dataArray(1).trim.toLong,
        dataArray(2).trim.toDouble
      )
    })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Create an ElasticsearchSink builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          println("saving data: " + t)
          // Wrap the reading in a Map (or a JSON object)
          val json = new util.HashMap[String, String]()
          json.put("sensor_id", t.id)
          json.put("timestamp", t.timestamp.toString)
          json.put("temperature", t.temperature.toString)
          // Create an index request and hand it to the indexer to be sent
          val indexRequest = Requests.indexRequest().index("sensor").`type`("readingdata").source(json)
          requestIndexer.add(indexRequest)
          println("data saved...")
        }
      }
    )

    // Sink
    value.addSink(esSinkBuilder.build())

    env.execute("ElasticsearchSink")
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
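The Elasticsearch sink buffers index requests and flushes them in bulk, so in a quick local test the documents may not appear immediately. A small sketch of forcing a flush after every record, assuming the esSinkBuilder from the example above (the value 1 is for testing only and hurts throughput); it must be set before build() is called:

// Flush the bulk buffer after every single request (testing only)
esSinkBuilder.setBulkFlushMaxActions(1)

// Then build and attach the sink as before
value.addSink(esSinkBuilder.build())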
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
</dependency>
package com.streamapi

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object JDBCSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.readTextFile("source/sensor.txt")

    // Alternatively, read from Kafka instead of a file:
    // val properties = new Properties()
    // properties.setProperty("bootstrap.servers", "node7-1:9092")
    // properties.setProperty("group.id", "flink-test")
    // properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // properties.setProperty("auto.offset.reset", "latest")
    //
    // val stream = env.addSource(new FlinkKafkaConsumer011[String](
    //   "test",
    //   new SimpleStringSchema(),
    //   properties
    // ))

    // Parse each line, e.g. sensor_2,1600828094726,113.45370583283331
    val value = stream.map(line => {
      val splited = line.split(",")
      SensorReading(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)
    })

    value.addSink(new MyJdbcSink())

    env.execute("JDBCSink")
  }
}

/**
 * Custom JDBC sink: MyJdbcSink
 */
class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  // Connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // open: create the connection and prepare the statements
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // Class.forName("com.mysql.cj.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/mybatis?serverTimezone=UTC", "root", "123456")
    insertStmt = conn.prepareStatement("insert into sensor_reading(sensor, temperature) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_reading set temperature = ? where sensor = ?")
  }

  // invoke: executed for every record
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // If the sensor id already exists, update its temperature
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // If the update touched no row, insert a new one instead
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // close: clean up resources
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

// Case class for a sensor reading: sensor id, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)
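MyJdbcSink assumes a sensor_reading table with sensor and temperature columns already exists in the mybatis database. One possible way to create it from Scala via plain JDBC; the column types and lengths are assumptions, adjust them as needed:

import java.sql.DriverManager

object CreateSensorTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/mybatis?serverTimezone=UTC", "root", "123456")
    val stmt = conn.createStatement()
    // Assumed schema matching the insert/update statements used by MyJdbcSink
    stmt.execute(
      """create table if not exists sensor_reading (
        |  sensor varchar(64) primary key,
        |  temperature double
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}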