

An Apache Beam Guide

1. What is Beam?

Past and present:

Why it was born:
Distributed data processing is evolving fast –> new distributed processing technologies keep appearing –> Hadoop MapReduce, Apache Spark, Apache Storm, Apache Flink, Apache Apex –> the new engines are fast and popular, and people keep chasing the newest one –> workloads have to be migrated –> migrating means learning the new technology and rewriting the business logic –> nobody wants to do that –> so what now??
Apache Beam was born to answer exactly that question.
Pedigree:
Apache Beam, formerly Google Dataflow (the model and SDK donated by Google)
Became an Apache Incubator project in February 2016
Graduated to an Apache top-level project on January 10, 2017
After MapReduce, GFS, and BigQuery, another huge contribution from Google to the open-source big data community
Latest version at the time of writing: 1.0

Industry impact:

Hailed as the best programming model for unified stream and batch processing!
Hailed as the next-generation standard for big data processing!
Google's own engineers no longer use MapReduce ...

What exactly is it:

The Beam project defines a unified programming model and API (the Beam Model) for data processing over both bounded data sets and unbounded data streams. A data processing program developed against Beam can then run on any of the supported distributed computing engines.

Main components:
– Beam SDKs
A Beam SDK defines the API for expressing the business logic of a distributed data processing job. It gives application developers one unified programming interface: the developer does not need to know the native API of each underlying big data platform, and can build the processing flow directly against the Beam SDK, whether the input is a bounded data set for batch processing or an unbounded data stream. Bounded and unbounded inputs are represented by the same classes and handled with the same transforms.
– Beam Pipeline Runners
A Runner compiles and translates the program (pipeline) that the user has built with the Beam SDK. When you designate a Runner, the pipeline is translated into a directly runnable program compatible with that Runner, so when running a Beam program you must specify the correct Runner for the underlying engine.
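To make that concrete, here is a minimal sketch (the class name is made up; it assumes the beam-runners-direct-java dependency from the Maven section below is on the classpath) of choosing a runner either from the command line or in code:

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerSelection {
  public static void main(String[] args) {
    // Parse options such as --runner=SparkRunner from the command line ...
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // ... or pin the runner explicitly in code (DirectRunner runs the pipeline locally).
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);
    // build the pipeline with p.apply(...) and finally call p.run()
  }
}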

Beam architecture:
[Figure: Beam architecture]

The user builds a data processing pipeline in terms of the Beam Model, implements the logic inside the pipeline with the Beam SDK API (the "programming" part), hands the pipeline to a concrete Beam Runner to be compiled, and finally runs it on a distributed execution engine.
Note: the future of stream and batch processing lies in Apache Beam, while the choice of execution engine is left to the user.
ETL: since a program written with Beam can run on several distributed computing frameworks, it is a natural fit for integrating data from different sources or storage systems and producing the combined data set you actually want.

2. Beam Core

Data

What kind of data can Beam process?
– Unbounded, potentially out-of-order data streams

Bounded data sets
Unbounded data streams

A bounded data set can be seen as a special case of an unbounded data stream; from the point of view of the processing logic there is no difference between the two.
For example, suppose microblog records carry a timestamp and a retweet count, and we want the total number of retweets per hour. That business logic should run unchanged on both a bounded data set and an unbounded stream; the kind of data source should not affect how the logic is implemented.

Processing time (when the data enters the distributed processing framework) vs. event time (when the data was produced)
These two times usually differ. For example, in a streaming job over microblog data, a post published at 2016-06-01 12:00:00 may only reach the streaming system at 2016-06-01 12:01:30 because of network and other delays. Batch jobs usually compute over the full data set and pay little attention to the time attribute of each record, but a streaming job cannot compute over an infinite stream in full; it normally computes over windows, and for most streaming jobs those windows are defined by time.
The elements of a stream do not necessarily arrive in strict event-time order. If windows are defined on processing time, the arrival order is by definition the order of the data, so there is no out-of-order problem. If windows are defined on event time, however, an earlier message may arrive after a later one, which is very common with distributed data sources. Deciding which data counts as late, and what to do with it, is usually the hard part.
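As a small illustrative sketch (not part of the original example; it assumes each record embeds an epoch-millisecond event time as its second comma-separated field), event time can be attached to elements with WithTimestamps so that downstream event-time windows group by when the data was produced rather than when it arrived:

// Raw records, e.g. "user42,1464775200000,..." (hypothetical format).
PCollection<String> events = ...;

// Attach the embedded event time as each element's timestamp.
PCollection<String> stamped = events.apply(
    WithTimestamps.of((String line) ->
        new Instant(Long.parseLong(line.split(",")[1]))));
// Downstream event-time windows now group by production time, not arrival time.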

Beam Model

The Beam Model summarizes the questions a user must answer when processing data along four dimensions:
[Figure: the four questions of the Beam Model]

  • What are you computing? For example a Sum, a Join, or training a machine-learning model. In the Beam SDK this is specified by the transforms in the Pipeline.
  • Where in event time is the data computed? For example processing-time windows, event-time windows, sliding windows, and so on. In the Beam SDK this is specified by the windowing applied in the Pipeline.
  • When in processing time are results emitted? For example, in a 1-hour event-time window, emit the current window result every minute. In the Beam SDK this is controlled by watermarks and triggers.
  • How is late data handled? For example, emit late data as incremental results, or merge it with the window's earlier results into an updated full result. In the Beam SDK this is specified by the accumulation mode.
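To tie the four questions to the Java SDK, here is a hedged sketch built around the hourly retweet example above (the element types and durations are illustrative, not from the original article): a 1-hour event-time window with early firings every minute, 30 minutes of allowed lateness, and accumulating panes:

// (user, retweet delta) pairs whose elements carry event-time timestamps.
PCollection<KV<String, Long>> retweets = ...;

PCollection<KV<String, Long>> hourlyTotals = retweets
    // Where: fixed 1-hour event-time windows.
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardHours(1)))
        // When: speculative results every minute, plus an on-time result at the watermark.
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        // How: accept data up to 30 minutes late and fold it into an updated total.
        .withAllowedLateness(Duration.standardMinutes(30))
        .accumulatingFiredPanes())
    // What: sum the retweet counts per user.
    .apply(Sum.<String>longsPerKey());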

3. How to Design a Pipeline

For the simplest pipeline, under the simplest assumptions, you only need to answer the following four questions:

  • Where is your input data stored?
  • What does your data look like?
  • What do you want to do with your data?
  • What does your output data look like, and where should it go?
    [Figure: a basic pipeline]

How is this whole flow implemented in code?

– Create a driver program using the classes in one of the Beam SDKs.
A few concepts to understand first:
* Pipeline
A Pipeline encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline. When you create the Pipeline, you must also specify the execution options that tell the Pipeline where and how to run.
In plain terms, it plays the same role as a SparkContext: it carries the context for the whole job.
* PCollection
A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs for each step in your pipeline.
In plain terms, it is the analogue of an RDD: a PCollection holds a potentially unbounded stream of data, which comes from an input source and is then fed through transforms.
* Transform
A Transform represents a data processing operation, or a step, in your pipeline. Every Transform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces one or more output PCollection objects.
A Transform is therefore a processing step applied to a PCollection. A typical pipeline applies several transforms to one input source (for example, turning a set of incoming log-entry strings into key/value pairs, where the key is the IP address and the value is the log message). Transforms can be assembled from the standard set shipped with the Beam SDK, or you can define your own for your specific processing needs.
* I/O Source and Sink
Beam provides Source and Sink APIs to represent reading and writing data, respectively. Source encapsulates the code necessary to read data into your Beam pipeline from some external source, such as cloud file storage or a subscription to a streaming data source. Sink likewise encapsulates the code necessary to write the elements of a PCollection to an external data sink.

So... the flow for creating a driver program is as follows:

Create a Pipeline object –> set options –> initial PCollection (using the Source API to read data from an external source, or using a Create transform to build a PCollection from in-memory data.) –> Apply Transforms to each PCollection. –> Output the final, transformed PCollection(s)(Sink API ) –> Run the pipeline using the designated Pipeline Runner.
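Putting the whole flow together, a minimal sketch of such a driver program might look like this (the paths are placeholders, and it uses the same 0.5.0-era TextIO API as the rest of this guide):

public static void main(String[] args) {
  // 1. Create the Pipeline, passing in options parsed from the command line.
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline p = Pipeline.create(options);

  // 2. Create the initial PCollection from an external source.
  PCollection<String> lines = p.apply(
      "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));

  // 3. Apply transforms; here, count the occurrences of each distinct line.
  PCollection<KV<String, Long>> counts = lines.apply(Count.<String>perElement());

  // 4. Format and write the final PCollection to a sink.
  counts
      .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
        @Override
        public String apply(KV<String, Long> kv) {
          return kv.getKey() + ": " + kv.getValue();
        }
      }))
      .apply("WriteCounts", TextIO.Write.to("protocol://path/to/output/counts"));

  // 5. Run the pipeline with the designated runner.
  p.run().waitUntilFinish();
}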

Coding:

Maven dependencies:

<!-- Apache  Beam-->
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-core</artifactId>
        <version>0.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>0.5.0</version>
        <scope>runtime</scope>
    </dependency>

Demo walk-through:

public static void main(String[] args) {
    // Create the pipeline.
    PipelineOptions options = 
        PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> lines = p.apply(
      "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
}

When using Beam, the driver program must first create an instance of the Pipeline class (usually in the main method). To create it, you build a PipelineOptions object for the configuration and pass it to Pipeline.create().

Ways to create a PCollection:
  • Reading from an external source
 PCollection<String> lines = p.apply(
      "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
  • Creating a PCollection from in-memory data
// Apply Create, passing the list and the coder, to create the PCollection.
// (LINES here stands for a java.util.List<String> defined elsewhere in the driver program.)
p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
Characteristics of a PCollection
  • Element type
    The elements of a PCollection may be of any type, but must all be of the same type. However, to support distributed processing, Beam needs to be able to encode each individual element as a byte string (so elements can be passed around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed.
  • Immutability
    A PCollection is immutable. Once created, you cannot add, remove, or change individual elements. A Beam Transform might process each element of a PCollection and generate new pipeline data (as a new PCollection), but it does not consume or modify the original input collection.
  • Random access
    A PCollection does not support random access to individual elements. Instead, Beam Transforms consider every element in a PCollection individually.
  • Size and boundedness
    A PCollection is a large, immutable “bag” of elements. There is no upper limit on how many elements a PCollection can contain; any given PCollection might fit in memory on a single machine, or it might represent a very large distributed data set backed by a persistent data store.
    A PCollection can be either bounded or unbounded in size. A bounded PCollection represents a data set of a known, fixed size, while an unbounded PCollection represents a data set of unlimited size. Whether a PCollection is bounded or unbounded depends on the source of the data set that it represents. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. Reading from a streaming or continuously updating data source, such as Pub/Sub or Kafka, creates an unbounded PCollection (unless you explicitly tell it not to).
  • Element timestamps
    Each element in a PCollection has an associated intrinsic timestamp. The timestamp for each element is initially assigned by the Source that creates the PCollection. Sources that create an unbounded PCollection often assign each new element a timestamp that corresponds to when the element was read or added.
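As a small, hypothetical illustration of element timestamps, a PCollection built from in-memory data can be given explicit timestamps with Create.timestamped (the values and times below are made up):

// Each element is paired with an event-time timestamp at creation.
PCollection<String> events = p.apply(Create.timestamped(
    TimestampedValue.of("login", new Instant(1464775200000L)),
    TimestampedValue.of("logout", new Instant(1464775260000L))));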
Using Transforms

Every transform in the Beam SDKs has a generic apply method. To use a transform, you call apply on the input PCollection, pass the transform itself as the argument, and the call returns the output PCollection, i.e.:

[Output PCollection] = [Input PCollection].apply([Transform])

When several transforms are chained, it looks like this:

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])

Creating two PCollections from one PCollection:

[Output PCollection 1] = [Input PCollection].apply([Transform 1])
[Output PCollection 2] = [Input PCollection].apply([Transform 2])
Core Beam transforms
  • ParDo
  • GroupByKey
  • Flatten and Partition
ParDo

ParDo is a parallel processing transform. Its role is similar to the Map phase in the Map/Shuffle/Reduce style: it runs user-defined code on every element of the input PCollection and emits zero or more elements to the output PCollection.
Typical uses:

  • Filtering a data set.
    You can use ParDo to consider each element in a PCollection and either output that element to a new collection, or discard it.
  • Formatting or type-converting each element in a data set.
    If your input PCollection contains elements that are of a different type or format than you want, you can use ParDo to perform a conversion on each element and output the result to a new PCollection.
  • Extracting parts of each element in a data set.
    If you have a PCollection of records with multiple fields, for example, you can use a ParDo to parse out just the fields you want to consider into a new PCollection.
  • Performing computations on each element in a data set. You can use ParDo to perform simple or complex computations on every element, or certain elements, of a PCollection and output the results as a new PCollection.
    Usage: when applying ParDo, you supply your custom code as an instance of a DoFn subclass, i.e. you extend DoFn:
// The input PCollection of Strings.
PCollection<String> words = ...;

// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }

// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
    ParDo
    .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which

In addition, there is a set of requirements for how a DoFn is implemented, for example the one below; see the official programming guide:
https://beam.apache.org/documentation/programming-guide/
https://beam.apache.org/documentation/programming-guide/#transforms-usercodereqs

static class ComputeWordLengthFn extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Get the input element from ProcessContext.
    String word = c.element();
    // Use ProcessContext.output to emit the output element.
    c.output(word.length());
  }
}

You can also supply the DoFn as an anonymous inner class, for example:

// The input PCollection.
PCollection<String> words = ...;

// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
  "ComputeWordLengths",                     // the transform name
  ParDo.of(new DoFn<String, Integer>() {    // a DoFn as an anonymous inner class instance
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().length());
      }
    }));
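For the filtering use case listed above, a minimal sketch (the length predicate is arbitrary): a DoFn drops an element simply by not calling output() for it:

// The input PCollection.
PCollection<String> words = ...;

// Keep only words longer than three characters; everything else is dropped.
PCollection<String> longWords = words.apply(
  "FilterShortWords",
  ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.element().length() > 3) {
          c.output(c.element());
        }
      }
    }));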
GroupByKey

GroupByKey processes key/value pairs in a PCollection; it plays a role similar to the shuffle phase of the Map/Shuffle/Reduce style.
Usage: see the short sketch below.
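A minimal sketch (the sample values are invented): grouping all values that share the same key:

// Input key/value pairs, e.g. ("cat", 1), ("dog", 5), ("cat", 9).
PCollection<KV<String, Integer>> wordsAndLines = ...;

// After GroupByKey, every key maps to the collection of all its values:
// ("cat", [1, 9]), ("dog", [5]).
PCollection<KV<String, Iterable<Integer>>> grouped =
    wordsAndLines.apply(GroupByKey.<String, Integer>create());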

Flatten and Partition

Flatten operates on PCollections that share the same element type: it merges multiple PCollections into a single PCollection. Partition does the opposite and splits one PCollection into several smaller PCollections.
Flatten example:

// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

Partition example:

// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
// In this example, we define the PartitionFn in-line.
// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
    students.apply(Partition.of(10, new PartitionFn<Student>() {
        public int partitionFor(Student student, int numPartitions) {
            return student.getPercentile()  // 0..99
                 * numPartitions / 100;
        }}));

4. Design Your Pipeline

For example, any of the following shapes:
[Figures: four example pipeline designs]

5. WordCount Example

WordCount pipeline:
[Figure: the WordCount pipeline]
WordCount source walk-through:

public class WordCount {

    /**
     * 1. a. Writing the pipeline logic as DoFns keeps the code concise. b. Splits each
     *    input line of text into words and emits them.
     */
    static class ExtractWordsFn extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines =
                createAggregator("emptyLines", Sum.ofLongs());

        @ProcessElement
        public void processElement(ProcessContext c) {
            if (c.element().trim().isEmpty()) {
                emptyLines.addValue(1L);
            }

            // Split the line of text into individual words.
            String[] words = c.element().split("[^a-zA-Z']+");
            // Emit each non-empty word to the output PCollection.
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /**
     * 2. Formats a word/count pair into a printable string.
     */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        @Override
        public String apply(KV<String, Long> input) {
            return input.getKey() + ": " + input.getValue();
        }
    }

    /**
     * 3. Word counting: a PTransform (PCollection transform) that converts a PCollection
     *    of text lines into a PCollection of per-word counts.
     */
    public static class CountWords extends PTransform<PCollection<String>,
            PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

            // Convert the lines of text into individual words.
            PCollection<String> words = lines.apply(
                    ParDo.of(new ExtractWordsFn()));

            // Count the number of occurrences of each word.
            PCollection<KV<String, Long>> wordCounts =
                    words.apply(Count.<String>perElement());

            return wordCounts;
        }
    }

    /**
     * 4. Custom options, for example the input and output file paths.
     */
    public interface WordCountOptions extends PipelineOptions {

        /**
         * Input file option; the path can be passed on the command line and defaults to
         * gs://apache-beam-samples/shakespeare/kinglear.txt.
         */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        String getInputFile();

        void setInputFile(String value);

        /**
         * Output path for the result file; specify it in the IntelliJ IDEA run
         * configuration or on the command line.
         */
        @Description("Path of the file to write to")
        @Required
        String getOutput();

        void setOutput(String value);
    }

    /**
     * 5. Builds and runs the pipeline.
     */
    public static void main(String[] args) {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);
        //   options.setRunner(FlinkRunner.class);
        //   dataflowOptions.setRunner(DataflowRunner.class);
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.Read.from(args[0]))
                .apply(new CountWords())
                .apply(MapElements.via(new FormatAsTextFn()))
                .apply("WriteCounts", TextIO.Write.to(args[1]));
        p.run().waitUntilFinish();
    }
}

6. Run The Pipeline

Running directly from IDEA

  • Set VM options:
    -DPdirect-runner
    -DPspark-runner
    -DPapex-runner
    -DPflink-runner
  • Set Program arguments:
    --inputFile=pom.xml --output=counts
    [Figure: IDEA run configuration]
    Run result:
    [Figure: run output]

Packaging and submitting to a cluster

To run on a Spark cluster, the pom needs the following dependencies (plus the shade plugin to build a fat jar):

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>0.5.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
      </configuration>
    </execution>
  </executions>
</plugin>

Running mvn package then produces:

beam-examples-1.0.0-shaded.jar

When submitting to the Spark cluster, comment out the direct-runner dependency in the pom before packaging:

  <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>0.5.0</version>
            <scope>runtime</scope>
 </dependency>
 
 

Submit command:

/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/bin/spark-submit --class com.hypers.demo.WordCount --master local[2] vin.beam-1.0-SNAPSHOT-jar-with-dependencies.jar --runner=SparkRunner --inputFile=wordcount.txt --output=counts
 
 

Test result:
[Figures: test run output]

7. Mobile Gaming Pipeline Examples

When a user plays a mobile game, each play generates data containing:

  • The unique ID of the user playing the game.
  • The team ID for the team to which the user belongs.
  • A score value for that particular instance of play.
  • A timestamp that records when the particular instance of play happened; this is the event time for each game data event.
    The data events might be received by the game server significantly later than users generate them.
    [Figure: game events arriving at the server later than they are generated]

Computing UserScore – Basic Score Processing in Batch

UserScore’s basic pipeline flow does the following:

  • Read the day's score data from a text file.
  • Sum the score values for each unique user by grouping each game event by user ID and combining the score values to get the total score for that particular user.
  • Write the result data to a Google Cloud BigQuery table.

https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java

public class UserScore {

  /**
   * Class to hold info about a game event.
   */
  @DefaultCoder(AvroCoder.class)
  static class GameActionInfo {
    @Nullable String user;
    @Nullable String team;
    @Nullable Integer score;
    @Nullable Long timestamp;

    public GameActionInfo() {}

    public GameActionInfo(String user, String team, Integer score, Long timestamp) {
      this.user = user;
      this.team = team;
      this.score = score;
      this.timestamp = timestamp;
    }

    public String getUser() {
      return this.user;
    }
    public String getTeam() {
      return this.team;
    }
    public Integer getScore() {
      return this.score;
    }
    public String getKey(String keyname) {
      if (keyname.equals("team")) {
        return this.team;
      } else {  // return username as default
        return this.user;
      }
    }
    public Long getTimestamp() {
      return this.timestamp;
    }
  }


  /**
   * Parses the raw game event info into GameActionInfo objects. Each event line has the following
   * format: username,teamname,score,timestamp_in_ms,readable_time
   * e.g.:
   * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
   * The human-readable time string is not used here.
   */
  static class ParseEventFn extends DoFn<String, GameActionInfo> {

    // Log and count parse errors.
    private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
    private final Aggregator<Long, Long> numParseErrors =
        createAggregator("ParseErrors", Sum.ofLongs());

    @ProcessElement
    public void processElement(ProcessContext c) {
      String[] components = c.element().split(",");
      try {
        String user = components[0].trim();
        String team = components[1].trim();
        Integer score = Integer.parseInt(components[2].trim());
        Long timestamp = Long.parseLong(components[3].trim());
        GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
        c.output(gInfo);
      } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
        numParseErrors.addValue(1L);
        LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
      }
    }
  }

  /**
   * A transform to extract key/score information from GameActionInfo, and sum the scores. The
   * constructor arg determines whether 'team' or 'user' info is extracted.
   */
  // [START DocInclude_USExtractXform]
  public static class ExtractAndSumScore
      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

    private final String field;

    ExtractAndSumScore(String field) {
      this.field = field;
    }

    @Override
    public PCollection<KV<String, Integer>> expand(
        PCollection<GameActionInfo> gameInfo) {

      return gameInfo
        .apply(MapElements
            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
            .withOutputType(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
        .apply(Sum.<String>integersPerKey());
    }
  }
  // [END DocInclude_USExtractXform]

  /**
   * Options supported by {@link UserScore}.
   */
  public interface Options extends PipelineOptions {

    @Description("Path to the data file(s) containing game data.")
    // The default maps to two large Google Cloud Storage files (each ~12GB) holding two subsequent
    // day's worth (roughly) of data.
    @Default.String("gs://apache-beam-samples/game/gaming_data*.csv")
    String getInput();
    void setInput(String value);

    @Description("BigQuery Dataset to write tables to. Must already exist.")
    @Validation.Required
    String getDataset();
    void setDataset(String value);

    @Description("The BigQuery table name. Should not already exist.")
    @Default.String("user_score")
    String getUserScoreTableName();
    void setUserScoreTableName(String value);
  }

  /**
   * Create a map of information that describes how to write pipeline output to BigQuery. This map
   * is passed to the {@link WriteToBigQuery} constructor to write user score sums.
   */
  protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
      configureBigQueryWrite() {
    Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
        new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
    tableConfigure.put(
        "user",
        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
            "STRING", (c, w) -> c.element().getKey()));
    tableConfigure.put(
        "total_score",
        new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
            "INTEGER", (c, w) -> c.element().getValue()));
    return tableConfigure;
  }


  /**
   * Run a batch pipeline.
   */
 // [START DocInclude_USMain]
  public static void main(String[] args) throws Exception {
    // Begin constructing a pipeline configured by commandline flags.
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Read events from a text file and parse them.
    pipeline.apply(TextIO.Read.from(options.getInput()))
      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
      // Extract and sum username/score pairs from the event data.
      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
      .apply("WriteUserScoreSums",
          new WriteToBigQuery<KV<String, Integer>>(options.getUserScoreTableName(),
                                                   configureBigQueryWrite()));

    // Run the batch pipeline.
    pipeline.run().waitUntilFinish();
  }
  // [END DocInclude_USMain]

}

HourlyTeamScore – Advanced Processing in Batch with Windowing
