The principle is actually the same as for a MySQL connector downloaded from the web: compile and package it, declare it in the pom, and when the SQL is parsed, the connector configured in the job determines which connector implementation is needed and is matched against the ones pulled in through the pom.
So how exactly do you develop and wire in a connector?
Put simply, you need three things:
a sink class: KuduDynamicTableSink
a factory class: KuduDynamicTableFactory
and a service configuration file: org.apache.flink.table.factories.Factory
Under the hood this mainly relies on Java's SPI mechanism: you create a new file in the project's resources directory.
What goes into it? The fully qualified name of the factory class you developed:
com.datacenter.streaming.sql.connectors.kudu.table.KuduDynamicTableFactory
The reason: SPI finds the factory class through this file, and the factory then constructs the sink instance used by the objects produced from SQL parsing.
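Concretely, assuming a standard Maven layout (the location follows the Java SPI convention that Flink's factory discovery relies on), the file sits at
src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
and its only content is that one fully qualified class name, which must match the factory class packaged into the jar.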
KuduDynamicTableSink:
package com.datacenter.streaming.sql.connectors.kudu.table;

import com.datacenter.streaming.sql.connectors.kudu.KuduOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduOutputFormat;
import com.datacenter.streaming.sql.connectors.kudu.KuduSinkOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.types.RowKind;

import java.util.List;

import static org.apache.flink.util.Preconditions.checkState;

/**
 * KuduDynamicTableSink
 *
 * @author loveyou
 */
public class KuduDynamicTableSink implements DynamicTableSink {

    private final KuduOptions kuduOptions;
    private final KuduSinkOptions kuduSinkOptions;
    private TableSchema physicalSchema;
    private int bufferFlushInterval;
    private int maxRetries;
    private List<String> keyFields;

    public KuduDynamicTableSink(KuduOptions kuduOptions, KuduSinkOptions kuduSinkOptions, TableSchema physicalSchema) {
        this.kuduOptions = kuduOptions;
        this.kuduSinkOptions = kuduSinkOptions;
        this.physicalSchema = physicalSchema;
        UniqueConstraint uniqueConstraint = physicalSchema.getPrimaryKey().orElse(null);
        if (uniqueConstraint != null) {
            this.keyFields = uniqueConstraint.getColumns();
        }
        this.bufferFlushInterval = (int) kuduSinkOptions.getBatchIntervalMs();
        this.maxRetries = kuduSinkOptions.getMaxRetries();
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        validatePrimaryKey(requestedMode);
        // The sink consumes inserts and upserts (UPDATE_AFTER only).
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .build();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        KuduOutputFormat kuduOutputFormat = new KuduOutputFormat(
                kuduOptions.getMaster(),
                kuduOptions.getTable(),
                physicalSchema.getFieldNames(),
                physicalSchema.getFieldDataTypes(),
                bufferFlushInterval,
                maxRetries
        );
        return OutputFormatProvider.of(kuduOutputFormat);
    }

    @Override
    public DynamicTableSink copy() {
        return new KuduDynamicTableSink(
                kuduOptions,
                kuduSinkOptions,
                physicalSchema);
    }

    @Override
    public String asSummaryString() {
        return "Kudu Table Sink";
    }

    private void validatePrimaryKey(ChangelogMode requestedMode) {
        checkState(ChangelogMode.insertOnly().equals(requestedMode) || keyFields != null,
                "please declare primary key for sink table when query contains update/delete record.");
    }
}
KuduDynamicTableFactory:
package com.datacenter.streaming.sql.connectors.kudu.table;

import com.datacenter.streaming.sql.connectors.kudu.KuduLookupOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduOptions;
import com.datacenter.streaming.sql.connectors.kudu.KuduSinkOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * KuduDynamicTableFactory
 *
 * @author loveyou
 */
public class KuduDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    // common options
    public static final String IDENTIFIER = "kudu";

    public static final ConfigOption<String> MASTER = ConfigOptions
            .key("master")
            .stringType()
            .noDefaultValue()
            .withDescription("the kudu master address.");

    public static final ConfigOption<String> TABLE = ConfigOptions
            .key("table")
            .stringType()
            .noDefaultValue()
            .withDescription("the kudu table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the kudu user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the kudu password.");

    // lookup options
    private static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions
            .key("lookup.cache.max-rows")
            .longType()
            .defaultValue(-1L)
            .withDescription("the max number of rows of lookup cache, over this value, the oldest rows will " +
                    "be eliminated. \"cache.max-rows\" and \"cache.ttl\" options must all be specified if any of them is " +
                    "specified. Cache is not enabled as default.");

    private static final ConfigOption<Duration> LOOKUP_CACHE_TTL = ConfigOptions
            .key("lookup.cache.ttl")
            .durationType()
            .defaultValue(Duration.ofSeconds(-1))
            .withDescription("the cache time to live.");

    private static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions
            .key("lookup.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("the max retry times if lookup database failed.");

    // write options
    //private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
    //        .key("sink.buffer-flush.max-rows")
    //        .intType()
    //        .defaultValue(100)
    //        .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
    //                " of records, will flush data. The default value is 100.");

    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
            .key("sink.buffer-flush.interval")
            .durationType()
            .defaultValue(Duration.ofSeconds(1))
            .withDescription("the flush interval in milliseconds; once it elapses, asynchronous threads will flush data. The " +
                    "default value is 1s.");

    private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
            .key("sink.max-retries")
            .intType()
            .defaultValue(3)
            .withDescription("the max retry times if writing records to database failed.");
    /**
     * Creates the DynamicTableSource instance.
     *
     * @param context
     * @return
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        helper.validate();
        validateConfigOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        return new KuduDynamicTableSource(
                getKuduOptions(config),
                getKuduLookupOptions(config),
                physicalSchema);
    }

    /**
     * Creates the DynamicTableSink instance.
     *
     * @param context
     * @return
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        helper.validate();
        validateConfigOptions(config);
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        return new KuduDynamicTableSink(
                getKuduOptions(config),
                getKuduSinkOptions(config),
                physicalSchema);
    }

    private KuduOptions getKuduOptions(ReadableConfig readableConfig) {
        KuduOptions.KuduOptionsBuilder builder = KuduOptions.builder()
                .master(readableConfig.get(MASTER))
                .table(readableConfig.get(TABLE));
        readableConfig.getOptional(USERNAME).ifPresent(builder::username);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::password);
        return builder.build();
    }

    private KuduLookupOptions getKuduLookupOptions(ReadableConfig readableConfig) {
        KuduLookupOptions.KuduLookupOptionsBuilder builder = KuduLookupOptions.builder();
        builder.cacheMaxSize(readableConfig.get(LOOKUP_CACHE_MAX_ROWS));
        builder.cacheExpireMs(readableConfig.get(LOOKUP_CACHE_TTL).toMillis());
        builder.maxRetryTimes(readableConfig.get(LOOKUP_MAX_RETRIES));
        return builder.build();
    }

    private KuduSinkOptions getKuduSinkOptions(ReadableConfig config) {
        KuduSinkOptions.KuduSinkOptionsBuilder builder = KuduSinkOptions.builder();
        //builder.batchSize(config.get(SINK_BUFFER_FLUSH_MAX_ROWS));
        builder.batchIntervalMs(config.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
        builder.maxRetries(config.get(SINK_MAX_RETRIES));
        return builder.build();
    }
    /**
     * The factory's unique identifier, matched against the 'connector' option in the DDL.
     *
     * @return
     */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /**
     * Required options.
     *
     * @return
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(MASTER);
        requiredOptions.add(TABLE);
        return requiredOptions;
    }

    /**
     * Optional options.
     *
     * @return
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(USERNAME);
        optionalOptions.add(PASSWORD);
        optionalOptions.add(LOOKUP_CACHE_MAX_ROWS);
        optionalOptions.add(LOOKUP_CACHE_TTL);
        optionalOptions.add(LOOKUP_MAX_RETRIES);
        //optionalOptions.add(SINK_BUFFER_FLUSH_MAX_ROWS);
        optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
        optionalOptions.add(SINK_MAX_RETRIES);
        return optionalOptions;
    }

    /**
     * Validates the configuration.
     *
     * @param config
     */
    private void validateConfigOptions(ReadableConfig config) {
        checkAllOrNone(config, new ConfigOption[]{
                USERNAME,
                PASSWORD
        });
        checkAllOrNone(config, new ConfigOption[]{
                LOOKUP_CACHE_MAX_ROWS,
                LOOKUP_CACHE_TTL
        });
        Preconditions.checkArgument(
                config.get(SINK_BUFFER_FLUSH_INTERVAL).compareTo(Duration.ofSeconds(1)) >= 0,
                SINK_BUFFER_FLUSH_INTERVAL.key() + " must be >= 1s"
        );
    }

    /**
     * Either none or all of the given options must be provided.
     *
     * @param config
     * @param configOptions
     */
    private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
        int presentCount = 0;
        for (ConfigOption<?> configOption : configOptions) {
            if (config.getOptional(configOption).isPresent()) {
                presentCount++;
            }
        }
        String[] propertyNames = Arrays.stream(configOptions).map(ConfigOption::key).toArray(String[]::new);
        Preconditions.checkArgument(configOptions.length == presentCount || presentCount == 0,
                "Either all or none of the following options should be provided:\n" + String.join("\n", propertyNames));
    }
}
The planner then calls the factory class: the createDynamicTableSink method of the DynamicTableSinkFactory interface, which the factory implements, returns a KuduDynamicTableSink instance.
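To make the matching concrete, here is a minimal sketch of a job that registers a table resolving to this factory; the table name, Kudu master address, and field list are illustrative assumptions, not part of the connector itself:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KuduSinkDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'connector' = 'kudu' is matched against factoryIdentifier();
        // 'master' and 'table' are the required options declared in requiredOptions().
        tEnv.executeSql(
                "CREATE TABLE kudu_sink_demo (" +
                "  id BIGINT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'kudu'," +
                "  'master' = 'kudu-master-1:7051'," +
                "  'table' = 'demo_table'," +
                "  'sink.buffer-flush.interval' = '1s'," +
                "  'sink.max-retries' = '3'" +
                ")");

        // An INSERT INTO kudu_sink_demo ... statement would then trigger
        // createDynamicTableSink and write rows through KuduOutputFormat.
    }
}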
Once this is done, you can copy these code snippets into your Java project and register the module in your pom:
<modules>
    <module>connector-kudu</module>
</modules>
Alternatively, package it straight into a jar with the package command that ships with IDEA (Maven's package goal), copy the jar into your local repository, and pull it in the same way you would a MySQL connector.
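For the dependency itself, hypothetical Maven coordinates could look like the following (the groupId and version are placeholders; use whatever coordinates you actually install the jar under):
<dependency>
    <groupId>com.datacenter</groupId>
    <artifactId>connector-kudu</artifactId>
    <version>1.0.0</version>
</dependency>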
All of the code is in my Git repository; feel free to grab it, and if you cannot find it, send me a private message.