赞
踩
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.5</version>
</dependency>
/** * 生成ElasticsearchSink */ private static ElasticsearchSink<Tuple2<String, Long>> generateESSink() { // 配置HttpHost List<HttpHost> httpHosts = Collections.singletonList( //es的连接参数 new HttpHost("192.168.19.10", 9200,"http") ); ElasticsearchSinkFunction<Tuple2<String, Long>> sinkFunction = new ElasticsearchSinkFunction<Tuple2<String, Long>>() { @Override public void process(Tuple2<String, Long> tuple2, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { // 封装数据 HashMap<String, String> map = new HashMap<>(); map.put("content", tuple2.f0); map.put("eventTime", tuple2.f1.toString()); map.put("processTime", String.valueOf(System.currentTimeMillis())); // 封装Request IndexRequest request = Requests.indexRequest() .index("my_index")//这个索引必须已经存在 .type("my_data") .source(map); // 发送request requestIndexer.add(request); } }; ElasticsearchSink.Builder<Tuple2<String, Long>> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, sinkFunction); //设置缓存区的大小 (一次存入的数据条数) 如果不设置 默认一次写入 esSinkBuilder.setBulkFlushMaxActions(50); esSinkBuilder.setRestClientFactory( restClientBuilder -> { restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { // elasticsearch username and password CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("$USERNAME", "$PASSWORD")); return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }); } ); //构建essink return esSinkBuilder.build(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。