赞
踩
public class SinkTest01 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(4);
// 文件中读取数据
DataStreamSource<String> dataStreamSource = environment.readTextFile("E:\atguigu05\flink\flink02-java\src\main\resources\clicks.txt");
StreamingFileSink<String> sink = StreamingFileSink.
<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(1024 * 1024 * 1024)
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.build()
)
.build();
dataStreamSource.map(data -> data.toString()).addSink(sink);
environment.execute();
}
}
Kafka -> Flink transformation -> Kafka
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
/**
 * Reads click events from a Kafka topic, parses them into {@code Event} records,
 * and writes the results back to a second Kafka topic.
 */
public class SinkKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        // Kafka source: consume the "clicks" topic as plain strings.
        DataStreamSource<String> streamSource = environment.addSource(
                new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));
        // Parse each CSV line "user,url,timestamp" into an Event and re-serialize it.
        // FIX: use Long.parseLong — the constructor expects a primitive long, so
        // Long.valueOf boxed a value only to unbox it again immediately.
        SingleOutputStreamOperator<String> resultStream = streamSource.map(data -> {
            String[] words = data.split(",");
            return new Event(words[0].trim(), words[1].trim(), Long.parseLong(words[2].trim())).toString();
        });
        // Kafka sink: publish the transformed records to the "events" topic.
        resultStream.addSink(new FlinkKafkaProducer<String>("hadoop102:9092", "events", new SimpleStringSchema()));
        environment.execute();
    }
}
maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
/**
 * Writes a small, fixed set of {@code Event} records into a MySQL table
 * via Flink's {@link JdbcSink}, using a prepared statement for each row.
 */
public class SinkTestMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // A bounded, in-memory source of three sample events.
        DataStreamSource<Event> streamSource = environment.fromElements(
                new Event("1", "1", 1L),
                new Event("2", "2", 1L),
                new Event("3", "3", 1L)
        );

        // Connection details for the target MySQL instance.
        JdbcConnectionOptions connectionOptions =
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build();

        // Each event fills the two placeholders of the INSERT statement.
        streamSource.addSink(JdbcSink.sink(
                "insert into clicks values (?, ?)",
                (statement, event) -> {
                    statement.setString(1, event.user);
                    statement.setString(2, event.url);
                },
                connectionOptions
        ));

        environment.execute();
    }
}
maven:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
public class SinkRedisTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
DataStreamSource<String> dataStreamSource = environment.readTextFile("E:\atguigu05\flink\flink02-java\src\main\resources\clicks.txt");
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
.setHost("hadoop102")
.build();
dataStreamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));
environment.execute();
}
public static class MyRedisMapper implements RedisMapper<String>{
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "clicks");
}
@Override
public String getKeyFromData(String s) {
return s;
}
@Override
public String getValueFromData(String s) {
return s;
}
}
}
maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
/**
 * Indexes a fixed set of {@code Event} records into an Elasticsearch index
 * named "clicks" via Flink's Elasticsearch sink.
 */
public class SinkElasticSearchTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        DataStreamSource<Event> streamSource = environment.fromElements(
                new Event("1", "1", 1L),
                new Event("2", "2", 1L),
                new Event("3", "3", 1L)
        );
        // Elasticsearch cluster address(es).
        ArrayList<HttpHost> list = new ArrayList<>();
        list.add(new HttpHost("hadoop102", 9200));
        ElasticsearchSinkFunction<Event> sinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // FIX: the original passed an empty map to source(), so every
                // indexed document had no fields. Populate it from the event.
                HashMap<String, String> map = new HashMap<>();
                map.put("user", event.user);
                map.put("url", event.url);
                // Build the index request for this record.
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("type")
                        .source(map);
                requestIndexer.add(request);
            }
        };
        streamSource.addSink(new ElasticsearchSink.Builder<>(list, sinkFunction).build());
        environment.execute();
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。