赞
踩
1.依赖
<!-- Maven dependencies for the Flink 1.10.1 examples in this post. -->
<dependencies>
    <!-- Flink core Java API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Flink DataStream API (provides StreamExecutionEnvironment / DataStream) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Kafka 0.11 connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Redis connector (Apache Bahir) -->
    <!-- NOTE(review): this artifact uses the Scala 2.11 suffix while the Flink artifacts
         here use 2.12; confirm the mix is intended. -->
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <!-- Elasticsearch 6 connector (provides the ElasticsearchSink used by the sink example) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.44</version>
    </dependency>
    <!-- RocksDB state backend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Table API / SQL: legacy planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- Table API / SQL: Blink planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- CSV format support -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
2.实体
-
/**
 * Data type for a single temperature reading reported by a sensor.
 *
 * <p>Mutable JavaBean: it exposes a public no-argument constructor plus a getter and a setter
 * for every field. Two readings are equal when their id, timestamp and temperature are equal.
 */
public class SensorReading {
    // Fields: sensor id, timestamp, temperature value.
    private String id;
    private Long timestamp;
    private Double temperature;

    /** Creates an empty reading; every field is {@code null} until set. */
    public SensorReading() {
    }

    /**
     * Creates a fully populated reading.
     *
     * @param id sensor identifier
     * @param timestamp time of the reading
     * @param temperature measured temperature
     */
    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    /** @return the sensor identifier */
    public String getId() {
        return id;
    }

    /** @param id the sensor identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the time of the reading */
    public Long getTimestamp() {
        return timestamp;
    }

    /** @param timestamp the time of the reading */
    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /** @return the measured temperature */
    public Double getTemperature() {
        return temperature;
    }

    /** @param temperature the measured temperature */
    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    /** Compares all three fields; {@code null} fields are equal only to {@code null}. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SensorReading other = (SensorReading) o;
        return java.util.Objects.equals(id, other.id)
                && java.util.Objects.equals(timestamp, other.timestamp)
                && java.util.Objects.equals(temperature, other.temperature);
    }

    /** Hash over the same three fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return java.util.Objects.hash(id, timestamp, temperature);
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
3.代码
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink.Builder;
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
-
- public class SinkTest3_Es {
-
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- // 从文件读取数据
- DataStream<String> inputStream = env.readTextFile("D:\\sensor.txt");
-
- // 转换成SensorReading类型
- DataStream<SensorReading> dataStream = inputStream.map(line -> {
- String[] fields = line.split(",");
- return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
- });
-
- // 定义es的连接配置 不带用户名密码
- ArrayList<HttpHost> httpHosts = new ArrayList<>();
- httpHosts.add(new HttpHost("localhost", 9200));
- dataStream.addSink(
- new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());
- env.execute();
-
- // 定义es的连接配置 带用户名密码
- /* RestClientFactory restClientFactory = new RestClientFactory() {
- @Override
- public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials("用户名", "密码"));
- restClientBuilder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
- @Override
- public HttpAsyncClientBuilder customizeHttpClient(
- HttpAsyncClientBuilder httpAsyncClientBuilder) {
- httpAsyncClientBuilder.disableAuthCaching();
- return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- }
- });
- }
- };
- ArrayList<HttpHost> httpHosts = new ArrayList<>();
- httpHosts.add(new HttpHost("localhost", 9200));
- ElasticsearchSink.Builder<SensorReading> sensorReadingBuilder = new ElasticsearchSink.Builder<>(
- httpHosts,
- new MyEsSinkFunction());
- sensorReadingBuilder.setRestClientFactory(restClientFactory);
- dataStream.addSink(sensorReadingBuilder.build());
- env.execute();*/
- }
-
- // 实现自定义的ES写入操作
- public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
-
- @Override
- public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
- // 定义写入的数据source
- HashMap<String, String> dataSource = new HashMap<>();
- dataSource.put("id", element.getId());
- dataSource.put("temp", element.getTemperature().toString());
- dataSource.put("ts", element.getTimestamp().toString());
-
- // 创建请求,作为向es发起的写入命令
- IndexRequest indexRequest = Requests.indexRequest()
- .index("sensor")
- .type("readingdata")
- .source(dataSource);
-
- // 用index发送请求
- indexer.add(indexRequest);
- }
- }
- }
4.测试数据(文件路径:D:\sensor.txt)
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。