Flink does not have anything like Spark's foreach method that lets users act on each element directly. Instead, all output to external systems has to go through a Sink. The final output step of a job is typically added like this:
stream.addSink(new MySink(xxxx))
Flink officially provides sinks for a number of frameworks; for anything else, you need to implement a custom sink yourself.
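As a rough illustration of what such a custom sink looks like, here is a minimal sketch of a RichSinkFunction. The MySink class and its target parameter are placeholders for this example, not part of Flink:

// Hypothetical MySink: a minimal custom sink extending RichSinkFunction,
// which adds open()/close() lifecycle hooks on top of invoke().
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MySink(target: String) extends RichSinkFunction[String] {
  override def open(parameters: Configuration): Unit = {
    // open connections / allocate resources for the external system here
  }
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    // called once per record; write it out to the external system
    println(s"writing to $target: $value")
  }
  override def close(): Unit = {
    // release connections / resources here
  }
}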
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Add the sink in the main function:
val union = high.union(low).map(_.temperature.toString)
union.addSink(new FlinkKafkaProducer011[String]("localhost:9092","test", new SimpleStringSchema()))
Note: since we are writing data out to Kafka, we use a producer.
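The examples in this section all operate on a SensorReading type defined earlier in the tutorial. Its definition is not repeated here; based on the fields used below (id, timestamp, temperature), a minimal version would look like the following, though the original definition may differ slightly:

// Assumed shape of SensorReading, inferred from the fields the sinks use below
case class SensorReading(id: String, timestamp: Long, temperature: Double)

While the job is running, you can verify the Kafka sink by reading the test topic with a console consumer.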
pom.xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Define a Redis mapper class that specifies the command used when saving data to Redis:
// Define a RedisMapper
class MyRedisMapper extends RedisMapper[SensorReading] {
  // Define the command used to write data to Redis: HSET <table> <key> <value>
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
  }
  // Use the temperature as the value
  override def getValueFromData(data: SensorReading): String = data.temperature.toString
  // Use the sensor id as the key
  override def getKeyFromData(data: SensorReading): String = data.id
}
Note: sensor_temp is the Redis hash name (the "table" name) used by HSET.
Call it in the main function:
// Define a FlinkJedisConfigBase
val conf = new FlinkJedisPoolConfig.Builder()
  .setHost("localhost")
  .setPort(6379)
  .build()

dataStream.addSink(new RedisSink[SensorReading](conf, new MyRedisMapper))
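With this mapper, every record becomes an HSET into the sensor_temp hash keyed by sensor id, so Redis always holds only the latest temperature per sensor; running HGETALL sensor_temp in redis-cli is an easy way to check the result.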
pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Call it in the main function:
// Define the HttpHosts of the Elasticsearch cluster
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))

// Custom ElasticsearchSinkFunction that writes records to ES
val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
  override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    // Wrap the fields in a Map as the document source
    val dataSource = new util.HashMap[String, String]()
    dataSource.put("id", t.id)
    dataSource.put("temperature", t.temperature.toString)
    dataSource.put("ts", t.timestamp.toString)

    // Create an index request, which is sent to ES as an HTTP request
    val indexRequest = Requests.indexRequest()
      .index("sensor")
      .`type`("readingdata")
      .source(dataSource)

    // Send the request through the indexer
    requestIndexer.add(indexRequest)
  }
}

dataStream.addSink(
  new ElasticsearchSink.Builder[SensorReading](httpHosts, myEsSinkFunc).build()
)
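If you have a local single-node Elasticsearch 6.x running, you can verify the sink by querying the sensor index once the job has written some data, for example GET http://localhost:9200/sensor/_search.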
pom.xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
Define a custom JDBC sink, MyJdbcSinkFunc:
class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading] {
  // Connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try the update first; if the row exists it gets updated
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // If the update did not match any row, insert instead
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
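Note: this sink assumes a sensor_temp table already exists in the test database, with columns matching the prepared statements above (a string id column and a double temp column).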
Add it in the main method to save the detail records to MySQL:
dataStream.addSink(new MyJdbcSinkFunc())
Writing the stream to files:
// Option 1
dataStream.writeAsCsv("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\out.txt")
// Option 2 (recommended)
dataStream.addSink(
  StreamingFileSink.forRowFormat(
    new Path("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\out1.txt"),
    new SimpleStringEncoder[SensorReading]()
  ).build()
)
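Note: StreamingFileSink treats the given path as a base directory rather than a single out1.txt file; it writes rolling part files into time-based bucket subdirectories, and together with checkpointing it provides exactly-once output to the file system, which is why it is the recommended option.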