
Flink's addSource and fromSource, finally explained clearly


addSource and fromSource

addSource

<OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) 

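1. You implement the SourceFunction interface.

2. Checkpoint and savepoint support is not provided for you; the source has to implement it itself.

3. Commonly used for custom data sources.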
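
Point 2 is the easiest one to get wrong. In Flink, a SourceFunction usually gains checkpoint support by also implementing CheckpointedFunction, whose snapshotState and initializeState methods save and restore the source's position. The sketch below is a framework-free model of that contract, not Flink code: the class ResumableCounter and its methods are hypothetical names, and it only illustrates what the saved state has to guarantee, namely that a restored source continues exactly from the last snapshot.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, framework-free model of a source whose position can be checkpointed. */
public class ResumableCounter {
    // Next value to emit; this is the only state that must survive a restart.
    private long offset;

    /** Models initializeState: start from a previously saved position (0 on a fresh start). */
    public ResumableCounter(long restoredOffset) {
        this.offset = restoredOffset;
    }

    /** Emits n consecutive values starting at the current offset. */
    public List<Long> emit(int n) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(offset++);
        }
        return out;
    }

    /** Models snapshotState: returns the position that should be persisted. */
    public long snapshot() {
        return offset;
    }

    public static void main(String[] args) {
        ResumableCounter first = new ResumableCounter(0L);
        System.out.println(first.emit(3));      // [0, 1, 2]
        long saved = first.snapshot();          // 3
        first.emit(2);                          // emitted after the snapshot, then the job "fails"

        ResumableCounter restored = new ResumableCounter(saved);
        System.out.println(restored.emit(2));   // [3, 4]
    }
}
```

The values emitted after the snapshot are produced again after the restore. In a real SourceFunction, the record emission and the offset update should happen together under the lock returned by SourceContext.getCheckpointLock(), so that a snapshot never observes an offset that is out of step with the records already emitted.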

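import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * A source that emits the current epoch time in milliseconds,
 * rounded down to the second, once per second.
 */
public class TimedSource implements SourceFunction<Long> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        while (running) {
            long now = System.currentTimeMillis();
            // Emit the timestamp truncated to the start of the current second.
            sourceContext.collect(now - now % 1000);
            // Sleep until the next second boundary, using the unrounded reading.
            Thread.sleep(1000 - now % 1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}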
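
The timing arithmetic used by TimedSource can be checked in isolation. The following is a minimal sketch in plain Java, with a hypothetical helper class SecondAlignment that is not part of Flink; it assumes non-negative epoch-millisecond timestamps.

```java
/** Hypothetical helper that isolates the rounding and sleep arithmetic of a once-per-second source. */
public class SecondAlignment {

    /** Rounds an epoch-millisecond timestamp down to the start of its second. */
    public static long floorToSecond(long millis) {
        return millis - millis % 1000;
    }

    /** Milliseconds remaining until the next whole second, in the range 1..1000. */
    public static long millisUntilNextSecond(long millis) {
        return 1000 - millis % 1000;
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_250L;
        System.out.println(floorToSecond(now));          // 1700000000000
        System.out.println(millisUntilNextSecond(now));  // 750
    }
}
```

The sleep duration has to be computed from the unrounded clock reading. If it is computed from the already rounded value, the remainder is always 0, the sleep is always a full 1000 ms, and the emissions gradually drift away from the second boundaries.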

Use addSource to attach TimedSource to the job:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> streamSource = environment.addSource(new TimedSource());


fromSource

<OUT> DataStreamSource<OUT> fromSource(
            Source<OUT, ?, ?> source,
            WatermarkStrategy<OUT> timestampsAndWatermarks,
            String sourceName)

1. You pass in a Source and specify a WatermarkStrategy.

2. Commonly seen in Flink connectors.
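
The WatermarkStrategy argument decides how event-time watermarks are generated for the stream. The connector examples in this article pass WatermarkStrategy.noWatermarks(); a common alternative is WatermarkStrategy.forBoundedOutOfOrderness(Duration), where the watermark trails the largest timestamp seen so far by a fixed bound. The sketch below is a framework-free model of that rule, not Flink code: the class name is hypothetical, and it assumes the rule is "watermark = largest timestamp - bound - 1 ms".

```java
/** Hypothetical, framework-free model of a bounded-out-of-orderness watermark rule. */
public class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedOutOfOrdernessModel(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Records an event timestamp; late events never move the maximum backwards. */
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark asserts that no event with a timestamp at or below it is still expected. */
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel model = new BoundedOutOfOrdernessModel(2000L);
        model.onEvent(1000L);
        model.onEvent(5000L);
        model.onEvent(3000L); // out of order, does not lower the watermark
        System.out.println(model.currentWatermark()); // 2999
    }
}
```

With noWatermarks() no such value is ever emitted, which is fine for processing-time jobs but means event-time windows downstream will never fire.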

flink-connector-kafka

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

flink-connector-pulsar

PulsarSource<String> source = PulsarSource.builder()
    .setServiceUrl(serviceUrl)
    .setStartCursor(StartCursor.earliest())
    .setTopics("my-topic")
    .setDeserializationSchema(new SimpleStringSchema())
    .setSubscriptionName("my-subscription")
    .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
