1. Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-elasticsearch7_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
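The Elasticsearch connection settings used in step 4 are read from a Properties object loaded elsewhere in the job. A minimal sketch of the keys it is expected to contain; the values are placeholders, and the host:port format of es.addr is an assumption, only the key names come from the original code:

// Hypothetical example values; only the key names appear in the original job.
val properties = new java.util.Properties()
properties.setProperty("es.addr", "es-node1.example.com:9200")   // assumed host:port format
properties.setProperty("es.user.name", "elastic")                // placeholder username
properties.setProperty("es.user.passwd", "changeme")             // placeholder password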
2. Custom ES sink class
package wangjian.sink

import com.alibaba.fastjson.JSONObject
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.xcontent.XContentType
import org.slf4j.LoggerFactory

/**
 * @author wmy
 * @create 2023/5/25 16:38
 */
class MyEsSinkFunction extends ElasticsearchSinkFunction[JSONObject] {

  val LOG = LoggerFactory.getLogger(classOf[MyEsSinkFunction])

  override def process(element: JSONObject, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    // dispatch on the data_type field carried in each record
    element.getString("data_type") match {
      case "insert" => requestIndexer.add(insertFn(element))
      case "update" => requestIndexer.add(updateFn(element))
      case "upsert" => requestIndexer.add(upsertFn(element))
      case _        => LOG.error("unsupported data_type, element: {}", element.toJSONString)
    }
  }

  // build an IndexRequest for a plain insert
  def insertFn(record: JSONObject): IndexRequest = {
    val index = record.getString("index")
    val id = record.getString("id")
    val data = record.getString("data")
    Requests.indexRequest()
      .index(index)
      .id(id)
      .source(data, XContentType.JSON)
  }

  // build an UpdateRequest that only updates an existing document
  def updateFn(record: JSONObject): UpdateRequest = {
    val index = record.getString("index")
    val id = record.getString("id")
    val data = record.getString("data")
    new UpdateRequest()
      .index(index)
      .id(id)
      .doc(data, XContentType.JSON)
      .docAsUpsert(false)
  }

  // build an UpdateRequest that inserts the document when it does not exist yet
  def upsertFn(record: JSONObject): UpdateRequest = {
    val index = record.getString("index")
    val id = record.getString("id")
    val data = record.getString("data")
    new UpdateRequest()
      .index(index)
      .id(id)
      .doc(data, XContentType.JSON)
      .docAsUpsert(true)
  }
}
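For reference, a minimal sketch of the record shape this sink expects from upstream operators; the field names (data_type, index, id, data) come from the class above, while the concrete values are made up:

import com.alibaba.fastjson.JSONObject

// Hypothetical record: upsert document "10086" into an index named "shop_goods".
val record = new JSONObject()
record.put("data_type", "upsert")                              // one of insert / update / upsert
record.put("index", "shop_goods")                              // target ES index (example name)
record.put("id", "10086")                                      // document id (example value)
record.put("data", "{\"goods_name\":\"demo\",\"price\":9.9}")  // JSON document body as a string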
3. REST client factory class
package wangjian.sink

import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.client.CredentialsProvider
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback

/**
 * @author wumengyang
 * @create 2023/5/25 16:40
 */
class MyRestClientFactory(username: String, password: String) extends RestClientFactory {

  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    // register the username/password for HTTP Basic Auth
    val credentialsProvider: CredentialsProvider = new BasicCredentialsProvider
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password))

    restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback {
      override def customizeHttpClient(httpAsyncClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
        // disable preemptive authentication caching; credentials are sent after the server challenges
        httpAsyncClientBuilder.disableAuthCaching()
        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
      }
    })
  }
}
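The same factory hook is also a convenient place for other HTTP-level settings. A minimal sketch of tuning connect/socket timeouts through setRequestConfigCallback; the TimeoutRestClientFactory name and the timeout numbers are illustrative only, and in practice the callback could simply be added inside MyRestClientFactory.configureRestClientBuilder next to the auth setup:

import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory
import org.apache.http.client.config.RequestConfig
import org.elasticsearch.client.RestClientBuilder
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback

// Hypothetical variant that only tunes request timeouts.
class TimeoutRestClientFactory extends RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    restClientBuilder.setRequestConfigCallback(new RequestConfigCallback {
      override def customizeRequestConfig(builder: RequestConfig.Builder): RequestConfig.Builder = {
        builder
          .setConnectTimeout(5000)    // example: 5 s to establish a connection
          .setSocketTimeout(60000)    // example: 60 s per request
      }
    })
  }
}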
4. Flink main class
import java.text.SimpleDateFormat
import java.util
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.alibaba.fastjson.{JSON, JSONObject}
import net.qihoo.operation.QihooJSONKeyValueDeserializationSchema
import net.qihoo.util.{PropertiesUtils, RedisClient}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.tuple
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.http.HttpHost
import org.slf4j.LoggerFactory
import wangjian.process.{MySQLAsyncByShopIdFunction, WjDataDealProcess}
import wangjian.show.GetIllegalWord
import wangjian.sink.{ElasticsearchSinkUtils, MyEsSinkFunction, MyRestClientFactory}
import wangjian.utils.{HttpUtils, OkHttpUtils}

import scala.collection.JavaConverters._

object ShopGoodsStream {

  def main(args: Array[String]): Unit = {
    // set up the streaming environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    ....

    // REST client factory carrying the ES username/password
    val restClientFactory = new MyRestClientFactory(
      properties.getProperty("es.user.name"),
      properties.getProperty("es.user.passwd"))

    // es.addr is assumed to be in host:port form
    val esAddr = properties.getProperty("es.addr").split(":")
    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost(esAddr(0), esAddr(1).toInt))

    val esSinkBuilder = new ElasticsearchSink.Builder[JSONObject](
      httpHosts,
      new MyEsSinkFunction
    )
    esSinkBuilder.setRestClientFactory(restClientFactory)

    resultStream.addSink(esSinkBuilder.build())

    ....

    environment.execute("test")
  }
}
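The builder also exposes bulk-flush and failure-handling settings that are often worth tuning for production. A minimal sketch against the same esSinkBuilder as above; the concrete numbers are arbitrary examples, not taken from the original job:

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler

// Flush the internal bulk processor every 1000 actions or every 5 seconds,
// and retry requests that Elasticsearch rejected because its bulk queue was full.
esSinkBuilder.setBulkFlushMaxActions(1000)
esSinkBuilder.setBulkFlushInterval(5000L)
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler)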