Flink has no equivalent of Spark's foreach method that lets users iterate over the data directly. All output to external systems has to go through a Sink, and a job's final output step generally looks like this:
stream.addSink(new MySink(xxxx))
Flink officially provides sinks for a number of frameworks out of the box; for anything beyond those, you need to implement a custom sink yourself.
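A minimal sketch of what a MySink like the one above could look like (the class body and the printed prefix are purely illustrative, following the same SinkFunction pattern used later in this post):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// invoke() is called once for every record that reaches the sink
public static class MySink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        // Write the record to the external system; here we just print it
        System.out.println("my-sink> " + value);
    }
}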
Kafka sink. Add the connector dependency to pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Add the sink in the main function:
dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "test", new SimpleStringSchema()));
Complete code. The data pipeline is: Kafka producer (9092) ---> Flink ---> Kafka consumer.
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        // Alternatively, read data from a file
//        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from Kafka
        DataStream<String> inputStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        // Convert to SensorReading, then back to String for the producer
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2])).toString();
        });

        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

        env.execute();
    }
}
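If you need to tune the producer (acks, retries, and so on), FlinkKafkaProducer011 also has a constructor that takes the topic plus a Properties object instead of the broker-list shorthand; a minimal sketch with illustrative settings:

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");
producerProps.setProperty("retries", "3");  // illustrative tuning, not from the original example
dataStream.addSink(new FlinkKafkaProducer011<String>("sinktest", new SimpleStringSchema(), producerProps));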
Start a Kafka console consumer to read what Flink writes out, e.g. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sinktest.
Redis sink. The connector is provided by Apache Bahir. pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Define a Redis mapper class that specifies the command used when saving to Redis. With HSET, the second argument ("sensor_temp") is the hash key, getKeyFromData supplies the field, and getValueFromData the value:

public static class MyRedisMapper implements RedisMapper<SensorReading> {
    // Save to Redis as a hash: hset sensor_temp <id> <temperature>
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
    }

    @Override
    public String getKeyFromData(SensorReading data) {
        return data.getId();
    }

    @Override
    public String getValueFromData(SensorReading data) {
        return data.getTemperature().toString();
    }
}
Call it in the main function:
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();

dataStream.addSink(new RedisSink<SensorReading>(config, new MyRedisMapper()));
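If the Redis instance requires authentication, the same builder can also carry a password (the value here is a placeholder, not from the original example):

FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
        .setHost("localhost")
        .setPort(6379)
        .setPassword("your-password")  // placeholder credential
        .build();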
Complete code:
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // The command used to save the data: a hash, i.e. hset sensor_temp id temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
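Once the job has run, you can sanity-check the result from redis-cli with HGETALL sensor_temp: each sensor id should appear as a field holding its latest temperature.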
Elasticsearch sink. pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Call it in the main function:
// Elasticsearch httpHosts configuration
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200));
dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
The ElasticsearchSinkFunction implementation:
public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
    @Override
    public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
        HashMap<String, String> dataSource = new HashMap<>();
        dataSource.put("id", element.getId());
        dataSource.put("ts", element.getTimestamp().toString());
        dataSource.put("temp", element.getTemperature().toString());
        IndexRequest indexRequest = Requests.indexRequest().index("sensor").type("readingdata").source(dataSource);
        indexer.add(indexRequest);
    }
}
Complete code:
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class SinkTest3_Es {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // Elasticsearch connection configuration
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));

        dataStream.addSink(new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());

        env.execute();
    }

    // Custom Elasticsearch write logic
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // Build the document source to write
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id", element.getId());
            dataSource.put("temp", element.getTemperature().toString());
            dataSource.put("ts", element.getTimestamp().toString());

            // Create the index request, i.e. the write command sent to Elasticsearch
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .type("readingdata")
                    .source(dataSource);

            // Hand the request to the indexer
            indexer.add(indexRequest);
        }
    }
}
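By default the sink buffers index requests and writes them to Elasticsearch in bulk, so a short test run may appear to produce nothing. One way to see results immediately (a dev/test setting, not for production) is to flush after every element via the builder:

ElasticsearchSink.Builder<SensorReading> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new MyEsSinkFunction());
// Flush every single record immediately so results are visible right away (test only)
esSinkBuilder.setBulkFlushMaxActions(1);
dataStream.addSink(esSinkBuilder.build());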
JDBC custom sink (writing to MySQL). pom.xml:

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
Add MyJdbcSink (this assumes a test database in MySQL with a sensor_temp table containing id and temp columns):
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import com.atguigu.apitest.source.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkTest4_Jdbc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        // Alternatively, read data from a file
//        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
//
//        // Convert to SensorReading
//        DataStream<SensorReading> dataStream = inputStream.map(line -> {
//            String[] fields = line.split(",");
//            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
//        });

        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // Custom SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // Connection and prepared statements
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // Called once per record: run the SQL over the open connection
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try an update first; if no row was updated, insert instead
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}
Adding this in the main method saves the detail records to MySQL, and because of the update-then-insert logic each sensor id always holds the latest real-time value.
dataStream.addSink(new MyJdbcSink())
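As a side note: if sensor_temp has a primary or unique key on id (an assumption; the table definition is not shown here), MySQL lets you collapse the update-then-insert pair into a single upsert statement, sketched below.

// Prepared once in open(), assuming a unique key on id (hypothetical schema)
PreparedStatement upsertStmt = connection.prepareStatement(
        "insert into sensor_temp (id, temp) values (?, ?) " +
        "on duplicate key update temp = values(temp)");

// In invoke(): one statement covers both the insert and the update case
upsertStmt.setString(1, value.getId());
upsertStmt.setDouble(2, value.getTemperature());
upsertStmt.execute();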