当前位置:   article > 正文

使用DataX,从Greenplum将数据传输到Hive分区表中_datax greenplum

datax greenplum

我司使用Greenplum作为计算库,实时计算统计数据,但是数据量大了之后影响计算速度。所以将每天的数据通过Datax传输到Hive的按日分区的分区表中,用于备份,其他数据放在Greenplum中作为实时数据计算。

Greenplum内核还是PostgreSQL,所以Datax配置还是使用PostgreSQL的连接。

数据表准备

hive建表,dt作为分区字段,比如dt='20230619'

create table test(
    id bigint comment '主键',
    name string comment '姓名',
    address string comment '地址',
    update_time timestamp comment '日期' 
) partitioned by (dt string)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Greenplum就是常规建表。

CREATE TABLE "public"."test" (
  "id" int8 NOT NULL,
  "name" varchar(255) COLLATE "pg_catalog"."default",
  "address" varchar(255) COLLATE "pg_catalog"."default",
  "update_time" timestamp(6),
  PRIMARY KEY ("id")
)
;

ALTER TABLE "public"."test" 
  OWNER TO "postgres";

COMMENT ON COLUMN "public"."test"."id" IS '主键';

COMMENT ON COLUMN "public"."test"."name" IS '姓名';

COMMENT ON COLUMN "public"."test"."address" IS '地址';

COMMENT ON COLUMN "public"."test"."update_time" IS '日期';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

配置datax的job

{
  "job":{
    "content":[
      {
        "reader":{
			"name":"postgresqlreader",
			"parameter":{
				"column":[
							"id",             
                            "name",       
							"address",
							"update_time"
				],
				"where":
					"date(update_time) = to_date(${date}::text,'yyyyMMdd')",
				"connection":[
					{
						"jdbcUrl":[
						"jdbc:postgresql://localhost:5432/test"
						],
						"table":["public.test"]
					}
				],
				"password":"postgres",
				"username":"postgres"
			}
        },
        "writer":{
			"name":"hdfswriter",
			"parameter":{
				"column":[
					 {
						"name": "id",
						"type": "INT"
					 },
					 {
						"name": "name",
						"type": "STRING"
					 },
					 {
						"name": "address",
						"type": "STRING"
					 },
					 {
						"name": "update_time",
						"type": "TIMESTAMP"
					 }
				],
				"defaultFS": "hdfs://localhost:8020/",
				"fieldDelimiter": "\1",
				"fileName": "test_20230619",
				"fileType": "orc",
				"haveKerberos": false,
				"path": "/warehouse/tablespace/managed/hive/test/test/dt=${date}",
				"writeMode": "truncate"
			}
		}
      }
    ],
    "setting":{
      "speed":{
        "channel":"5"
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

因为datax不支持动态分区字段,所以我把分区字段传到job中进行传输。

启动脚本

#!/bin/bash
# 1.获取日期参数
if [ $# -eq 0 ]
then
  echo "请添加参数1:tablename(必填) 2:date格式:20220101(非必填)"
  exit 1
elif [ $# -eq 1 ]
then
  date=$(date -d "-1 day" +%Y%m%d)
else
  date=$2
fi
tablename=$1
# 2. 创建Hive分区表
su - hdfs -c "hadoop fs -mkdir -p /warehouse/tablespace/managed/hive/test.db/${tablename}/dt=${date}"
# 3. 执行datax脚本
su - hdfs -c "python /opt/datax/bin/datax.py /opt/datax/job/${tablename}_gp2hive.json -p"-Ddate=${date}" > /opt/datax/log/"${tablename}".log"
# 4. 修复Hive分区表
su - hive -c "hive -e 'msck repair table test."${tablename}"'"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

批量启动脚本

dir=$(ls /opt/datax/job/*.*)
for file in ${dir}
do
  filename=$(basename ${file})
  tablename=$(echo ${filename} | sed 's/_gp2hive.json//')
  sh /opt/datax/job/single_job.sh ${tablename}
done
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

定时启动

#加入定时启动
> crontab -e
0 3 * * * /opt/datax/job/batch_job.sh
# 查看是否存在定时任务
> crontab -l
  • 1
  • 2
  • 3
  • 4
  • 5

注意
需要将batch_job.sh添加可执行权限,不然不能运行。

chmod +x batch_job.sh
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/141144
推荐阅读
相关标签
  

闽ICP备14008679号