
4.1.15 Flink Stream Processing Framework: Sink Output Operations in the Flink Streaming API


Contents

1. Preface

2. Writing to Kafka (source/sink)

3. Writing to Redis (sink)

4. Writing to Elasticsearch (sink)

5. Custom JDBC sink (writing to MySQL)


1. Preface

Unlike Spark, Flink has no foreach-style method that lets users write out results directly. All output to external systems goes through a Sink, and the final output step of a job typically looks like this:

stream.addSink(new MySink(xxxx))

Flink ships with sinks for a number of frameworks out of the box; for everything else, users implement their own sink.
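A minimal sketch of such a custom sink follows. The class name MySink and its constructor argument are placeholders; only RichSinkFunction, from org.apache.flink.streaming.api.functions.sink, is Flink API.

public static class MySink extends RichSinkFunction<String> {
    private final String target;   // hypothetical config parameter

    public MySink(String target) {
        this.target = target;
    }

    // invoke() is called once for every record that reaches the sink;
    // this is where the record would be written to the external system
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println(target + " <- " + value);
    }
}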

2. Writing to Kafka (source/sink)

pom.xml

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

Add the sink in the main method:

dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "test", new SimpleStringSchema()));

Full code:

Kafka producer (port 9092) ---> Flink ---> Kafka consumer

package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Alternative: read data from a file
        // DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from Kafka
        DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        // Convert to SensorReading, serialized back to String for the sink
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
        });

        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

        env.execute();
    }
}
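FlinkKafkaProducer011 also supports stronger delivery guarantees via its Semantic enum. A hedged sketch, not from the original tutorial: exactly-once additionally requires checkpointing to be enabled on the environment, and SimpleStringSchema is wrapped in KeyedSerializationSchemaWrapper (from org.apache.flink.streaming.util.serialization) because this constructor variant takes a KeyedSerializationSchema.

// Sketch: choose delivery semantics for the producer (assumes
// env.enableCheckpointing(...) is on for EXACTLY_ONCE to take effect)
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

dataStream.addSink(new FlinkKafkaProducer011<String>(
        "sinktest",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProps,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));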

Start a Kafka console consumer to consume the data Flink writes out.

3. Writing to Redis (sink)

pom.xml

<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Define a Redis mapper class that specifies the command used when saving to Redis:

public static class MyRedisMapper implements RedisMapper<SensorReading> {
    // The Redis command used to save the data: store it as a hash
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
    }

    @Override
    public String getKeyFromData(SensorReading data) {
        return data.getId();
    }

    @Override
    public String getValueFromData(SensorReading data) {
        return data.getTemperature().toString();
    }
}

Call it in the main method:

FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
dataStream.addSink(new RedisSink<SensorReading>(config, new MyRedisMapper()));

Full code:

package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Define the Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // The command used to save data to Redis: store it as a hash,
        // i.e. hset sensor_temp id temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
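To verify the result, the hash can be read back with the Jedis client that the Bahir connector already pulls in transitively. A hedged sketch, not part of the original tutorial; host and port match the sink configuration above:

import redis.clients.jedis.Jedis;

// Verification sketch: read back the hash written by the Redis sink
public class RedisCheck {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // hgetAll returns every field (sensor id) and value (temperature)
            jedis.hgetAll("sensor_temp").forEach((id, temp) ->
                    System.out.println(id + " -> " + temp));
        }
    }
}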


4. Writing to Elasticsearch (sink)

pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

Call it in the main method:

// httpHosts configuration for ES
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200));
dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
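The Builder also exposes batching knobs before build() is called. A hedged sketch reusing httpHosts and MyEsSinkFunction from above: setBulkFlushMaxActions(1) makes the sink flush after every single request, which is convenient for testing but not for throughput.

// Sketch: the same sink, but flushing after every record for easy testing
ElasticsearchSink.Builder<SensorReading> esSinkBuilder =
        new ElasticsearchSink.Builder<>(httpHosts, new MyEsSinkFunction());
esSinkBuilder.setBulkFlushMaxActions(1); // by default requests are batched
dataStream.addSink(esSinkBuilder.build());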

Implementation of ElasticsearchSinkFunction:

public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
    @Override
    public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
        HashMap<String, String> dataSource = new HashMap<>();
        dataSource.put("id", element.getId());
        dataSource.put("ts", element.getTimestamp().toString());
        dataSource.put("temp", element.getTemperature().toString());

        IndexRequest indexRequest = Requests.indexRequest().index("sensor").type("readingdata").source(dataSource);
        indexer.add(indexRequest);
    }
}

Full code:

package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class SinkTest3_Es {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Define the ES connection configuration
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));

        dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());

        env.execute();
    }

    // Custom ES write logic
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // Define the document source to write
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id", element.getId());
            dataSource.put("temp", element.getTemperature().toString());
            dataSource.put("ts", element.getTimestamp().toString());

            // Create the request, i.e. the write command sent to ES
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(dataSource);

            // Send the request via the indexer
            indexer.add(indexRequest);
        }
    }
}

5. Custom JDBC sink (writing to MySQL)

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

Add MyJdbcSink:

package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import com.atguigu.apitest.source.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkTest4_Jdbc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Alternative: read data from a file
        // DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
        //
        // // Convert to SensorReading
        // DataStream<SensorReading> dataStream = inputStream.map(line -> {
        //     String[] fields = line.split(",");
        //     return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        // });

        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // Custom SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // Declare the connection and prepared statements
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // Called once per record: execute the SQL over the connection
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try the update first; if no row was updated, insert instead
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}
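The sink above assumes a sensor_temp table already exists in the test database. A hypothetical matching DDL, not taken from the original tutorial; the column types are inferred from the setString/setDouble calls, and the primary key is a natural choice given the update-or-insert logic:

-- Hypothetical table definition matching the prepared statements above
CREATE TABLE sensor_temp (
    id   VARCHAR(32) NOT NULL PRIMARY KEY,  -- sensor id, bound with setString
    temp DOUBLE                             -- temperature, bound with setDouble
);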

Adding this in the main method saves the detail records to MySQL, and every write reflects the latest real-time value per sensor.

dataStream.addSink(new MyJdbcSink())
