当前位置:   article > 正文

涛思(TDengine)类型的Flink Source实现及与Kafka的对比分析_flink tdengine

flink tdengine

1. 涛思(TDengine v3.1)和Kafka订阅的不同之处

  1. 涛思不支持指定分区订阅,只能订阅一个或多个主题。涛思中的vGroup对应Kafka中的Partition概念。对于涛思中的一个消费者组,哪些分区由哪个消费者消费已经平衡再分配由服务端决定。
  2. TaosConsumer没有主动的暂停和恢复订阅的能力。
  3. TaosConsumer没有提供数据抓取和消费相关的度量数据接口。

基于上述的不同,可以发现实现TDengine源节点,应该通过SourceFunction实现,而不应该使用Source来实现。

2. Source理解

要理解Flink的Source,主要看它的方法,通过方法了解它的作用和定位。

接口:org.apache.flink.api.connector.source.Source

public interface Source<T, SplitT extends SourceSplit, EnumChkT>
        extends SourceReaderFactory<T, SplitT> {

    /**
     * 有界数据,还是持续数据流
     */
    Boundedness getBoundedness();

    /**
     * 重点在Split,即源数据是可拆分,并发读取的。
     */
    SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext)
            throws Exception;

    /**
     * 重新装载数据分拆读取器
     */
    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint) throws Exception;

    // ------------------------------------------------------------------------
    //  序列化源数据
    // ------------------------------------------------------------------------

    /**
     * 客户端这边有分派到哪些任务的数据,需要存储时,凭此方法可以实现个性化的状态数据序列化和反序列化。
     */
    SimpleVersionedSerializer<SplitT> getSplitSerializer();

    /**
     * 客户端这边有状态数据,需要存储时,凭此方法可以实现个性化的状态数据序列化和反序列化。
     */
    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

从上述接口,可以总结出使用Source来实现源应该具备以下特征:

  1. 源是可拆分且可并发读取的。
  2. 拆分后的数据读取任务是客户端这边可控的,并且可以伴有任务分派和状态信息以支持任务的重分派。

涛思数据订阅功能,分区和订阅分派是有涛思服务端管理的,各个分区的进度是在涛思服务端存储的,且客户端这边只能以主题级别发起订阅。所以客户端这边缺少任务的调节能力,也没有状态信息需要记录,使用RichParallelSourceFunction来实现是最合适的。

3. TDengine源节点在XSailboat中的实现

类:com.cimstech.sailboat.flink.run.source.taos.TDengineSourceFunction

import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cimstech.xfront.common.collection.PropertiesEx;
import com.cimstech.xfront.common.excep.ExceptionAssist;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.TaosConsumer;

public class TDengineSourceFunction<O> extends RichParallelSourceFunction<O>
{

	private static final long serialVersionUID = 1L;
	
	private static final long POLL_TIMEOUT = 10_000L;
	
	static final Logger sLogger = LoggerFactory.getLogger(TDengineSourceFunction.class) ;
	
	TaosConsumer<O> mConsumer ;
	
	PropertiesEx mProps ;
	Collection<String> mTopics ;
	
	boolean mCanceled = false ;
	
	public TDengineSourceFunction()
	{
	}
	
	public TDengineSourceFunction(PropertiesEx aProps , Collection<String> aTopics)
	{
		mProps = aProps ;
		mTopics = aTopics ;
	}
	
	@Override
	public void open(Configuration aConf) throws Exception
	{
		RuntimeContext rc = getRuntimeContext() ;
		mProps.setProperty(TMQConstants.CLIENT_ID, mProps.getProperty(TMQConstants.CLIENT_ID) +"_"+rc.getIndexOfThisSubtask()) ;
		mConsumer = new TaosConsumer<>(mProps) ;
		mConsumer.subscribe(mTopics);
	}

	@Override
	public void run(SourceContext<O> aCtx) throws Exception
	{
		try
		{
			while(!mCanceled)
			{
				ConsumerRecords<O> rcds = mConsumer.poll(Duration.ofMillis(POLL_TIMEOUT)) ;
				if(!rcds.isEmpty())
				{
					for(ConsumerRecord<O> rcd : rcds)
					{
						aCtx.collect(rcd.value());
					}
				}
			}
		}
		finally
		{
			try
			{
				mConsumer.close() ;
			}
			catch (SQLException e)
			{
				sLogger.error(ExceptionAssist.getClearMessage(getClass(), e)) ;
			}
		}
	}

