赞
踩
<OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function)
SourceFunction
/**
 * A source that emits, roughly once per second, the current epoch time in
 * milliseconds truncated down to the whole second.
 *
 * <p>The loop in {@link #run} keeps emitting until {@link #cancel()} clears the
 * running flag.
 */
public class TimedSource implements SourceFunction<Long> {

    /** Length of one emission interval, in milliseconds. */
    private static final long INTERVAL_MS = 1000L;

    /**
     * Loop flag read by {@link #run} and cleared by {@link #cancel()}.
     *
     * <p>Declared {@code volatile} because Flink's SourceFunction contract allows
     * {@code cancel()} to be called from a different thread than the one executing
     * {@code run()}; without it the write might never become visible to the loop.
     */
    private volatile boolean running = true;

    /**
     * Emits the second-aligned timestamp and then waits for the next second boundary.
     *
     * @param sourceContext context used to emit the timestamps
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        while (running) {
            long now = System.currentTimeMillis();
            long intoSecond = now % INTERVAL_MS;
            sourceContext.collect(now - intoSecond);
            // Sleep only for what is left of the current second. The offset must be taken
            // from the untruncated clock reading: computing it from the already truncated
            // value always yields a full interval, so the loop would drift and eventually
            // skip a second.
            Thread.sleep(INTERVAL_MS - intoSecond);
        }
    }

    /** Requests the emitting loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        running = false;
    }
}
使用 addSource 引入TimedSource
// Obtain a StreamExecutionEnvironment and register a TimedSource on it through addSource.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> timedStream = env.addSource(new TimedSource());
<OUT> DataStreamSource<OUT> fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName)
Source
,指定 WatermarkStrategy
flink-connector-kafka
,
// Build a Kafka source for "input-topic" in consumer group "my-group": it starts from the
// earliest offsets and deserializes only the record value with SimpleStringSchema.
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setGroupId("my-group")
                .setTopics("input-topic")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
// Attach the source to the environment under the name "Kafka Source", with no watermarks.
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
flink-connector-pulsar
// Build a Pulsar source for "my-topic" under subscription "my-subscription": it starts from
// the earliest cursor and deserializes messages with SimpleStringSchema.
PulsarSource<String> pulsarSource =
        PulsarSource.builder()
                .setServiceUrl(serviceUrl)
                .setStartCursor(StartCursor.earliest())
                .setTopics("my-topic")
                .setDeserializationSchema(new SimpleStringSchema())
                .setSubscriptionName("my-subscription")
                .build();
// Attach the source to the environment under the name "Pulsar Source", with no watermarks.
env.fromSource(pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。