Example: queries and writes performed from inside a Flink stream (here, against Elasticsearch).
The processing below is not written in the more complex, configurable form used in the real project; it only operates on a single test table (index).
<scala.main.version>2.11</scala.main.version>
<flink.version>1.10.1</flink.version>
<es.version>7.6.1</es.version>

<!-- flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.main.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.main.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.main.version}</artifactId>
    <version>1.10.1</version>
</dependency>

<!-- flink table & sql -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.main.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.main.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Flink connector for Redis -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_${scala.main.version}</artifactId>
    <version>${flink.redis.version}</version>
</dependency>

<!-- Flink connector for Elasticsearch -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_${scala.main.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- RocksDB state backend for Flink -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Elasticsearch high-level REST client, which provides the RestHighLevelClient API used below -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${es.version}</version>
</dependency>
import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// The properties here only carry some environment-specific parameters used in the real project;
// while learning, you can simply hard-code the parameters into a global configuration instead.
object FlinkFreelySourcePutInMain {

  def main(args: Array[String]): Unit = {
    // execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    var prop: Properties = null
    if (args != null && args.length == 2) {
      println(args(0))
      // args(0) is the environment name, args(1) is the config file name
      val envName: String = args(0)
      println("args", args.toList)
      prop = ConfigUtils.getProperties(envName, args(1))
      println("loaded configuration", prop)
      // println("size:" + prop.size())
    }

    FlinkFreelySourcePutInProcessing.processor(env, prop)
    env.execute(this.getClass.getSimpleName)
  }
}
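ConfigUtils itself is not shown in this post. A minimal sketch of what a matching helper might look like is below; the class name is taken from the call site, but the environment-prefixed resource path and the classpath lookup are assumptions, not the project's actual implementation.

import java.util.Properties

// Hypothetical helper: load a .properties file from the classpath for the given environment.
object ConfigUtils {
  def getProperties(env: String, fileName: String): Properties = {
    val prop = new Properties()
    // assumption: config files are stored per environment, e.g. "dev/xxx.properties"
    val in = Thread.currentThread().getContextClassLoader
      .getResourceAsStream(s"$env/$fileName")
    if (in != null) {
      prop.load(in)
      in.close()
    }
    prop
  }
}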
import java.util.Properties

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
 * @Author mr_lu(鹿)
 * @Description ******
 * @Date 2021-06-04 18:59
 * @Copyright the copyright of this class belongs to mr_lu;
 **/
object FlinkFreelySourcePutInProcessing extends FlinkPropertiesProcessor {

  override def processor(env: StreamExecutionEnvironment, prop: Properties): Unit = {
    /**
     * 1. Kafka source - multi-source configuration
     * 2. Data processing - write by primary key according to the configuration
     * 3. ES sink - store the data according to the configuration
     */

    /**
     * imports
     */
    import org.apache.flink.api.scala._

    /**
     * read configuration
     */
    val KAFKA_BROKER_SERVERS = prop.getProperty(ConfConstant.KAFKA_BROKER_KEY)
    val ZOOKEEPER_QUORUM_KEY = prop.getProperty(ConfConstant.ZOOKEEPER_QUORUM_KEY)
    val ES_NODES_HOSTS_NAME = prop.getProperty(ConfConstant.ES_NODES_KEY)
    println("KAFKA_BROKER_KEY", KAFKA_BROKER_SERVERS)
    println("ZOOKEEPER_QUORUM_KEY", ZOOKEEPER_QUORUM_KEY)

    /**
     * This is the global parameter passing this post is about:
     * the parameters registered here are read back inside the rich functions.
     */
    val configuration = new Configuration()
    configuration.setString("ES_NODES_HOSTS_NAME", ES_NODES_HOSTS_NAME)
    env.getConfig.setGlobalJobParameters(configuration)

    /**
     * 1. Kafka source - multi-source configuration
     */
    val dataStream = MyKafkaSource.myKafkaSource(
      env,
      KAFKA_BROKER_SERVERS,
      List("FreelySourcePutInTopic")
    )

    /**
     * 2. Data processing - write by primary key according to the configuration;
     *    this step queries Elasticsearch from within the Flink stream
     */
    val freelySourcePutInMapStream: DataStream[FlinkElasticSearchSourceInsertBean] =
      dataStream.map(FlinkFreelySourcePutInMap)

    /**
     * 3. ES sink
     */
    freelySourcePutInMapStream.addSink(MyElasticSearchSink)
  }
}
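The FlinkPropertiesProcessor trait and the ConfConstant key holder are referenced above but not shown in the post. A minimal sketch of how they might look follows; the trait signature is inferred from the override above, while the key strings are purely illustrative assumptions.

import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical trait: every job's processing object exposes a single processor() entry point.
trait FlinkPropertiesProcessor {
  def processor(env: StreamExecutionEnvironment, prop: Properties): Unit
}

// Hypothetical constants object holding the property keys read from the config file.
object ConfConstant {
  val KAFKA_BROKER_KEY = "kafka.broker.servers"
  val ZOOKEEPER_QUORUM_KEY = "zookeeper.quorum"
  val ES_NODES_KEY = "es.nodes.hosts"
}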
import com.alibaba.fastjson.JSONObject

// Bean passed from the map operator to the ES sink:
// the document body, the document id, and the target index.
case class FlinkElasticSearchSourceInsertBean(
  data: JSONObject,
  id: String,
  index: String
)
import java.util.Properties

import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
 * @Author mr_lu(鹿)
 * @Description ******
 * @Date 2021-03-25 11:28
 * @Copyright the copyright of this class belongs to mr_lu;
 **/
object MyKafkaSource {

  def myKafkaSource(
    env: StreamExecutionEnvironment,
    bootstrapServers: String,
    topics: List[String]
  ): DataStream[String] = {
    /**
     * basic imports
     */
    import org.apache.flink.api.scala._
    import scala.collection.JavaConversions._

    /**
     * define the Kafka source and obtain a DataStream
     */
    // deserialize the Kafka records into plain strings
    val valueDeserializer: DeserializationSchema[String] = new SimpleStringSchema()
    val properties = new Properties()
    properties.put("bootstrap.servers", bootstrapServers)
    // Kafka consumers usually also need a group.id; add it here if your setup requires one.
    val kafkaSourceStream: DataStream[String] =
      env.addSource(new FlinkKafkaConsumer[String](topics, valueDeserializer, properties))
    kafkaSourceStream
  }
}
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.elasticsearch.client.RestHighLevelClient

/**
 * @Author mr_lu(鹿)
 * @Description ******
 * @Date 2021-06-04 19:18
 * @Copyright the copyright of this class belongs to mr_lu;
 **/
object FlinkFreelySourcePutInMap extends RichMapFunction[String, FlinkElasticSearchSourceInsertBean] {

  /**
   * basic fields
   */
  var esClient: RestHighLevelClient = null
  var ES_NODES_HOSTS_NAME: String = null

  /**
   * Initialization: read the global job parameters and open the ES connection.
   *
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    // read the global job parameters registered in the processing object
    val configuration = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[Configuration]
    ES_NODES_HOSTS_NAME = configuration.getString("ES_NODES_HOSTS_NAME", "null")
    esClient = EsClient.getEsClient(ES_NODES_HOSTS_NAME)
    println("map: ES connection initialized!")
  }

  override def map(value: String): FlinkElasticSearchSourceInsertBean = {
    /**
     * parse the incoming record
     */
    val jsonObject = JSON.parseObject(value)
    val freelyDataflag = jsonObject.getString("freely_data_flag")
    val customerId = jsonObject.getString("customer_id")
    val data = jsonObject.getString("data")

    // If the flag is "1" the record is pushed only once, so return it directly.
    if (freelyDataflag == "1") {
      LoggerPrint.loggerInfoPrint(s"customer_id=$customerId, flag is 1, returning directly!")
      FlinkElasticSearchSourceInsertBean(JSON.parseObject(value), customerId, "vs_freely_source_put_in_data")

    // If the flag is "n" the record is pushed repeatedly, so query ES and merge before returning.
    } else if (freelyDataflag == "n") {
      val termsJson = new JSONObject()
      termsJson.put("customer_id", customerId)
      // query the existing document in ES
      val searchResponse = EsSearchUtils.searchHitsByTerms(
        esClient,
        "vs_freely_source_put_in_data",
        termsJson,
        1
      )
      // extract the raw hits from the response
      val hits = SingleResponseAnalysisUtils.analysisSearchResponseToHits(searchResponse)
      // If ES already holds a history record, merge old and new data; otherwise return the new data as-is.
      if (hits.length > 0) {
        val esOldJson = JSON.parseObject(hits(0).getSourceAsString)
        val resultJson = JSON.parseObject(value)
        resultJson.put("data", JsonUtils.merginSingleJsonFun(esOldJson.getJSONObject("data"), JSON.parseObject(data)))
        LoggerPrint.loggerInfoPrint(s"customer_id=$customerId, flag is n, resultJson=$resultJson")
        FlinkElasticSearchSourceInsertBean(
          resultJson,
          customerId,
          "vs_freely_source_put_in_data"
        )
      } else {
        FlinkElasticSearchSourceInsertBean(JSON.parseObject(value), customerId, "vs_freely_source_put_in_data")
      }
    } else {
      FlinkElasticSearchSourceInsertBean(JSON.parseObject(value), customerId, "vs_freely_source_put_in_data")
    }
  }

  override def close(): Unit = {
    if (esClient != null) {
      esClient.close()
    } else {
      println("esClient was null while closing FlinkFreelySourcePutInMap!")
    }
  }
}
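JsonUtils.merginSingleJsonFun is not shown in the post. Judging only from how it is called above (old ES data first, newly arrived data second), a plausible minimal sketch is the following, where new values overwrite old ones key by key; this is an assumption about its behaviour, not the project's actual implementation.

import com.alibaba.fastjson.JSONObject

// Hypothetical helper: merge the new JSON into the old JSON; new keys overwrite old keys.
object JsonUtils {
  def merginSingleJsonFun(oldJson: JSONObject, newJson: JSONObject): JSONObject = {
    val merged = new JSONObject()
    merged.putAll(oldJson) // start from the historical data stored in ES
    merged.putAll(newJson) // let the freshly arrived fields overwrite the old ones
    merged
  }
}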
import com.alibaba.fastjson.JSONObject
import org.elasticsearch.action.search.{SearchRequest, SearchResponse}
import org.elasticsearch.client.{RequestOptions, RestHighLevelClient}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.builder.SearchSourceBuilder

object EsSearchUtils {

  /**
   * Multi-condition term filter query.
   *
   * @param client the ES client
   * @param index  the index
   * @param terms  the conditions, as key/value pairs in a JSON object
   * @param size   maximum number of hits to return
   * @return the search response
   */
  def searchHitsByTerms(client: RestHighLevelClient,
                        index: String,
                        terms: JSONObject,
                        size: Int): SearchResponse = {
    val searchRequest = new SearchRequest(index)
    val searchSourceBuilder = new SearchSourceBuilder().size(size)
    val boolQueryBuilder = QueryBuilders.boolQuery()

    /**
     * turn every key/value pair into a term query
     */
    terms.keySet().toArray().foreach(jsonKey => {
      val termQueryBuilder = QueryBuilders.termQuery(jsonKey.toString, terms.getString(jsonKey.toString))
      boolQueryBuilder.must().add(termQueryBuilder)
    })
    searchSourceBuilder.query(boolQueryBuilder)
    searchRequest.source(searchSourceBuilder)
    client.search(searchRequest, RequestOptions.DEFAULT)
  }
}

object SingleResponseAnalysisUtils {

  /**
   * Extract the SearchHit array from a SearchResponse.
   *
   * @param searchResponse the response returned by the query
   * @return the hits
   */
  def analysisSearchResponseToHits(searchResponse: SearchResponse): Array[SearchHit] = {
    searchResponse.getHits.getHits
  }
}
package com.utils.flink.sinks

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.elasticsearch.client.RestHighLevelClient

/**
 * @Author mr_lu(鹿)
 * @Description ******
 * @Date 2021-06-04 19:24
 * @Copyright the copyright of this class belongs to mr_lu;
 **/
object MyElasticSearchSink extends RichSinkFunction[FlinkElasticSearchSourceInsertBean] {

  /**
   * basic fields
   */
  var esClient: RestHighLevelClient = null
  var ES_NODES_HOSTS_NAME: String = null

  /**
   * Initialization: read the global job parameters and open the high-level ES connection.
   *
   * @param parameters
   */
  override def open(parameters: Configuration): Unit = {
    // read the global job parameters registered in the processing object
    val configuration = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[Configuration]
    ES_NODES_HOSTS_NAME = configuration.getString("ES_NODES_HOSTS_NAME", "null")
    esClient = EsClient.getEsClient(ES_NODES_HOSTS_NAME)
    println("sink: ES connection initialized!")
  }

  /**
   * Write one record.
   *
   * @param value
   * @param context
   */
  override def invoke(value: FlinkElasticSearchSourceInsertBean, context: SinkFunction.Context[_]): Unit = {
    /**
     * write the document to ES, overwriting by id
     */
    InsertUtils.insertESData(
      value.data,
      esClient,
      value.id,
      value.index
    )
    println("Sink write succeeded!", value.data)
  }

  /**
   * Close the connection.
   */
  override def close(): Unit = {
    if (esClient != null) {
      esClient.close()
    } else {
      println("esClient was null while closing MyElasticSearchSink!")
    }
  }
}
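EsClient.getEsClient is also not shown in the post. A minimal sketch follows, assuming the hosts string is a comma-separated list of host:port pairs; the parsing convention and the plain-HTTP scheme are assumptions.

import org.apache.http.HttpHost
import org.elasticsearch.client.{RestClient, RestHighLevelClient}

// Hypothetical helper: build a RestHighLevelClient from a comma-separated "host:port" list.
object EsClient {
  def getEsClient(hosts: String): RestHighLevelClient = {
    val httpHosts = hosts.split(",").map { h =>
      val Array(host, port) = h.trim.split(":")
      new HttpHost(host, port.toInt, "http")
    }
    new RestHighLevelClient(RestClient.builder(httpHosts: _*))
  }
}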
import com.alibaba.fastjson.JSONObject
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.{RequestOptions, RestHighLevelClient}
import org.elasticsearch.common.unit.TimeValue

object InsertUtils {

  /**
   * Insert a document, overwriting any existing document with the same id.
   *
   * @param data   the document body to insert
   * @param client the ES client
   * @param id     the document id
   * @param index  the index name
   */
  def insertESData(data: JSONObject, client: RestHighLevelClient, id: String, index: String): Unit = {
    client.index(
      new IndexRequest(index).id(id).timeout(TimeValue.timeValueSeconds(3)).source(data),
      // deprecated ES 6.x style with an explicit type:
      // new IndexRequest(index, index).id(id).timeout(TimeValue.timeValueSeconds(2)).source(data),
      RequestOptions.DEFAULT
    )
  }
}
#!/bin/bash
# Submit the job to YARN; -c is the fully qualified main class, -p is the parallelism.
flink run -m yarn-cluster \
  -c xxx.xxx \
  -p 8 \
  <absolute path of the job jar on the Linux filesystem>