当前位置:   article > 正文

Flink 1.11.1:flink CDC Debezium自定义修改debezium-json格式_corrupt debezium json message

corrupt debezium json message

前言

Flink 1.11新增支持CDC,包括Debezium、Canal,现修改debezium-json的format格式

默认输出格式

1、插入

(true,1,2,3)
  • 1

2、更新

(false,1,2,3)
(true,1,2,4
  • 1
  • 2

3、删除

(false,1,2,3)
(true,1,2,4
  • 1
  • 2

希望修改后的输出格式

1、插入

(true,1,2,3,'c')
  • 1

备注:‘c’代表新建

2、更新

(true,1,2,4,'u')
  • 1

备注:'u’代表更新

3、删除
备注:不做任何处理

修改代码

flink-release-1.11.1/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
  • 1
	@Override
	public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
		try {
			GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
			GenericRowData payload;
			if (schemaInclude) {
				payload = (GenericRowData) row.getField(0);
			} else {
				payload = row;
			}

			GenericRowData before = (GenericRowData) payload.getField(0);
			GenericRowData after = (GenericRowData) payload.getField(1);
            int len = after.getArity();
            after.setField(len-1,payload.getField(2));
			String op = payload.getField(2).toString();
			if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
				//after.setRowKind(RowKind.INSERT);
                after.setField(len-1,payload.getField(2));
				out.collect(after);
			} else if (OP_UPDATE.equals(op)) {
				//before.setRowKind(RowKind.UPDATE_BEFORE);
				//after.setRowKind(RowKind.UPDATE_AFTER);
				//out.collect(before);
				out.collect(after);
			} else if (OP_DELETE.equals(op)) {
				//before.setRowKind(RowKind.DELETE);
                before.setField(len-1,payload.getField(2));
				out.collect(before);
			} else {
				if (!ignoreParseErrors) {
					throw new IOException(format(
						"Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'", op, new String(message)));
				}
			}
		} catch (Throwable t) {
			// a big try catch to protect the processing.
			if (!ignoreParseErrors) {
				throw new IOException(format(
					"Corrupt Debezium JSON message '%s'.", new String(message)), t);
			}
		}
	}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/477779
推荐阅读
相关标签
  

闽ICP备14008679号