赞
踩
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
调用MysqlSource方法
//创建flinkcdc监控Mysqlblinlog日志 DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder() //数据库地址 .hostname("localhost") //端口号 .port(3306) //用户名 .username("zhang") //密码 .password("1234") //监控的数据库 .databaseList("school") //监控的表名,格式数据库.表名 .tableList("school.person") //虚拟化方式 .deserializer(new PersonSchema()) //时区 .serverTimeZone("UTC") .build(); //创建flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 environment.setParallelism(1); //加载source DataStream<String> source = environment.addSource(sourceFunction); //打印结果 source.print(); //执行任务 environment.execute();
目前最高支持8.0的mysql数据库,8.0以上可能会因为无法获取密钥报错。
blinlog日志序列化成对象字符串
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; //实现接口DebeziumDeserializationSchema把日志进行解析 public class PersonSchema implements DebeziumDeserializationSchema<String> { @Override public void deserialize(SourceRecord source, Collector<String> out) throws Exception { String topic = source.topic(); String[] split = topic.split("[.]"); String database = split[1]; String table = split[2]; //获取操作类型 Envelope.Operation operation = Envelope.operationFor(source); //获取数据本身 Struct struct = (Struct) source.value(); //根据每个字段名获取相应的字段类型 Struct after = struct.getStruct("after"); Object pid = after.get("pid"); Object pname = after.get("pname"); Object psex = after.get("psex"); Object page = after.get("page"); //拼接成字符串(添加数据) String file=pid.toString()+"|"+pname.toString()+"|"+psex.toString()+"|"+page.toString(); Struct before = struct.getStruct("before"); Object upid = before.get("pid"); Object upname = before.get("pname"); Object upsex = before.get("psex"); Object upage = before.get("page"); //修改数据的拼接 String ufile=pid.toString()+"|"+pname.toString()+"|"+psex.toString()+"|"+page.toString(); if (after!=null&&before==null){ String datam="insert"+"|"+file; out.collect(datam); }else { String datan="update"+"|"+ufile; out.collect(datan); } } @Override public TypeInformation<String> getProducedType() { //返回字符串类型 return BasicTypeInfo.STRING_TYPE_INFO; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。