当前位置:   article > 正文

PiflowX组件-PostgresCdc

piflowx

PostgresCdc组件

组件说明

Postgres CDC连接器允许从PostgreSQL数据库读取快照数据和增量数据。

计算引擎

flink

组件分组

cdc

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
hostnameHostname“”PostgreSQL数据库服务器的IP地址或主机名。127.0.0.1
usernameUsername“”连接到PostgreSQL数据库服务器时要使用的用户名。root
passwordPassword“”连接PostgreSQL数据库服务器时使用的密码。123456
databaseNameDatabaseName“”要监视的PostgreSQL服务器的数据库名称。test
schemaNameSchema“”要监视的PostgreSQL数据库的Schema。public
tableNameTableName“”"需要监视的PostgreSQL数据库的表名。test
portPort5432PostgreSQL数据库服务器的整数端口号。5432
slotNameSlotName“”The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring.
Slot names must conform to PostgreSQL replication slot naming rules, which state: “Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character.”
tableDefinitionTableDefinition“”Flink table定义。
propertiesPROPERTIES“”连接器其他配置。

PostgresCdc示例配置

{
	"flow":{
		"name":"pgcdc",
		"uuid":"273553c2ece043c29fba179df6826c5a",
		"paths":[
			{
				"inport":"",
				"from":"products",
				"to":"SQLQuery",
				"outport":""
			},
			{
				"inport":"",
				"from":"orders",
				"to":"SQLQuery",
				"outport":""
			},
			{
				"inport":"",
				"from":"shipments",
				"to":"SQLQuery",
				"outport":""
			},
			{
				"inport":"",
				"from":"SQLQuery",
				"to":"ShowChangeLogData",
				"outport":""
			}
		],
		"engineType":"flink",
		"stops":[
			{
				"name":"shipments",
				"bundle":"cn.piflow.bundle.flink.cdc.postgres.PostgresCdc",
				"uuid":"13ac002a8a4d404e8dda77c9a1d8360f",
				"properties":{
					"hostname":"192.168.186.102",
					"username":"postgres",
					"port":"5432",
					"properties":{
						
					},
					"schemaName":"public",
					"tableName":"shipments",
					"tableDefinition":{
						"tableBaseInfo":{
							"ifNotExists":true,
							"registerTableName":"shipments"
						},
						"physicalColumnDefinition":[
							{
								"columnName":"shipment_id",
								"columnType":"INT",
								"primaryKey":true
							},
							{
								"columnName":"order_id",
								"columnType":"INT"
							},
							{
								"columnName":"origin",
								"columnType":"STRING"
							},
							{
								"columnName":"destination",
								"columnType":"STRING"
							},
							{
								"columnName":"is_arrived",
								"columnType":"BOOLEAN"
							}
						],
						"asSelectStatement":{
							
						},
						"likeStatement":{
							
						}
					},
					"databaseName":"postgres",
					"slotName":"flink",
					"password":"postgres"
				},
				"customizedProperties":{
					
				}
			},
			{
				"name":"SQLQuery",
				"bundle":"cn.piflow.bundle.flink.common.SQLQuery",
				"uuid":"a5afa1368e1e4e348950bb2eda2011a8",
				"properties":{
					"viewName":"temp",
					"sql":"SELECT\n  o.*,\n  p.name,\n  p.description,\n  s.shipment_id,\n  s.origin,\n  s.destination,\n  s.is_arrived\nFROM\n  orders AS o\n  LEFT JOIN products AS p ON o.product_id = p.id\n  LEFT JOIN shipments AS s ON o.order_id = s.order_id"
				},
				"customizedProperties":{
					
				}
			},
			{
				"name":"ShowChangeLogData",
				"bundle":"cn.piflow.bundle.flink.common.ShowChangeLogData",
				"uuid":"c6604e600aa645d29264f4be5ce4e2eb",
				"properties":{
					"showNumber":"10"
				},
				"customizedProperties":{
					
				}
			},
			{
				"name":"products",
				"bundle":"cn.piflow.bundle.flink.cdc.mysql.MysqlCdc",
				"uuid":"ab91c42f26cb4e119e34830178611293",
				"properties":{
					"hostname":"192.168.186.102",
					"username":"root",
					"serverId":"5400",
					"port":"3306",
					"properties":{
						"server-time-zone":"UTC"
					},
					"tableName":"products",
					"tableDefinition":{
						"tableBaseInfo":{
							"registerTableName":"products"
						},
						"physicalColumnDefinition":[
							{
								"columnName":"id",
								"columnType":"INT",
								"primaryKey":true
							},
							{
								"columnName":"name",
								"columnType":"STRING"
							},
							{
								"columnName":"description",
								"columnType":"STRING"
							}
						],
						"asSelectStatement":{
							
						},
						"likeStatement":{
							
						}
					},
					"databaseName":"mydb",
					"password":"123456"
				},
				"customizedProperties":{
					
				}
			},
			{
				"name":"orders",
				"bundle":"cn.piflow.bundle.flink.cdc.mysql.MysqlCdc",
				"uuid":"4ffa58f3db6144739f6b797fd3839025",
				"properties":{
					"hostname":"192.168.186.102",
					"username":"root",
					"serverId":"5410",
					"port":"3306",
					"properties":{
						"scan.incremental.snapshot.chunk.key-column":"order_id",
						"server-time-zone":"UTC"
					},
					"tableName":"orders",
					"tableDefinition":{
						"tableBaseInfo":{
							"ifNotExists":true,
							"registerTableName":"orders"
						},
						"physicalColumnDefinition":[
							{
								"columnName":"order_id",
								"columnType":"INT"
							},
							{
								"columnName":"order_date",
								"columnType":"TIMESTAMP",
								"length":0
							},
							{
								"columnName":"customer_name",
								"columnType":"STRING"
							},
							{
								"columnName":"price",
								"columnType":"DECIMAL",
								"precision":10,
								"scale":5
							},
							{
								"columnName":"product_id",
								"columnType":"INT"
							},
							{
								"columnName":"order_status",
								"columnType":"BOOLEAN"
							}
						],
						"asSelectStatement":{
							
						},
						"likeStatement":{
							
						}
					},
					"databaseName":"mydb",
					"password":"123456"
				},
				"customizedProperties":{
					
				}
			}
		]
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
示例说明

本示例演示了基于PiflowX构建MySQL和Postgres的Streaming ETL。

演示DEMO

基于 PiflowX构建 MySQL 和 Postgres 的 Streaming ETL

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/415788
推荐阅读
相关标签
  

闽ICP备14008679号