当前位置:   article > 正文

Apache Beam中的几种常见的处理类_pcollection>輸出

pcollection>輸出

在阅读本文前,可先看一下官方的WordCount代码, 对Apache Beam有大概的了解。


要说在Apache Beam中常见的函数是哪一个,当然是apply()。常见的写法如下:

  1. [Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
  2. .apply([Second Transform])
  3. .apply([Third Transform])

而在最简单的wordcount代码中,就出现了许多种不同的传入参数类型,除了输入输出的部分,还包括

1)使用ParDo.of():

  1. .apply("ExtractWords-joe",
  2. ParDo.of(new DoFn<String, String>() {
  3. @ProcessElement
  4. public void processElement(ProcessContext context) {
  5. System.out.println(context.element()+"~");
  6. for (String word : context.element().split(" ")) {
  7. if (!word.isEmpty()) {
  8. //输出到Output PCollection
  9. context.output(word);
  10. }
  11. }
  12. }
  13. })
  14. )
2)使用MapElements.via():

  1. .apply("FomatResults",
  2. MapElements.via(new SimpleFunction<KV<String, Long>,String>() {
  3. @Override
  4. public String apply(KV<String, Long> input) {
  5. return input.getKey()+":"+input.getValue();
  6. }
  7. }))
3)以及使用PTransform子类:

.apply(new CountWords())

  1. public static class CountWords extends PTransform<PCollection<String>,
  2. PCollection<KV<String, Long>>> {
  3. @Override
  4. public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
  5. // Convert lines of text into individual words.
  6. PCollection<String> words = lines.apply(
  7. ParDo.of(new ExtractWordsFn()));
  8. // Count the number of times each word occurs.
  9. PCollection<KV<String, Long>> wordCounts =
  10. words.apply(Count.<String>perElement());
  11. return wordCounts;
  12. }
  13. }


这么多种传入方式到底有什么联系?通过查看源码可以看出apply函数的定义如下:

  1. public <OutputT extends POutput> OutputT apply(
  2. String name, PTransform<? super PBegin, OutputT> root) {
  3. return begin().apply(name, root);
  4. }
传入的参数为PTransform类对象,也就是这几种传入参数其实都是PTransform类的变形。

PTransform是一个实现了Serializable接口的抽象类,其中public abstract OutputT expand(InputT input); 是数据处理方法,强制子类必须实现。

因此第(3)种方式很容易理解,就是通过继承PTransform并实现了expand方法定义了CountWords类,给apply方法传递了一个CountWords对象。


在第(2)种方式中,MapElements是PTransform的子类,实现了expand方法,其实现方式是调用@Nullable private final SimpleFunction<InputT, OutputT> fn;成员中定义的数据处理方法,MapElements.via()则是一个为初始化fn的静态方法,定义如下:

  1. public static <InputT, OutputT> MapElements<InputT, OutputT> via(
  2. final SimpleFunction<InputT, OutputT> fn) {
  3. return new MapElements<>(fn, null, fn.getClass());
  4. }
传入了一个SimpleFunction对象,SimpleFunction是一个必须实现public OutputT apply(InputT input) 方法的抽象类,用户在该apply方法中实现数据处理。

所以这种方式的实现方式如下:

定义SimpleFunction的子类并实现其中的apply方法,将该子类的对象传递给MapElements.via()。


第(1)种方式中,ParDo.of()方法传入一个DoFn对象, 返回一个SingleOutput对象:

  1. public static <InputT, OutputT> SingleOutput<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
  2. validate(fn);
  3. return new SingleOutput<InputT, OutputT>(
  4. fn, Collections.<PCollectionView<?>>emptyList(), displayDataForFn(fn));
  5. }
SingleOutput与MapElements类似,也是PTransform的子类,实现了expand方法,使用private final DoFn<InputT, OutputT> fn;成员中的方法进行数据处理。

而DoFn是一个抽象类,用户必须实现其注解方法(存疑) public void processElement(ProcessContext c)。

所以这种方式的实现方式如下:

定义DoFn的子类并实现其中的processElement方法,将该子类的对象传递给ParDo.of()。

需要注意的是processElement方法与前2种方式不同,输入和输出数据都是在传入参数ProcessContext c中,而不是通过return进行传递。


以上为学习Apache Beam一天的总结,有错误欢迎指正。




Day2补充,3种方式的区别和联系:

1)MapElement.via(SimpleFunction)和PTransform

public class MapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> 
从泛型参数来看,PTransform处理的是PCollection,而MapElement处理的是PCollection中的一个元素,对比SimpleFunction的apply方法和PTransform的expand方法的实现方式得到验证。

2)MapElement.via(SimpleFunction)和ParDo.of(DoFn)

区别之前已经说过,DoFn的processElement方法的输入和输出都是从参数传入,而SimpleFunction的apply方法从参数传入输入,从return传出输出。

相同的是这2个方法处理的都是PCollection中的一个元素。

查看MapElement的expand方法源码:

@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
  checkNotNull(fn, "Must specify a function on MapElements using .via()");
  return input.apply(
      "Map",
      ParDo.of(
          new DoFn<InputT, OutputT>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(fn.apply(c.element()));
            }
	//部分代码忽略
          }));
}
可以看出其实也是实现了DoFn的子类,在DoFn的processElement方法中调用SimpleFunction对象的apply方法进行处理。







声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/969854
推荐阅读
相关标签
  

闽ICP备14008679号