当前位置:   article > 正文

Flink从入门到真香(10、Sink数据输出-Elasticsearch)

flink sink 没有数据

目标: 从txt文件中读取数据,写入es,我这里用的es7.9,如果用的es7之前的版本下面代码中有个.type("_doc") 类别需要设置

如果没有es和kibana(可选)环境可以先安装

安装es7

  1. wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm
  2. wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.3-x86_64.rpm.sha512
  3. shasum -a 512 -c elasticsearch-7.9.3-x86_64.rpm.sha512
  4. sudo rpm --install elasticsearch-7.9.3-x86_64.rpm
  5. systemctl restart elasticsearch

安装kibana (可选,如果不想界面操作就可以不用装)

  1. wget https://artifacts.elastic.co/downloads/kibana/kibana-7.9.3-x86_64.rpm
  2. sudo rpm --install kibana-7.9.3-x86_64.rpm
  3. systemctl start kibana

先引入Elasticsearch的pom依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
  4. <version>1.10.1</version>
  5. </dependency>

新建一个ElasticsearchSinkTest.scala

  1. package com.mafei.sinktest
  2. import java.util
  3. import org.apache.flink.api.common.functions.RuntimeContext
  4. import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
  5. import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
  6. import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
  7. import org.apache.http.HttpHost
  8. import org.elasticsearch.client.Requests
  9. object ElasticsearchSinkTest {
  10. def main(args: Array[String]): Unit = {
  11. //创建执行环境
  12. val env = StreamExecutionEnvironment.getExecutionEnvironment
  13. val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
  14. env.setParallelism(1)
  15. inputStream.print()
  16. //先转换成样例类类型
  17. val dataStream = inputStream
  18. .map(data => {
  19. val arr = data.split(",") //按照,分割数据,获取结果
  20. SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是因为默认分割后是字符串类别
  21. })
  22. //定义es的连接信息
  23. val httpHosts = new util.ArrayList[HttpHost]()
  24. httpHosts.add(new HttpHost("127.0.0.1", 9200))
  25. //自定义写入es的ElasticsearchSinkFunction
  26. val myEsSinkFunc = new ElasticsearchSinkFunction[SensorReadingTest5] {
  27. override def process(t: SensorReadingTest5, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
  28. //定义一个map作为 数据源
  29. val dataSource = new util.HashMap[String, String]()
  30. dataSource.put("id", t.id)
  31. dataSource.put("temperature", t.temperature.toString)
  32. dataSource.put("ts", t.timestamp.toString)
  33. //创建index request ,指定index
  34. val indexRequest = Requests.indexRequest()
  35. indexRequest.index("sensors") //指定写入哪一个索引
  36. .source(dataSource) //指定写入的数据
  37. // .type("_doc") //我这里用的es7已经不需要这个参数了
  38. //执行新增操作
  39. requestIndexer.add(indexRequest)
  40. }
  41. }
  42. dataStream.addSink(new ElasticsearchSink.Builder[SensorReadingTest5](httpHosts, myEsSinkFunc)
  43. .build()
  44. )
  45. env.execute()
  46. }
  47. }

代码结构:

Flink从入门到真香(10、Sink数据输出-Elasticsearch)

到服务器上查看数据,sensor就是我们刚塞进去的数据
查看所有索引数据
[root@localhost ~]# curl http://127.0.0.1:9200/_cat/indices
green open .kibana-event-log-7.9.3-000001 NvnP2SI9Q_i-z5bNvsgWhA 1 0 1 0 5.5kb 5.5kb
yellow open sensors PGTeT0MZRJ-4hmYkDQnqIw 1 1 6 0 5.4kb 5.4kb
green open .apm-custom-link IdxoOaP9Sh6ssBd0Q9kPsw 1 0 0 0 208b 208b
green open .kibana_task_manager_1 -qAi_8LmTc2eJsWUQwugtw 1 0 6 3195 434.2kb 434.2kb
green open .apm-agent-configuration FG9PE8CARdyKWrdsAg4gbA 1 0 0 0 208b 208b
green open .kibana_1 uVmly8KaQ5uIXZ-IkArnVg 1 0 18 4 10.4mb 10.4m

查看塞进去的数据

  1. [root@localhost ~]# curl http://127.0.0.1:9200/sensors/_search
  2. {"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"sensors","_type":"_doc","_id":"h67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"41.0","id":"sensor1","ts":"1603766281"}},{"_index":"sensors","_type":"_doc","_id":"iK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"42.0","id":"sensor2","ts":"1603766282"}},{"_index":"sensors","_type":"_doc","_id":"ia7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"43.0","id":"sensor3","ts":"1603766283"}},{"_index":"sensors","_type":"_doc","_id":"iq7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.1","id":"sensor4","ts":"1603766240"}},{"_index":"sensors","_type":"_doc","_id":"i67gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"20.0","id":"sensor4","ts":"1603766284"}},{"_index":"sensors","_type":"_doc","_id":"jK7gkHUBr1E85RDXoNXP","_score":1.0,"_source":{"temperature":"40.2","id":"sensor4","ts":"1603766249"}}]}}
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/606483
推荐阅读
相关标签
  

闽ICP备14008679号