
Flink Common Sinks (ElasticsearchSink, RedisSink, KafkaSink, MysqlSink, FileSink)

Writing Flink output to Elasticsearch, Redis, MySQL, Kafka, and files.

Set up the relevant environments (Kafka, Elasticsearch, Redis, and MySQL) yourself first.

Configure the pom.xml

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <!-- Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

        <!-- Redis connector (Bahir) -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <!-- Elasticsearch connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- JDBC connector and MySQL driver -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
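
Since the assembly plugin is bound to the package phase, a standard Maven build is enough to produce the jar-with-dependencies:

mvn clean package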

Shared entity class (used by all the examples below)

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@ToString
@AllArgsConstructor
public class UserEvent {
    private String userName;
    private String url;
    private Long timestamp;
}


KafkaSink

Write the data to Kafka. Start a Kafka console consumer first, then run the program.

import com.event.UserEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Arrays;

public class KafkaSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> stream = env.fromCollection(Arrays.asList(
                "xiaoming,www.baidu.com,1287538716253",
                "Mr Li,www.baidu.com,1287538710000",
                "Mr Zhang,www.baidu.com,1287538710900"
        ));

        // Parse each CSV line into a UserEvent and emit its string form
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] split = value.split(",");
                return new UserEvent(split[0].trim(), split[1].trim(), Long.valueOf(split[2].trim())).toString();
            }
        });

        // Start a console consumer before running:
        // ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
        result.addSink(new FlinkKafkaProducer<String>(
                "hadoop01:9092",          // broker list
                "events",                 // target topic
                new SimpleStringSchema()  // value serialization
        ));

        env.execute();
    }
}
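
The constructor above takes the broker list as a plain string. FlinkKafkaProducer also has a constructor that accepts a java.util.Properties object for additional producer settings. A minimal sketch, reusing the hadoop01:9092 broker and the result stream from above (the transaction.timeout.ms value is an illustrative assumption):

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "hadoop01:9092");
// illustrative assumption: a shorter transaction timeout, relevant for exactly-once setups
producerProps.setProperty("transaction.timeout.ms", "600000");

result.addSink(new FlinkKafkaProducer<String>(
        "events",                 // target topic
        new SimpleStringSchema(), // value serialization
        producerProps             // producer configuration
));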


Run result (screenshot omitted).

ElasticsearchSink (EsSink)

Write the data to Elasticsearch.

Example code


import com.event.UserEvent;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class EsSinkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserEvent> userEventDataStreamSource =
                env.fromCollection(
                        Arrays.asList(
                                new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                                new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                                new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                                new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
                        ));

        // Define the host list
        List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop01", 9200));

        // Define the ElasticsearchSinkFunction: one IndexRequest per record
        // (explicit mapping types are deprecated in ES7, so none is set here)
        ElasticsearchSinkFunction<UserEvent> elasticsearchSinkFunction = new ElasticsearchSinkFunction<UserEvent>() {
            @Override
            public void process(UserEvent userEvent, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                IndexRequest indexRequest = Requests.indexRequest()
                        .index("events")
                        // userName as the field name, url as the value
                        .source(new HashMap<String, String>() {{
                            put(userEvent.getUserName(), userEvent.getUrl());
                        }});
                requestIndexer.add(indexRequest);
            }
        };

        // Write to ES
        userEventDataStreamSource.addSink(new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction).build());

        env.execute();
    }
}
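
By default the connector buffers records into bulk requests, so a short demo may exit before anything is flushed. A minimal sketch, reusing the hosts and elasticsearchSinkFunction defined above, that flushes after every record:

ElasticsearchSink.Builder<UserEvent> esSinkBuilder =
        new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction);
esSinkBuilder.setBulkFlushMaxActions(1); // flush the bulk request after every single record
userEventDataStreamSource.addSink(esSinkBuilder.build());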


Verification commands

GET _cat/indices

GET _cat/indices/events

GET events/_search

Run result (screenshot omitted).

RedisSink

Write the data to Redis.

Example code


import com.event.UserEvent;
import my.test.source.CustomSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<UserEvent> streamSource = env.addSource(new CustomSource());

        // Create the Jedis connection config
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("master")
                .setTimeout(10000)
                .setPort(6379)
                .build();

        // Write to Redis
        streamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<UserEvent> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // Write with HSET; the additional key "events" is the Redis hash the data is stored under
            return new RedisCommandDescription(RedisCommand.HSET, "events");
        }

        @Override
        public String getKeyFromData(UserEvent userEvent) {
            return userEvent.getUserName();
        }

        @Override
        public String getValueFromData(UserEvent userEvent) {
            return userEvent.getUrl();
        }
    }
}
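
Since the mapper writes every record with HSET into the hash events, you can inspect the result from the shell once the job has run for a few seconds (assuming redis-cli can reach the master host configured above):

redis-cli -h master HGETALL events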


Custom source

import com.event.UserEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class CustomSource implements SourceFunction<UserEvent> {
    // Flag controlling data generation; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<UserEvent> ctx) throws Exception {
        Random random = new Random(); // pick randomly from the fixed data sets below
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
                "./prod?id=2"};
        while (running) {
            ctx.collect(new UserEvent(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // Emit one click event per second so the output is easy to observe
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}


Run result: because the source above is an unbounded stream, the program keeps emitting new data (screenshot omitted).

MysqlSink (JdbcSink)

Write the data to MySQL.

Table schema

create table events(
    user_name varchar(20) not null,
    url varchar(100) not null
);

Example code


import com.event.UserEvent;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;

public class MysqlSinkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A batch of test data
        DataStreamSource<UserEvent> userEventDataStreamSource =
                env.fromCollection(
                        Arrays.asList(
                                new UserEvent("zhangsan", "/path?test123", System.currentTimeMillis() - 2000L),
                                new UserEvent("zhangsan", "/path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("lisi", "/path?checkParam", System.currentTimeMillis()),
                                new UserEvent("bob", "/path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("mary", "/path?checkParam", System.currentTimeMillis()),
                                new UserEvent("lisi", "/path?checkParam123", System.currentTimeMillis() - 2000L)
                        ));

        userEventDataStreamSource.addSink(JdbcSink.sink(
                // SQL statement to execute
                "INSERT INTO events (user_name, url) VALUES (?, ?)",
                new JdbcStatementBuilder<UserEvent>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserEvent userEvent) throws SQLException {
                        // Bind the SQL placeholders
                        preparedStatement.setString(1, userEvent.getUserName());
                        preparedStatement.setString(2, userEvent.getUrl());
                    }
                },
                // JDBC connection options
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                        .withUsername("root")
                        .withPassword("123456")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()
        ));

        env.execute();
    }
}

After the program finishes, the events table in MySQL contains the newly inserted rows (screenshot omitted).
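
You can confirm with a quick query against the same events table:

SELECT user_name, url FROM events;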

FileSink

Write the data to files (bucketed/partitioned output is supported).

import com.event.UserEvent;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class FileSinkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserEvent> userEventDataStreamSource =
                env.fromCollection(
                        Arrays.asList(
                                new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                                new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                                new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                                new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                                new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)
                        ));

        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output/"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // Roll the part file when it reaches 1 GB ...
                                .withMaxPartSize(1024 * 1024 * 1024)
                                // ... or every 15 minutes ...
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                // ... or after 5 minutes without new data
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .build()
                ).build();

        userEventDataStreamSource.map(data -> data.getUserName()).addSink(streamingFileSink);

        env.execute();
    }
}


After the run finishes, part files appear under ./output/ (screenshot omitted).
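
For the partitioned output mentioned above, a bucket assigner decides which subdirectory each record lands in. A minimal sketch, reusing the ./output/ path (the hourly "yyyy-MM-dd--HH" pattern is an illustrative choice), using the connector's DateTimeBucketAssigner:

import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// One bucket directory per hour of processing time, e.g. ./output/2021-06-01--10/
StreamingFileSink<String> bucketedSink = StreamingFileSink
        .<String>forRowFormat(new Path("./output/"), new SimpleStringEncoder<>("UTF-8"))
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
        .build();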
