赞
踩
本案例使用 Flink 的 Table API 将数据写入 Elasticsearch，其中 Flink 版本为 1.10.0，Elasticsearch 版本为 7.6.2。
package com.bigdata.table

import com.bigdata.apitest.source.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

/**
 * Counts sensor records per sensor id with the Flink Table API and upserts the
 * counts into an Elasticsearch index.
 *
 * Pipeline: text file -> `DataStream[SensorReading]` -> `Table` ->
 * `groupBy(id)` / `count` -> Elasticsearch sink table `esOutputTable`.
 *
 * Usage: `ESTableApiTest [inputPath]`. When no argument is given, the original
 * hard-coded sample file path is used.
 *
 * @author spencer
 * @since 2020/7/29 16:53
 */
object ESTableApiTest {

  /** Input file used when no path is supplied on the command line. */
  private val DefaultInputPath: String =
    "D:\\IdeaProjects\\flink-project\\src\\main\\resources\\sensor.txt"

  /**
   * Builds and runs the job.
   *
   * @param args optional; `args(0)` overrides the input file path
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.setParallelism(1)

    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)

    // Read the raw text lines and turn each one into a SensorReading.
    val inputDataStream: DataStream[SensorReading] = env
      .readTextFile(inputPath)
      .map(line => parseSensorReading(line))

    val inputTable: Table = tableEnv.fromDataStream(inputDataStream)

    // The symbol syntax ('id) requires the Table API implicit conversions:
    // import org.apache.flink.table.api.scala._
    val resultTable: Table = inputTable
      .groupBy('id)
      .select('id, 'id.count as 'total)

    // Define the Elasticsearch output connection.
    // NOTE(review): the connector version is declared as "6"; if the target
    // cluster is Elasticsearch 7.x, confirm that the matching connector
    // dependency is the one on the classpath before changing this value.
    tableEnv
      .connect(
        new Elasticsearch()
          .version("6")
          .host("localhost", 9200, "http")
          .index("sensor")
          .documentType("_doc")
      )
      .inUpsertMode()
      .withFormat(new Json())
      .withSchema(
        new Schema()
          .field("id", DataTypes.STRING())
          // NOTE(review): the query column is named 'total while the sink
          // field is named "count"; presumably the sink matches columns by
          // position, not by name — verify against the Flink version in use.
          .field("count", DataTypes.BIGINT())
      )
      .createTemporaryTable("esOutputTable")

    resultTable.insertInto("esOutputTable")

    env.execute("ESTableApiTest")
  }

  /**
   * Parses one comma-separated line of the form `id,timestamp,temperature`.
   *
   * Every field is trimmed first, because `String.toLong` rejects surrounding
   * whitespace and would fail on input such as `sensor_1, 1547718199, 35.8`.
   * Fields beyond the third are ignored.
   *
   * @param line raw text line
   * @return the parsed reading
   * @throws IllegalArgumentException if the line has fewer than three fields
   * @throws NumberFormatException    if timestamp or temperature is not numeric
   */
  private def parseSensorReading(line: String): SensorReading =
    line.split(",").map(_.trim) match {
      case Array(id, timestamp, temperature, _*) =>
        SensorReading(id, timestamp.toLong, temperature.toDouble)
      case _ =>
        throw new IllegalArgumentException(
          s"Malformed sensor record, expected 'id,timestamp,temperature': $line"
        )
    }
}
Elasticsearch 中的查询结果如下：
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。