赞
踩
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.10.1</version>
</dependency>
package cn.edu.tju.demo; import org.apache.flink.api.common.functions.*; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; import org.apache.flink.util.Collector; import org.apache.http.HttpHost; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import java.util.*; public class Test15 { private static String ES_SERVER = "xx.xx.xx.xx"; public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment .getExecutionEnvironment(); //DataStreamSource<String> mySource = environment.addSource(new MySourceFunction()); DataStream<String> mySource = environment.readTextFile("demo.txt"); List<HttpHost> hostList = new LinkedList<>(); hostList.add(new HttpHost(ES_SERVER,9200)); mySource.addSink(new ElasticsearchSink.Builder<String>(hostList, new MyEsSinkFunction()) .build()); environment.execute("my job"); } public static class MyEsSinkFunction implements ElasticsearchSinkFunction<String> { @Override public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { HashMap<String, String> dataSource = new HashMap<>(); dataSource.put("id", UUID.randomUUID().toString()); dataSource.put("welcome", s); IndexRequest indexRequest = Requests.indexRequest().index("person") .source(dataSource); requestIndexer.add(indexRequest); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。