Flink has no foreach method like the ones in MapReduce or Spark that would let users iterate over records themselves, so every write to an external system has to go through a sink. Output is expressed in the following form:
stream.addSink(new MySink(xxxxxx));
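Here MySink is a placeholder: anything implementing SinkFunction (or extending RichSinkFunction, as the MySQL example below does) can be handed to addSink. A minimal sketch, where the prefix constructor argument is made up to stand in for the xxxxxx parameters above:

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Minimal custom sink sketch; `prefix` is a hypothetical constructor
// argument standing in for whatever configuration a real sink needs.
public class MySink implements SinkFunction<String> {
    private final String prefix;

    public MySink(String prefix) {
        this.prefix = prefix;
    }

    // invoke() runs once per record reaching the sink; a real
    // implementation would write to the external system here.
    @Override
    public void invoke(String value, Context context) {
        System.out.println(prefix + value);
    }
}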
Flink also ships connectors for a number of common sinks. For Kafka, add the following dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class Sink_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092");
        properties.setProperty("group.id", "flink-kafka");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from a Kafka topic
        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer011<String>(
                "sensor",
                new SimpleStringSchema(),
                properties
        ));

        // Write back out to Kafka through a producer sink
        String brokerlist = "192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092";
        String topic = "flink-kafka-sink";
        stream.addSink(new FlinkKafkaProducer011<String>(brokerlist, topic, new SimpleStringSchema()));

        // Without execute() the job graph is built but never run
        env.execute("Sink_Kafka");
    }
}
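The brokerlist/topic constructor above gives at-least-once delivery. Since Kafka 0.11 supports transactions, FlinkKafkaProducer011 also accepts a Semantic argument; a sketch of the exactly-once variant (assuming checkpointing is enabled, and using KeyedSerializationSchemaWrapper from org.apache.flink.streaming.util.serialization):

// Sketch: transactional exactly-once variant of the same producer
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", brokerlist);

stream.addSink(new FlinkKafkaProducer011<String>(
        topic,
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProps,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

The Redis connector is maintained in Apache Bahir rather than in Flink itself: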
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
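A minimal sketch of the Redis sink, reusing the SensorReading POJO and custom source from the MySQL example below (the Redis host and port are assumptions):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class Sink_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<SensorReading> stream = env.addSource(new SourceFromCustom.CustomSource());

        // Connection to a single Redis instance
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        stream.addSink(new RedisSink<SensorReading>(conf, new SensorRedisMapper()));
        env.execute();
    }

    // Maps each record to: HSET sensor_temp <id> <temperature>
    public static class SensorRedisMapper implements RedisMapper<SensorReading> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return String.valueOf(data.getTemperature());
        }
    }
}

For Elasticsearch 6.x the dependency is: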
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.12</artifactId>
<version>1.10.1</version>
</dependency>
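A sketch of wiring up the Elasticsearch sink, again reusing SensorReading and the custom source; the host, port, and index name are assumptions:

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Sink_ES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<SensorReading> stream = env.addSource(new SourceFromCustom.CustomSource());

        // Elasticsearch REST endpoints
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200, "http"));

        ElasticsearchSink.Builder<SensorReading> builder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<SensorReading>() {
                    @Override
                    public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
                        // Build the document and queue an index request
                        Map<String, String> json = new HashMap<>();
                        json.put("id", element.getId());
                        json.put("temp", String.valueOf(element.getTemperature()));

                        IndexRequest request = Requests.indexRequest()
                                .index("sensor")
                                .type("_doc")  // ES 6.x still requires a type
                                .source(json);
                        indexer.add(request);
                    }
                });

        // Flush after every element: fine for a demo, too chatty in production
        builder.setBulkFlushMaxActions(1);

        stream.addSink(builder.build());
        env.execute();
    }
}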
In real-world scenarios we often need to write a custom sink. Flink's built-in basic sources and sinks make this straightforward, but issues such as data consistency still have to be handled by us.
Below is a custom MySQL sink; first, the JDBC driver dependency:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class Sink_Custom_MySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        inputDataStream.addSink(new CustomJdbcSink());
        env.execute();
    }

    // Custom JDBC sink: RichSinkFunction adds open()/close() lifecycle
    // hooks on top of the per-record invoke()
    public static class CustomJdbcSink extends RichSinkFunction<SensorReading> {
        Connection conn = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        // open(): called once per parallel instance; create the connection
        // and prepared statements here, not once per record
        @Override
        public void open(Configuration parameters) throws Exception {
            // Database connection parameters
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/flinkstudy", "root", "123456");
            // Prepared statements with ? placeholders for the record fields
            insertStmt = conn.prepareStatement("INSERT INTO sensor (id, temp) VALUES (?, ?)");
            updateStmt = conn.prepareStatement("UPDATE sensor SET temp = ? WHERE id = ?");
        }

        // invoke(): called for every record; execute the SQL
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            // If the update touched no row, the id is new: insert instead
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        // close(): release JDBC resources when the task shuts down
        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            conn.close();
        }
    }
}
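The update-then-insert sequence in invoke() is an idempotent upsert: replaying a record after a failure overwrites the existing row instead of duplicating it, which is one way to address the consistency concern mentioned above. A one-statement alternative sketch, assuming id is the PRIMARY KEY of the sensor table:

// Alternative sketch, assuming `id` is the PRIMARY KEY of `sensor`:
// one round trip per record instead of update-then-insert
PreparedStatement upsertStmt = conn.prepareStatement(
        "INSERT INTO sensor (id, temp) VALUES (?, ?) " +
        "ON DUPLICATE KEY UPDATE temp = VALUES(temp)");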