	@Override
	public void cancel()
	{
		mCanceled = true ;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

类:com.cimstech.sailboat.flink.run.source.taos.RowDeserializer

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.types.RowUtils;

import com.cimstech.xfront.common.collection.CS;
import com.cimstech.xfront.common.collection.PropertiesEx;
import com.cimstech.xfront.common.log.Assert;
import com.cimstech.xfront.common.reflect.XClassUtil;
import com.taosdata.jdbc.TaosGlobalConfig;
import com.taosdata.jdbc.tmq.Deserializer;
import com.taosdata.jdbc.tmq.DeserializerException;
import com.taosdata.jdbc.tmq.TMQConstants;

public class RowDeserializer implements Deserializer<Row>
{    
    final LinkedHashMap<String, Integer> mFieldIndexMap = CS.linkedHashMap() ;
    String[] mOutFieldNames ;
    Class<?>[] mOutFieldClasses ;

    @Override
    public void configure(Map<?, ?> configs)
    {
        Object encodingValue = configs.get(TMQConstants.VALUE_DESERIALIZER_ENCODING) ;
        if (encodingValue instanceof String)
            TaosGlobalConfig.setCharset(((String) encodingValue).trim());
        
        String outFieldsStr = (String)configs.get("outFields") ;
        mOutFieldNames = PropertiesEx.split(outFieldsStr) ;
        Assert.notEmpty(mOutFieldNames , "没有指定输出列(outFields)!") ;
        
        mOutFieldClasses = new Class[mOutFieldNames.length] ;
        for(int i=0 ; i<mOutFieldNames.length ; i++)
        {
        	String[] segs = mOutFieldNames[i].split(":") ;
        	mOutFieldNames[i] = segs[0] ;		// 字段名
        	mOutFieldClasses[i] = XClassUtil.getClassOfCSN(segs[1]) ;
        	mFieldIndexMap.put(mOutFieldNames[i] , i) ;
        }
    }

    @Override
    public Row deserialize(ResultSet data, String topic, String dbName) throws DeserializerException, SQLException
    {
    	Row row = RowUtils.createRowWithNamedPositions(RowKind.INSERT , new Object[mOutFieldNames.length]
				, mFieldIndexMap) ;
    	for(int i=0 ; i<mOutFieldNames.length ; i++)
    	{
    		row.setField(i, XClassUtil.typeAdapt(data.getObject(mOutFieldNames[i]), mOutFieldClasses[i])) ;
    	}
        return row ;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

类:com.cimstech.sailboat.flink.run.source.taos.SI_TDengine_Builder (部分代码)

	... 省略
	PropertiesEx props = new PropertiesEx() ;
	props.setProperty(TMQConstants.CONNECT_TYPE , "websocket") ;
	props.setProperty(TMQConstants.BOOTSTRAP_SERVERS , servAddr) ;
	props.setProperty(TMQConstants.CONNECT_USER , username) ;
	props.setProperty(TMQConstants.CONNECT_PASS , password) ;
    props.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
    props.setProperty(TMQConstants.AUTO_COMMIT_INTERVAL, "1000");
    props.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
    props.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true");
       
    //
    props.setProperty(TMQConstants.CLIENT_ID, nodeId) ;
    props.setProperty(TMQConstants.GROUP_ID, UUID.randomUUID().toString()) ;
	
	String startingOffsets = execConfJo.optString("startingOffset") ;
	if(XString.isNotEmpty(startingOffsets))
	{
		props.setProperty(TMQConstants.AUTO_OFFSET_RESET, startingOffsets.toLowerCase()) ;
	}
	
	//
	props.setProperty(TMQConstants.VALUE_DESERIALIZER , "com.cimstech.sailboat.flink.run.source.taos.RowDeserializer") ;
	
	JSONArray outRowFieldsJa = execConfJo.optJSONArray("outRowFields") ;
	Assert.notNull(outRowFieldsJa , "没有找到outRowFields!%s" , execConfJo);
	ERowTypeInfo rowTypeInfo = JSONKit.toRowTypeInfo(outRowFieldsJa) ;
	final int fieldAmount = outRowFieldsJa.length() ;
	Assert.isTrue(fieldAmount>0 , "没有找到outRowFields!%s" , execConfJo);
	StringBuilder outFieldsStrBld = new StringBuilder() ;
	outRowFieldsJa.forEachJSONObject(jo->{
		if(outFieldsStrBld.length() > 0)
			outFieldsStrBld.append(',') ;
		 outFieldsStrBld.append(jo.optString("name"))
		 	.append(':')
		 	.append(jo.optString("dataType")) ;
	}) ;
	
	props.setProperty("outFields" , outFieldsStrBld.toString()) ;
	...省略
	TDengineSourceFunction<Row> sourceFunc = new TDengineSourceFunction<Row>(props, Arrays.asList(topicsJa.toStringArray())) ;
	SingleOutputStreamOperator<Row> dss = env.addSource(sourceFunc , nodeName , rowTypeInfo)
				.assignTimestampsAndWatermarks(watermarkStrategy)		// 2023-01-08 这一句是必需的,否则不会产生水位线
				.name(nodeName)
				.uid(nodeId)
				;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/745466
推荐阅读
相关标签
  

闽ICP备14008679号