

kafka-flinkSQL-iceberg test case

Flink's datagen connector is used to generate mock data and write it into Kafka, simulating a continuously produced stream; a second Flink job then consumes the data from Kafka and writes it into the Iceberg data lake.
The flow is similar to the previous article, the kafka-flinkSQL-hudi test; see that article if you are interested.

Brief overview of the flow

1. Use Flink to generate mock data:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Date;

/**
 * @author hyl
 * @date 2022/5/6
 * @Description
 * @ModifiedBy
 */
public class CreateDataAll {
    public static void main(String[] args) {


        // datagen source: randomly generated rows

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String sourceDDL = "create table datagen_source (\n" +
                " id int,\n" +
                " data string,\n" +
                "price double,\n" +
                " ts as localtimestamp,\n" +
                " watermark for ts as ts\n" +
                ") with (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='" + args[0] + "',\n" +  // 每秒生成多少条数据,传入参数
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='" + args[1] + "',\n" +  // id的起始位置
                " 'fields.id.end'='" + args[2] + "',\n" +  // id的结束位置
                " 'fields.data.length'='50'\n" +
                ")";
        System.out.println(sourceDDL);
        tableEnv.executeSql(sourceDDL);



        // kafka sink
        String sinkDDL = "create table kafka_sink (\n" +
                " id int,\n" +
                " data string,\n" +
                "price double,\n" +
                " ts timestamp\n" +
                ") with (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '" + args[3] + "',\n" +
                " 'properties.bootstrap.servers' = '192.168.100.14:9092',\n" +
                " 'properties.group.id' = 'flinkToKafka3',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'format' = 'json',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")";
        System.out.println(sinkDDL);
        tableEnv.executeSql(sinkDDL);

        // insert into kafka: feed the datagen rows into Kafka to simulate a live stream
        String insertKafka = "insert into kafka_sink\n" +
                "select * from datagen_source ";
        System.out.println(insertKafka);
        tableEnv.executeSql(insertKafka);

        // print

        final Date startTime = new Date();
        System.out.println("Start time: " + startTime.toString() + "*********************************************");


    }
}
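
The job takes four command-line arguments: args[0] is the rows-per-second rate, args[1] and args[2] are the start and end of the id sequence (the datagen sequence generator stops once the end value is reached), and args[3] is the Kafka topic to write to. To sanity-check the generated rows locally before wiring them into Kafka, a small preview can be added to main; this is just a sketch and not part of the original job:

        // Optional: print the generated rows to stdout to verify the datagen schema.
        // Because fields.id uses a bounded sequence, this query finishes on its own.
        tableEnv.executeSql("select id, data, price, ts from datagen_source").print();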

2. Use Flink to consume the Kafka data and write it into Iceberg in real time

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author hyl
 * @date 2022/4/28
 * @Description Use Flink to consume data from Kafka and write it into Iceberg in real time
 */
public class KafkaToIceberg {
    public static void main(String[] args) {

        System.setProperty("HADOOP_USER_NAME", "root");
        System.setProperty("user.name", "root");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(6000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.100.11:9000/tmp/hudi/testFlinkHudi/checkpoint/dir");

        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);


        // kafka source
        String kafkaSourceDDL = "create table kafka_source (\n" +
                " id int,\n" +
                " data string,\n" +
                "price double,\n" +
                " ts timestamp\n" +
                ") with (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = '" + args[0] + "',\n" +  // 消费topic
                " 'properties.bootstrap.servers' = '192.168.100.14:9092',\n" +
                " 'properties.group.id' = 'flinkToKafka3',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'format' = 'json',\n" +
                " 'json.fail-on-missing-field' = 'false',\n" +
                " 'json.ignore-parse-errors' = 'true'\n" +
                ")";
        tableEnv.executeSql(kafkaSourceDDL);
        System.out.println(kafkaSourceDDL);


        // create the hadoop catalog via the Table API
        TableResult tableResult = tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
                "  'type'='iceberg',\n" +
                "  'catalog-type'='hadoop',\n" +
                "  'warehouse'='hdfs://192.168.100.11:9000/tmp/iceberg/flink_iceberg/FlinkClientTable/',\n" +
                "  'property-version'='2', \n" +
                "  'format-version'='2'\n" +
                ")");



        // create the target Iceberg table
        // (the iceberg_hadoop_db database is assumed to already exist in the hadoop catalog)
        String createIcebergTable = "create table if not exists hadoop_catalog.iceberg_hadoop_db."+args[1]+" (\n" +
                " id int,\n" +
                " data string,\n" +
                " price double,\n" +
                " ts timestamp(3),\n" +
                " part int,\n"+
                " event_time string,\n" +
                " event_date string, \n" +
                " PRIMARY KEY (`id`) NOT ENFORCED \n"+
                ") partitioned by ( part ) \n" +
                "WITH (\n" +
                " 'property-version'='2', \n" +  
                " 'format-version'='2' \n" + 
                ")";
        System.out.println(createIcebergTable);
        tableEnv.executeSql(createIcebergTable);

        // stream the Kafka records into Iceberg
        String insertDML = "insert into hadoop_catalog.iceberg_hadoop_db."+args[1]+" \n" +
                "select\n" +
                " id,\n" +
                " data,\n" +
                " price,\n " +
                " ts,\n" +
                " id/100000 as part,\n"+
                " date_format(ts, 'yyyy-MM-dd HH:mm:ss') as event_time,\n" +
                " date_format(ts, 'yyyy-MM-dd') as event_date\n" +
                "from kafka_source";
        tableEnv.executeSql(insertDML);
    }
}
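
Iceberg commits new data files when a Flink checkpoint completes, so after the job has taken a few checkpoints the table can be read back through the same hadoop catalog to verify the ingestion. Below is a minimal verification sketch; it assumes the same catalog settings as above and uses iceberg_table as a placeholder for the table name passed in as args[1]:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergReadCheck {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "root");

        // A batch TableEnvironment is enough for a one-off snapshot read.
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Register the same hadoop catalog that the ingestion job writes to.
        tableEnv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
                "  'type'='iceberg',\n" +
                "  'catalog-type'='hadoop',\n" +
                "  'warehouse'='hdfs://192.168.100.11:9000/tmp/iceberg/flink_iceberg/FlinkClientTable/'\n" +
                ")");

        // Count rows per partition; "iceberg_table" is a placeholder for the name passed as args[1] above.
        tableEnv.executeSql("select part, count(*) as cnt\n" +
                "from hadoop_catalog.iceberg_hadoop_db.iceberg_table\n" +
                "group by part").print();
    }
}

The Iceberg Flink connector also supports continuous streaming reads (the 'streaming'='true' read option) if you prefer to tail the table instead of taking a one-off snapshot.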

POM file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkToIceberg</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- project compiler -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- maven compiler-->
        <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <!-- sdk -->
        <java.version>1.8</java.version>
        <!-- engine-->
        <hadoop.version>2.9.2</hadoop.version>
        <flink.version>1.13.6</flink.version>
        <iceberg.version>0.13.1</iceberg.version>
        <!--        <flink.version>1.12.2</flink.version>-->
        <!--        <hoodie.version>0.9.0</hoodie.version>-->
        <hive.version>2.3.9</hive.version>

        <!--         <scope.type>provided</scope.type>-->
        <scope.type>compile</scope.type>
    </properties>

    <dependencies>

        <!-- flink-runtime-web: lets you open the Flink web UI when running locally -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <!-- flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <!-- flink-table-common: interfaces needed by custom connectors and formats -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- Java Table API bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- the following two modules are required to run locally in the IDE -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- hadoop dependencies: required for writing to HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.13 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.13</artifactId>
            <version>${iceberg.version}</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-api -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-api</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-data -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-data</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>${iceberg.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- thin jar: package only the project's own code -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>libs/</classpathPrefix>
                            <mainClass>
                                KafkaToIceberg
                            </mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
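
Note that the maven-jar-plugin configuration above produces a thin jar containing only the project classes; when submitting the job to a cluster, the Kafka connector, the Iceberg Flink runtime and the Hadoop dependencies are assumed to already be available on the Flink classpath (for example as jars in Flink's lib directory).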

Thanks for reading. If you are interested in the flink-hudi test case, see my previous article.
