当前位置:   article > 正文

seatunnel 高性能分布式数据集成平台

seatunnel


一、介绍

seatunnel 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上,每天可以稳定高效同步数百亿数据,已在近百家公司生产上使用。
可以直接运行的软件包下载地址:https://github.com/InterestingLab/seatunnel/releases
快速入门:https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/quick-start
关于 seatunnel 的详细文档

二、为什么我们需要 seatunnel

seatunnel 尽所能为您解决海量数据同步中可能遇到的问题:

  • 数据丢失与重复
  • 任务堆积与延迟
  • 吞吐量低
  • 应用到生产环境周期长
  • 缺少应用运行状态监控

三、seatunnel 使用场景

  • 海量数据同步
  • 海量数据集成
  • 海量数据的 ETL
  • 海量数据聚合
  • 多源数据处理

四、seatunnel 的特性

  • 简单易用,灵活配置,无需开发
  • 实时流式处理
  • 离线多源数据分析
  • 高性能、海量数据处理能力
  • 模块化和插件化,易于扩展
  • 支持利用 SQL 做数据处理和聚合
  • 支持 Spark Structured Streaming
  • 支持 Spark 2.x

五、seatunnel 的工作流程

在这里插入图片描述

Input[数据源输入] -> Filter[数据处理] -> Output[结果输出]

多个 Filter 构建了数据处理的 Pipeline,满足各种各样的数据处理需求,如果您熟悉 SQL,也可以直接通过 SQL 构建数据处理的 Pipeline,简单高效。目前 seatunnel 支持的Filter列表, 仍然在不断扩充中。您也可以开发自己的数据处理插件,整个系统是易于扩展的。

六、seatunnel 支持的插件

  • Input plugin

Fake, File, Hdfs, Kafka, S3, Socket, 自行开发的 Input plugin

  • Filter plugin

Add, Checksum, Convert, Date, Drop, Grok, Json, Kv, Lowercase, Remove, Rename, Repartition, Replace, Sample, Split, Sql, Table, Truncate, Uppercase, Uuid, 自行开发的Filter plugin

  • Output plugin

Elasticsearch, File, Hdfs, Jdbc, Kafka, Mysql, S3, Stdout, 自行开发的 Output plugin

七、环境依赖

  1. java 运行环境,java >= 8
  2. 如果您要在集群环境中运行 seatunnel,那么需要以下 Spark 集群环境的任意一种:
    Spark on Yarn
    Spark Standalone

如果您的数据量较小或者只是做功能验证,也可以仅使用 local 模式启动,无需集群环境,seatunnel 支持单机运行。 注: seatunnel 2.0 支持 Spark 和 Flink 上运行

八、安装与配置

  • 准备好JDK1.8
    seatunnel 依赖JDK1.8运行环境。
  • 准备好Spark
    seatunnel 依赖Spark,安装seatunnel前,需要先准备好Spark。 请先下载Spark, Spark版本请选择 >= 2.x.x。下载解压后,不需要做任何配置即可提交Spark deploy-mode = local模式的任务。 如果你期望任务运行在Standalone集群或者Yarn、Mesos集群上,请参考Spark官网配置文档。
  • 安装seatunnel
    下载seatunnel安装包并解压, 这里以社区版为例:
wget https://github.com/InterestingLab/seatunnel/releases/download/v<version>/seatunnel-<version>.zip -O seatunnel-<version>.zip
unzip seatunnel-<version>.zip
ln -s seatunnel-<version> seatunnel
  • 1
  • 2
  • 3
  • 配置
    config.conf 下述配置是从hive中抽数插入到clickhouse中的配置,数据源是hive的一张表,通过seatunnel插件根据id字段进行分片插入clickhouse集群不同分片。
spark {
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "hive2clickhouse"
  spark.executor.instances = 30
  spark.executor.cores = 1 
  spark.executor.memory = "2g"
  spark.ui.port = 13000
}

input {
    hive {
		pre_sql = "select id,name,create_time from table"
		table_name = "table_tmp"
    }
}

filter {
	convert {
		source_field = "data_source"
		new_type = "UInt8"
	}

	org.interestinglab.waterdrop.filter.Slice {
		source_table_name = "table_tmp"
		source_field = "id"
		slice_num = 2
		slice_code = 0
		result_table_name = "table_8123"
	}
	org.interestinglab.waterdrop.filter.Slice {
		source_table_name = "table_tmp"
		source_field = "id"
		slice_num = 2
		slice_code = 1
		result_table_name = "table_8124"
	}
}

output {
    clickhouse {
        source_table_name="table_8123"
        host = "ip1:8123"
        database = "db_name"
        username="username"
        password="pwd"
        table = "model_score_local"
        fields = ["id","name","create_time"]
		    clickhouse.socket_timeout = 50000
		    retry_codes = [209, 210]
		    retry = 3
		    bulk_size = 500000
    }
	clickhouse {
        source_table_name="table_8124"
        host = "ip2:8123"
        database = "db_name"
        username="username"
        password="pwd"
        table = "model_score_local"
        fields = ["id","name","create_time"]
	    	clickhouse.socket_timeout = 50000
	    	retry_codes = [209, 210]
	    	retry = 3
		    bulk_size = 500000
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 启动
../bin/start-waterdrop.sh --master local[4] --deploy-mode client --config.conf
  • 1

九、生产应用案例

  • 微博, 增值业务部数据平台 微博某业务有数百个实时流式计算任务使用内部定制版 seatunnel,以及其子项目Guardian做seatunnel On Yarn 的任务监控。
  • 新浪, 大数据运维分析平台 新浪运维数据分析平台使用 seatunnel 为新浪新闻,CDN 等服务做运维大数据的实时和离线分析,并写入Clickhouse。
  • 搜狗,搜狗奇点系统 搜狗奇点系统使用 seatunnel 作为 ETL 工具, 帮助建立实时数仓体系
  • 趣头条,趣头条数据中心 趣头条数据中心,使用 seatunnel 支撑 mysql to hive 的离线 ETL 任务、实时 hive
    to clickhouse 的 backfill 技术支撑,很好的 cover 离线、实时大部分任务场景。
  • 一下科技, 一直播数据平台
  • 永辉超市子公司-永辉云创,会员电商数据分析平台 seatunnel 为永辉云创旗下新零售品牌永辉生活提供电商用户行为数据实时流式与离线
    SQL 计算。
  • 水滴筹, 数据平台 水滴筹在 Yarn 上使用 seatunnel 做实时流式以及定时的离线批处理,每天处理 3~4T
    的数据量,最终将数据写入 Clickhouse。

更多案例参见: https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/case_study/

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/530119
推荐阅读
  

闽ICP备14008679号