赞
踩
基于上述差异可以看出:实现TDengine源节点应基于SourceFunction,而不应使用Source接口。
要理解Flink的Source,主要看它的方法,通过方法了解它的作用和定位。
接口:org.apache.flink.api.connector.source.Source
public interface Source<T, SplitT extends SourceSplit, EnumChkT> extends SourceReaderFactory<T, SplitT> { /** * 有界数据,还是持续数据流 */ Boundedness getBoundedness(); /** * 重点在Split,即源数据是可拆分,并发读取的。 */ SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext) throws Exception; /** * 重新装载数据分拆读取器 */ SplitEnumerator<SplitT, EnumChkT> restoreEnumerator( SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception; // ------------------------------------------------------------------------ // 序列化源数据 // ------------------------------------------------------------------------ /** * 客户端这边有分派到哪些任务的数据,需要存储时,凭此方法可以实现个性化的状态数据序列化和反序列化。 */ SimpleVersionedSerializer<SplitT> getSplitSerializer(); /** * 客户端这边有状态数据,需要存储时,凭此方法可以实现个性化的状态数据序列化和反序列化。 */ SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer(); }
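从上述接口可以总结出,适合用Source来实现的数据源应具备以下特征:
1. 源数据可以拆分为多个分片(Split),由客户端的SplitEnumerator分派给各个并行任务并发读取;
2. 客户端需要自行管理分片的分派与调节,并保存相应的状态(分片以及枚举器的检查点数据),以便从检查点恢复。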
而在涛思数据(TDengine)的订阅功能中,分区和订阅分派由涛思服务端管理,各个分区的消费进度也存储在涛思服务端,且客户端只能以主题为单位发起订阅。因此客户端既没有调节任务分派的能力,也没有需要记录的状态信息,使用RichParallelSourceFunction来实现最为合适。
类:com.cimstech.sailboat.flink.run.source.taos.TDengineSourceFunction
import java.sql.SQLException; import java.time.Duration; import java.util.Collection; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.cimstech.xfront.common.collection.PropertiesEx; import com.cimstech.xfront.common.excep.ExceptionAssist; import com.taosdata.jdbc.tmq.ConsumerRecord; import com.taosdata.jdbc.tmq.ConsumerRecords; import com.taosdata.jdbc.tmq.TaosConsumer; public class TDengineSourceFunction<O> extends RichParallelSourceFunction<O> { private static final long serialVersionUID = 1L; private static final long POLL_TIMEOUT = 10_000L; static final Logger sLogger = LoggerFactory.getLogger(TDengineSourceFunction.class) ; TaosConsumer<O> mConsumer ; PropertiesEx mProps ; Collection<String> mTopics ; boolean mCanceled = false ; public TDengineSourceFunction() { } public TDengineSourceFunction(PropertiesEx aProps , Collection<String> aTopics) { mProps = aProps ; mTopics = aTopics ; } @Override public void open(Configuration aConf) throws Exception { RuntimeContext rc = getRuntimeContext() ; mProps.setProperty(TMQConstants.CLIENT_ID, mProps.getProperty(TMQConstants.CLIENT_ID) +"_"+rc.getIndexOfThisSubtask()) ; mConsumer = new TaosConsumer<>(mProps) ; mConsumer.subscribe(mTopics); } @Override public void run(SourceContext<O> aCtx) throws Exception { try { while(!mCanceled) { ConsumerRecords<O> rcds = mConsumer.poll(Duration.ofMillis(POLL_TIMEOUT)) ; if(!rcds.isEmpty()) { for(ConsumerRecord<O> rcd : rcds) { aCtx.collect(rcd.value()); } } } } finally { try { mConsumer.close() ; } catch (SQLException e) { sLogger.error(ExceptionAssist.getClearMessage(getClass(), e)) ; } } } @Override public void cancel() { mCanceled = true ; } }
类:com.cimstech.sailboat.flink.run.source.taos.RowDeserializer
import java.sql.ResultSet; import java.sql.SQLException; import java.util.LinkedHashMap; import java.util.Map; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.types.RowUtils; import com.cimstech.xfront.common.collection.CS; import com.cimstech.xfront.common.collection.PropertiesEx; import com.cimstech.xfront.common.log.Assert; import com.cimstech.xfront.common.reflect.XClassUtil; import com.taosdata.jdbc.TaosGlobalConfig; import com.taosdata.jdbc.tmq.Deserializer; import com.taosdata.jdbc.tmq.DeserializerException; import com.taosdata.jdbc.tmq.TMQConstants; public class RowDeserializer implements Deserializer<Row> { final LinkedHashMap<String, Integer> mFieldIndexMap = CS.linkedHashMap() ; String[] mOutFieldNames ; Class<?>[] mOutFieldClasses ; @Override public void configure(Map<?, ?> configs) { Object encodingValue = configs.get(TMQConstants.VALUE_DESERIALIZER_ENCODING) ; if (encodingValue instanceof String) TaosGlobalConfig.setCharset(((String) encodingValue).trim()); String outFieldsStr = (String)configs.get("outFields") ; mOutFieldNames = PropertiesEx.split(outFieldsStr) ; Assert.notEmpty(mOutFieldNames , "没有指定输出列(outFields)!") ; mOutFieldClasses = new Class[mOutFieldNames.length] ; for(int i=0 ; i<mOutFieldNames.length ; i++) { String[] segs = mOutFieldNames[i].split(":") ; mOutFieldNames[i] = segs[0] ; // 字段名 mOutFieldClasses[i] = XClassUtil.getClassOfCSN(segs[1]) ; mFieldIndexMap.put(mOutFieldNames[i] , i) ; } } @Override public Row deserialize(ResultSet data, String topic, String dbName) throws DeserializerException, SQLException { Row row = RowUtils.createRowWithNamedPositions(RowKind.INSERT , new Object[mOutFieldNames.length] , mFieldIndexMap) ; for(int i=0 ; i<mOutFieldNames.length ; i++) { row.setField(i, XClassUtil.typeAdapt(data.getObject(mOutFieldNames[i]), mOutFieldClasses[i])) ; } return row ; } }
类:com.cimstech.sailboat.flink.run.source.taos.SI_TDengine_Builder (部分代码)
... 省略 PropertiesEx props = new PropertiesEx() ; props.setProperty(TMQConstants.CONNECT_TYPE , "websocket") ; props.setProperty(TMQConstants.BOOTSTRAP_SERVERS , servAddr) ; props.setProperty(TMQConstants.CONNECT_USER , username) ; props.setProperty(TMQConstants.CONNECT_PASS , password) ; props.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true"); props.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000"); props.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8"); props.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true"); // props.setProperty(TMQConstants.CLIENT_ID, nodeId) ; props.setProperty(TMQConstants.GROUP_ID, UUID.randomUUID().toString()) ; String startingOffsets = execConfJo.optString("startingOffset") ; if(XString.isNotEmpty(startingOffsets)) { props.setProperty(TMQConstants.AUTO_OFFSET_RESET, startingOffsets.toLowerCase()) ; } // props.setProperty(TMQConstants.VALUE_DESERIALIZER , "com.cimstech.sailboat.flink.run.source.taos.RowDeserializer") ; JSONArray outRowFieldsJa = execConfJo.optJSONArray("outRowFields") ; Assert.notNull(outRowFieldsJa , "没有找到outRowFields!%s" , execConfJo); ERowTypeInfo rowTypeInfo = JSONKit.toRowTypeInfo(outRowFieldsJa) ; final int fieldAmount = outRowFieldsJa.length() ; Assert.isTrue(fieldAmount>0 , "没有找到outRowFields!%s" , execConfJo); StringBuilder outFieldsStrBld = new StringBuilder() ; outRowFieldsJa.forEachJSONObject(jo->{ if(outFieldsStrBld.length() > 0) outFieldsStrBld.append(',') ; outFieldsStrBld.append(jo.optString("name")) .append(':') .append(jo.optString("dataType")) ; }) ; props.setProperty("outFields" , outFieldsStrBld.toString()) ; ...省略 TDengineSourceFunction<Row> sourceFunc = new TDengineSourceFunction<Row>(props, Arrays.asList(topicsJa.toStringArray())) ; SingleOutputStreamOperator<Row> dss = env.addSource(sourceFunc , nodeName , rowTypeInfo) .assignTimestampsAndWatermarks(watermarkStrategy) // 2023-01-08 这一句是必需的,否则不会产生水位线 .name(nodeName) .uid(nodeId) ;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。