赞
踩
最近一直在研究flinkcdc实时数据同步,测试了Oracle和SqlServer数据源,发现解析出来的数据,其中的日期字段是时间戳格式(如1676620352),而我想要的是日期格式(2023-02-17),找人很多资料也尝试了很多配置文件都没有找到解决办法,最后只好自己重写序列化方法。代码如下,有更好方法的可以交流下~
import com.alibaba.fastjson.serializer.SerializerFeature; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.source.SourceRecord; import java.util.List; public class MyJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> { private transient JsonConverter jsonConverter; public MyJsonDebeziumDeserializationSchema() { } @Override public void deserialize(SourceRecord record, Collector<String> out) { JSONObject json = new JSONObject(); Struct valueStruct = (Struct) record.value(); Struct afterStruct = valueStruct.getStruct("after"); Struct beforeStruct = valueStruct.getStruct("before"); Struct sourceStruct = valueStruct.getStruct("source"); String op = valueStruct.getString("op"); Long ts_ms = valueStruct.getInt64("ts_ms"); if(afterStruct==null){ json.put("after", null); }else{ JSONObject after = change(afterStruct); json.put("after", after); } if(beforeStruct==null){ json.put("before", null); }else{ JSONObject before = change(beforeStruct); json.put("before", before); } JSONObject source = change(sourceStruct); json.put("source", source); json.put("op", op); json.put("ts_ms", ts_ms); out.collect(addJson(json)); } private static JSONObject change(Struct struct) { List<Field> fields = struct.schema().fields(); JSONObject json = new JSONObject(); for (Field field : fields) { // System.out.println(field.schema().name()); if ("io.debezium.time.Timestamp".equals(field.schema().name())) { Object value = struct.get(field.name()); if (value != null) { String s1 = GetDate.parseDate(value.toString()); json.put(field.name(), s1); }else { json.put(field.name(), null); } }else if ("io.debezium.time.MicroTimestamp".equals(field.schema().name())){ Object value = struct.get(field.name()); if (value != null) { String s1 = GetDate.parseDate2(value.toString()); json.put(field.name(), s1); }else { json.put(field.name(), null); } } else { Object value = struct.get(field.name()); if (value == null) { json.put(field.name(), null); } else { json.put(field.name(), value.toString()); } } } return json; } public String addJson(JSONObject json) { String s = JSONObject.toJSONString(json, SerializerFeature.WriteMapNullValue); return s; } @Override public TypeInformation<String> getProducedType() { return BasicTypeInfo.of(String.class); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。