赞
踩
Apache SeaTunnel 是一个分布式、高性能、易扩展、用于海量数据(离线&实时)同步和转化的开源数据集成平台, 支持spark、flink 及自研 Zeta 引擎,有庞大的用户社群.
Apache Iceberg 是一个开源的表格格式,它旨在改善大数据生态系统中复杂的数据湖管理。作为Apache软件基金会的一部分,Iceberg专为提供更强大、更灵活的数据湖表格管理功能而设计,它通过提供一种更加高效和可靠的方式来处理大规模数据集,从而解决了传统数据湖在数据可靠性、性能和可维护性方面的挑战。
Apache SeaTunnel connector-Iceberg 是专门为Iceberg引擎开发的数据同步组件, 主要为了方便SeaTunnel 用户能更加友好的使用Iceberg来构建企业级数据湖仓
Name | Type | Required | Default | Description |
---|---|---|---|---|
catalog_name | string | yes | default | 用户指定的目录名称。默认为 default。 |
namespace | string | yes | default | Iceberg 数据库名称。默认为 default |
table | string | yes | - | Iceberg 表名称。 |
iceberg.catalog.config | map | no | - | 指定用于初始化 Iceberg 目录的属性,具体配置参考:Iceberg Catalog Properties |
hadoop.config | map | no | - | 指定 Hadoop 配置的属性,具体配置参考: Hadoop Configuration |
iceberg.hadoop-conf-path | string | no | - | 指定加载 'core-site.xml'、'hdfs-site.xml'、'hive-site.xml' 文件的路径。 |
case_sensitive | boolean | no | false | 控制是否以区分大小写的方式匹配 schema。 |
iceberg.table.write-props | map | no | - | 传递给 Iceberg 写入器初始化的属性,这些属性具有优先权,可以在 Iceberg Write Properties 找到具体参数。 |
iceberg.table.auto-create-props | map | no | - | Iceberg 在自动创建表时指定的配置, 具体参照: Table Behavior Properties |
iceberg.table.schema-evolution-enabled | boolean | no | false | 将其设置为 true 可以使 Iceberg 表在同步过程中支持模式演变。目前仅支持添加字段 和 部分类型变更 |
iceberg.table.primary-keys | string | no | - | 表的主键配置,多个主键用","分割 ,与 "iceberg.table.upsert-mode-enabled" 一起使用,用于同主键数据的增量更新 |
iceberg.table.upsert-mode-enabled | boolean | no | false | 将其设置为 true 以启用 upsert 模式,默认为 false, 用于 Iceberg 中数据的增量更新 |
iceberg.table.partition-keys | string | no | - | 创建表时指定的分区字段,多个分区字段使用","分隔。 |
iceberg.table.commit-branch | string | no | - | 指定数据提交的分支 |
env { parallelism = 1 job.mode = "BATCH" # You can set spark configuration here spark.app.name = "SeaTunnel" spark.executor.instances = 2 spark.executor.cores = 1 spark.executor.memory = "1g" spark.master = local } source { FakeSource { row.num = 100 schema = { fields { c_map = "map<string, string>" c_array = "array<int>" c_string = string c_boolean = boolean c_tinyint = tinyint c_smallint = smallint c_int = int c_bigint = bigint c_float = float c_double = double c_decimal = "decimal(30, 8)" c_bytes = bytes c_date = date c_timestamp = timestamp } } result_table_name = "fake" } } transform { } sink { Iceberg { catalog_name="seatunnel_test" iceberg.catalog.config={ "type"="hadoop" "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/" } namespace="seatunnel_namespace" table="iceberg_sink_table" iceberg.table.write-props={ write.format.default="parquet" write.target-file-size-bytes=10 } iceberg.table.partition-keys="c_timestamp" case_sensitive=true } }
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source { MySQL-CDC { result_table_name="customer_result_table" catalog { factory = Mysql } debezium = { # include ddl "include.schema.changes" = true } database-names=["mysql_cdc"] table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] format=DEFAULT username = "st_user" password = "seatunnel" base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" } }
transform { }
sink { Iceberg { catalog_name="seatunnel_test" iceberg.catalog.config={ "type"="hadoop" "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-cdc-sink/" } namespace="seatunnel_namespace" table="iceberg_sink_table" iceberg.table.write-props={ write.format.default="parquet" write.target-file-size-bytes=10 } iceberg.table.primary-keys="id" iceberg.table.partition-keys="f_datetime" iceberg.table.upsert-mode-enabled=true iceberg.table.schema-evolution-enabled=true case_sensitive=true } }
```
基于Apache SeaTunnel来构建数据湖项目, 我们可以直接引用 SeaTunnel 强大的组件生态,不用独立构造新的项目来实现业务需求,同时Apache SeaTunnel的标准的架构设计也为熟悉开源的朋友提供了快速独立扩展的机会,可以在此基础上快速扩展自己的需求,做出符合自己业务需要的组件, 也欢迎大家试用 Iceberg-connect , 希望能帮大家真正解决实际生产场景中遇到的问题,
也希望大家能积极反馈使用中的问题,并贡献场景,大家共同来解决,并促进 Iceberg-connect 组件的完善, 一起共创数据开发的新场景.
本文由 白鲸开源科技 提供发布支持!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。