赞
踩
- <dependency>
- <groupId>org.apache.bahir</groupId>
- <artifactId>flink-connector-redis_2.11</artifactId>
- <version>1.0</version>
- </dependency>
- object RedisSinkTest {
-
-
-
- def main(args: Array[String]): Unit = {
-
-
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-
-
- val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
-
-
-
- val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
-
- val arr = d.split(",")
-
- SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
-
- })
-
-
-
- //redis sink
-
- val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").setPort(6379).build()
-
- dataStream.addSink(new RedisSink(config,new MyRedisMapper))
-
-
-
- env.execute("redis sink test")
-
- }
-
-
-
- }
-
-
-
- class MyRedisMapper extends RedisMapper[SensorReading]{
-
-
-
- //命令为hset,键为sensor_temperature
-
- override def getCommandDescription: RedisCommandDescription = {
-
- new RedisCommandDescription(RedisCommand.HSET,"sensor_temperature")
-
- }
-
-
-
- //field为传感器id
-
- override def getKeyFromData(t: SensorReading): String = t.id
-
-
-
- //value为温度
-
- override def getValueFromData(t: SensorReading): String = t.temperature.toString
-
- }
|
|
redis查看结果
1 2 3 4 5 6 7 8 9 |
|
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
- <version>1.7.2</version>
- </dependency>
- object EsSinkTest {
-
-
-
- def main(args: Array[String]): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-
-
- val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
-
-
-
- val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
-
- val arr = d.split(",")
-
- SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
-
- })
-
-
-
- //es sink
-
- val httpHosts = new util.ArrayList[HttpHost]()
-
- httpHosts.add(new HttpHost("hadoop101",9200))
-
- //创建一个es sink的builder
-
- val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
-
- override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
-
- println("保存数据:" + t)
-
- //包装成map
-
- val map = new util.HashMap[String, String]()
-
- map.put("sensor_id", t.id)
-
- map.put("temperature", t.temperature.toString)
-
- map.put("ts", t.timestamp.toString)
-
-
-
- //创建index request,准备发送数据
-
- val indexRequest: IndexRequest = Requests.indexRequest().index("sensor").`type`("redingdata").source(map)
-
-
-
- //利用requestIndexer发送请求,写入数据
-
- requestIndexer.add(indexRequest)
-
-
-
- println("保存成功")
-
- }
-
- })
-
- esSinkBuilder
-
-
-
- dataStream.addSink(esSinkBuilder.build())
-
-
-
- env.execute("redis sink test")
-
- }
-
-
-
- }
|
|
{ "took" : 148, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 6, "max_score" : 1.0, "hits" : [ { "_index" : "sensor", "_type" : "redingdata", "_id" : "QXpZnnEBUwLRQchmepbT", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_6", "temperature" : "15.402984393403084", "ts" : "1547718201" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RnpZnnEBUwLRQchme5ZS", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_7", "temperature" : "6.720945201171228", "ts" : "1547718202" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "Q3pZnnEBUwLRQchmepbr", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "35.80018327300259", "ts" : "1547718199" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "QnpZnnEBUwLRQchmepbo", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "30.8", "ts" : "1547718200" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RHpZnnEBUwLRQchmepbs", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_1", "temperature" : "40.8", "ts" : "1547718201" } }, { "_index" : "sensor", "_type" : "redingdata", "_id" : "RXpZnnEBUwLRQchmepbu", "_score" : 1.0, "_source" : { "sensor_id" : "sensor_10", "temperature" : "38.101067604893444", "ts" : "1547718205" } } ] } }
- <!-- mysql sink -->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.44</version>
- </dependency>
②自定义mysql sink,继承RichSinkFunction,重写执行逻辑以及初始化和关闭资源的方法。
- class MyJdbcSink() extends RichSinkFunction[SensorReading]{
-
-
-
- //定义sql连接、预编译器
-
- var conn:Connection = _
-
- var insertStmt : PreparedStatement = _
-
- var updateStmt:PreparedStatement=_
-
-
-
- //初始化
-
- override def open(parameters: Configuration): Unit = {
-
- super.open(parameters)
-
-
-
- conn = DriverManager.getConnection("jdbc:mysql:///test","root","123456")
-
-
-
- insertStmt = conn.prepareStatement("insert into temperatures(sensor,temp) values(?,?)")
-
-
-
- updateStmt = conn.prepareStatement("update temperatures set temp=? where sensor=?")
-
- }
-
-
-
- //调用连接,执行sql
-
- override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
-
- //这句必须删掉
-
- //super.invoke(value, context)
-
-
-
- //执行更新语句
-
- updateStmt.setDouble(1,value.temperature)
-
- updateStmt.setString(2,value.id)
-
- updateStmt.execute()
-
-
-
- //如果没有,则插入
-
- if (updateStmt.getUpdateCount == 0){
-
- insertStmt.setString(1,value.id)
-
- insertStmt.setDouble(2,value.temperature)
-
- insertStmt.execute()
-
- }
-
-
-
- }
-
-
-
- //关闭资源
-
- override def close(): Unit = {
-
- updateStmt.close()
-
- insertStmt.close()
-
- conn.close()
-
- }
-
-
-
- }
③添加自定义的mysql sink并执行
- object JdbcSinkTest {
-
-
-
- def main(args: Array[String]): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-
-
- val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
-
-
-
- val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
-
- val arr = d.split(",")
-
- SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
-
- })
-
-
-
- //jdbc sink
-
- dataStream.addSink(new MyJdbcSink())
-
-
-
- env.execute("jdbc sink test")
-
- }
-
-
-
- }
|
|
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。