
Flink Tutorial 4: Sink Output (DataStreamSource.addSink)


1. Writing to a File

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

public class SinkTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(4);
        // Read data from a file (backslashes in the Windows path must be escaped)
        DataStreamSource<String> dataStreamSource = environment.readTextFile("E:\\atguigu05\\flink\\flink02-java\\src\\main\\resources\\clicks.txt");

        // Row-format file sink writing to ./output, with a rolling policy for part files
        StreamingFileSink<String> sink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(1024 * 1024 * 1024)                  // roll at 1 GB
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // roll after 5 min of inactivity
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // roll at least every 15 min
                                .build()
                )
                .build();
        dataStreamSource.addSink(sink); // the stream already carries Strings, so no map is needed
        environment.execute();
    }
}
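Note that StreamingFileSink only moves part files from the in-progress/pending state to finished when a checkpoint completes, so with checkpointing disabled (the default) the output never shows up as finished files. Enabling it is one line right after creating the environment (the 10-second interval is just an example value):

        // Finalize part files on each checkpoint; the interval is an example value
        environment.enableCheckpointing(10 * 1000L);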


2. Writing to Kafka

Pipeline: Kafka source -> Flink transformation -> Kafka sink.

Maven dependency:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
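This example, and the MySQL and Elasticsearch examples below, construct an Event POJO that the post itself never shows. A minimal sketch, with the field names and the (user, url, timestamp) constructor inferred from how the examples use them:

import java.sql.Timestamp;

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    // Flink POJOs need a public no-argument constructor
    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + new Timestamp(timestamp) + "}";
    }
}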


import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class SinkKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");

        // Kafka source: read raw lines from the "clicks" topic
        DataStreamSource<String> streamSource = environment.addSource(
                new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));

        // Transformation: parse each comma-separated line into an Event, then back to a String
        SingleOutputStreamOperator<String> resultStream = streamSource.map(data -> {
            String[] words = data.split(",");
            return new Event(words[0].trim(), words[1].trim(), Long.valueOf(words[2].trim())).toString();
        });

        // Kafka sink: write the transformed records to a new topic, "events"
        resultStream.addSink(new FlinkKafkaProducer<String>("hadoop102:9092", "events", new SimpleStringSchema()));

        environment.execute();
    }
}
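The consumer properties above only set bootstrap.servers, which is enough for a demo; in practice you normally also set a consumer group so offsets can be committed back to Kafka. A sketch (the group name is hypothetical; the keys are standard Kafka consumer settings):

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop102:9092");
        properties.setProperty("group.id", "flink-sink-demo");  // hypothetical consumer group
        properties.setProperty("auto.offset.reset", "latest");  // start position when no committed offset exists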


3. Writing to MySQL

Maven dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>


import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkTestMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStreamSource<Event> streamSource = environment.fromElements(
                new Event("1", "1", 1L),
                new Event("2", "2", 1L),
                new Event("3", "3", 1L)
        );

        streamSource.addSink(JdbcSink.sink(
                // the clicks table must have exactly two columns for this statement to work
                "insert into clicks values (?, ?)",
                (statement, event) -> {
                    statement.setString(1, event.user);
                    statement.setString(2, event.url);
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        ));
        environment.execute();
    }
}
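JdbcSink only writes rows; the clicks table has to exist beforehand. A one-off helper that creates it (the two-column schema is an assumption, inferred from the two ? placeholders in the INSERT):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateClicksTable {
    public static void main(String[] args) throws Exception {
        // Hypothetical schema: two string columns matching the (?, ?) placeholders
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "root");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS clicks (`user` VARCHAR(64), `url` VARCHAR(256))");
        }
    }
}

JdbcSink.sink also accepts a JdbcExecutionOptions argument for tuning batch size, flush interval, and retry count.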


4. Writing to Redis

Maven dependency:

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>


import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkRedisTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStreamSource<String> dataStreamSource = environment.readTextFile("E:\\atguigu05\\flink\\flink02-java\\src\\main\\resources\\clicks.txt");

        // Jedis connection pool pointing at the Redis host
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop102")
                .build();

        dataStreamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));

        environment.execute();
    }

    // Maps each record to an HSET against the "clicks" hash;
    // here the whole line serves as both field and value
    public static class MyRedisMapper implements RedisMapper<String> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(String s) {
            return s;
        }

        @Override
        public String getValueFromData(String s) {
            return s;
        }
    }
}
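Using the whole line as both field and value is only useful as a smoke test. A more realistic mapper (a sketch, assuming comma-separated "user, url, timestamp" lines as in the Kafka example) keys the hash by user and stores the url:

    // Sketch: field = user, value = url, still written with HSET into the "clicks" hash
    public static class EventRedisMapper implements RedisMapper<String> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "clicks");
        }

        @Override
        public String getKeyFromData(String line) {
            return line.split(",")[0].trim(); // user becomes the hash field
        }

        @Override
        public String getValueFromData(String line) {
            return line.split(",")[1].trim(); // url becomes the hash value
        }
    }

You can then inspect the result with HGETALL clicks in redis-cli.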


5. Writing to Elasticsearch

Maven dependency:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;

public class SinkElasticSearchTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        DataStreamSource<Event> streamSource = environment.fromElements(
                new Event("1", "1", 1L),
                new Event("2", "2", 1L),
                new Event("3", "3", 1L)
        );

        ArrayList<HttpHost> list = new ArrayList<>();
        list.add(new HttpHost("hadoop102", 9200));

        ElasticsearchSinkFunction<Event> sinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                // Document body; without this put the indexed document would be empty
                HashMap<String, String> map = new HashMap<>();
                map.put(event.user, event.url);

                // Build the index request and hand it to the indexer
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("type")
                        .source(map);
                requestIndexer.add(request);
            }
        };
        streamSource.addSink(new ElasticsearchSink.Builder<>(list, sinkFunction).build());
        environment.execute();
    }
}
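The .type("type") call is specific to Elasticsearch 6; mapping types were removed in Elasticsearch 7. For ES 7+, switch the dependency to flink-connector-elasticsearch7 and build the request without a type (a sketch, same index as above):

                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .source(map);
                requestIndexer.add(request);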

