Flink's Three Sinks (Redis, ES, MySQL)

1. Redis Sink

Maven dependency:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Write the file contents into a Redis hash.

Code:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })
    // redis sink
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("hadoop102").setPort(6379).build()
    dataStream.addSink(new RedisSink(config, new MyRedisMapper))
    env.execute("redis sink test")
  }
}

class MyRedisMapper extends RedisMapper[SensorReading] {
  // The command is HSET, the hash key is sensor_temperature
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
  }
  // The hash field is the sensor id
  override def getKeyFromData(t: SensorReading): String = t.id
  // The hash value is the temperature
  override def getValueFromData(t: SensorReading): String = t.temperature.toString
}
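All three examples rely on a SensorReading case class and a comma-separated sensor.txt file that the post does not show. A minimal sketch consistent with the code above is given here; the field names (id, timestamp, temperature) come from the mapper, while the exact class definition and the sample values (taken from the query results further below) are assumptions:

// Assumed case class for the sensor records used throughout this post
case class SensorReading(id: String, timestamp: Long, temperature: Double)

A sensor.txt consistent with the results could contain lines such as:

sensor_1,1547718199,35.80018327300259
sensor_6,1547718201,15.402984393403084
sensor_7,1547718202,6.720945201171228
sensor_10,1547718205,38.101067604893444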

 

 

Check the result in Redis:

127.0.0.1:6379> hgetall sensor_temperature
1) "sensor_1"
2) "35.80018327300259"
3) "sensor_6"
4) "15.402984393403084"
5) "sensor_10"
6) "38.101067604893444"
7) "sensor_7"
8) "6.720945201171228"

  

2. Elasticsearch Sink

Maven dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

Write the file contents into Elasticsearch.

Code:

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object EsSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })
    // es sink
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("hadoop101", 9200))
    // Create an es sink builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](httpHosts, new ElasticsearchSinkFunction[SensorReading] {
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        println("saving data: " + t)
        // Wrap the record in a map
        val map = new util.HashMap[String, String]()
        map.put("sensor_id", t.id)
        map.put("temperature", t.temperature.toString)
        map.put("ts", t.timestamp.toString)
        // Create the index request carrying the data
        val indexRequest: IndexRequest = Requests.indexRequest().index("sensor").`type`("redingdata").source(map)
        // Hand the request to the RequestIndexer, which writes the data
        requestIndexer.add(indexRequest)
        println("saved")
      }
    })
    dataStream.addSink(esSinkBuilder.build())
    env.execute("es sink test")
  }
}
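When testing with a small bounded input, it can help to make the sink flush every record instead of buffering a bulk request; the ElasticsearchSink.Builder exposes setBulkFlushMaxActions for this. A possible tweak before calling build() (optional, not part of the original code):

// Flush after every element so records show up in ES immediately (useful for small test inputs)
esSinkBuilder.setBulkFlushMaxActions(1)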

 

 

Check the result in Elasticsearch:
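Assuming Elasticsearch is reachable at hadoop101:9200 (the host configured in the sink), the indexed documents can be fetched with a standard _search request, for example:

curl "http://hadoop101:9200/sensor/_search?pretty"

which returns a response like the one below.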


{
  "took" : 148,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 6,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "QXpZnnEBUwLRQchmepbT",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_6",
          "temperature" : "15.402984393403084",
          "ts" : "1547718201"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RnpZnnEBUwLRQchme5ZS",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_7",
          "temperature" : "6.720945201171228",
          "ts" : "1547718202"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "Q3pZnnEBUwLRQchmepbr",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "35.80018327300259",
          "ts" : "1547718199"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "QnpZnnEBUwLRQchmepbo",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "30.8",
          "ts" : "1547718200"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RHpZnnEBUwLRQchmepbs",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_1",
          "temperature" : "40.8",
          "ts" : "1547718201"
        }
      },
      {
        "_index" : "sensor",
        "_type" : "redingdata",
        "_id" : "RXpZnnEBUwLRQchmepbu",
        "_score" : 1.0,
        "_source" : {
          "sensor_id" : "sensor_10",
          "temperature" : "38.101067604893444",
          "ts" : "1547718205"
        }
      }
    ]
  }
}


3. JDBC Sink

① MySQL driver dependency:

<!-- mysql sink -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

② Define a custom MySQL sink by extending RichSinkFunction, overriding the per-record execution logic (invoke) as well as the methods that initialize (open) and release (close) resources.

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  // JDBC connection and prepared statements
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // Initialize the connection and prepare the statements
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql:///test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into temperatures(sensor,temp) values(?,?)")
    updateStmt = conn.prepareStatement("update temperatures set temp=? where sensor=?")
  }

  // Use the connection to execute the SQL for each record
  // Note: do not call super.invoke(value, context) here
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try to update the existing row first
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // If no row was updated, insert a new one
    if (updateStmt.getUpdateCount == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // Release resources
  override def close(): Unit = {
    updateStmt.close()
    insertStmt.close()
    conn.close()
  }
}
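The sink assumes a temperatures table already exists in the test database. The column names (sensor, temp) come from the SQL statements above; the types below are an assumed minimal DDL sketch:

CREATE TABLE temperatures (
  sensor VARCHAR(32),
  temp   DOUBLE
);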

③ Add the custom MySQL sink and execute the job:

object JdbcSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamFromFile = env.readTextFile("C:\\Users\\Mi\\Documents\\project\\idea\\FlinkTitorial\\src\\main\\resources\\sensor.txt")
    val dataStream: DataStream[SensorReading] = streamFromFile.map(d => {
      val arr = d.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).toDouble)
    })
    // jdbc sink
    dataStream.addSink(new MyJdbcSink())
    env.execute("jdbc sink test")
  }
}
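After the job finishes, the result can be checked in MySQL. Because of the update-then-insert logic in MyJdbcSink, the table ends up with one row per sensor holding its latest temperature, which can be verified with:

mysql> SELECT * FROM temperatures;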

 

 
