Background:
Distributed data processing is evolving fast –> new distributed processing technologies keep appearing –> Hadoop MapReduce, Apache Spark, Apache Storm, Apache Flink, Apache Apex –> each new engine performs better and becomes popular, and people chase the new –> workloads have to be migrated –> migration means learning the new technology and rewriting the business logic –> nobody wants to do that –> so what now??
Apache Beam was born to answer that question.
Pedigree:
Apache Beam - originally Google Dataflow
Became an Apache Incubator project in February 2016
Graduated as an Apache top-level project on January 10, 2017
After MapReduce, GFS and BigQuery, yet another huge contribution from Google to the open-source community in the field of big data processing
Version used in the examples below: 0.5.0
Hailed as the best programming model for both stream and batch processing!
Hailed as the next-generation standard for big data processing!
Google engineers reportedly no longer use MapReduce ...
The Beam project defines a unified programming model and set of interfaces (the Beam Model) for processing both bounded data sets and unbounded data streams, so that a data processing program written against Beam can run on any supported distributed execution engine.
Main components:
– Beam SDKs
The Beam SDKs define the API for writing the business logic of a distributed data processing job. They give application developers a single, unified programming interface: a developer does not need to know the native API of the underlying big data platform; using the Beam SDK alone, they can describe the whole processing flow, whether the input is a bounded data set for batch processing or an unbounded data stream. Bounded and unbounded inputs are represented by the same classes and processed with the same transforms.
– Beam Pipeline Runner
A Runner compiles and translates the program (pipeline) that the user has assembled through the Beam SDK. When a particular Runner is specified, the program is turned into a directly runnable form compatible with that Runner, so when running a Beam program you must point it at the correct Runner for the underlying engine.
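For illustration, a minimal sketch of selecting a Runner programmatically (not from the original post; the Runner can equally be chosen on the command line, e.g. --runner=SparkRunner as in the spark-submit example later on; imports omitted as in the other snippets here):

PipelineOptions options = PipelineOptionsFactory.create();
// Pick the engine here: DirectRunner for local testing, or e.g. SparkRunner / FlinkRunner
// when the corresponding runner dependency is on the classpath.
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);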
Beam architecture:
The user builds a data processing pipeline against the Beam Model and implements its logic by calling the Beam SDK API ("programming"); the pipeline is then handed to a concrete Beam Runner, which compiles it and finally runs it on a distributed computation engine.
Note: the future of stream and batch processing lies in Apache Beam, while the choice of execution engine is left to the user.
ETL: since a program written against Beam can run on several distributed computing frameworks, it can be used to pull together different data sources, or data spread across multiple storage media, and produce the data we ultimately want.
What kind of data can Beam process?
– Unbounded data streams whose records arrive out of (event-time) order
Bounded data sets
Unbounded data streams
A bounded data set can be regarded as a special case of an unbounded data stream; from the point of view of the processing logic, the two are no different.
For example, suppose each Weibo record carries a timestamp and a repost count, and the user wants the total number of reposts per hour. That business logic should run unchanged over a bounded data set and over an unbounded data stream; the kind of data source should have no influence on how the logic is implemented.
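As a minimal sketch of that per-hour sum (not from the original post; it assumes the elements already carry event-time timestamps):

// Each element is the repost count carried by one Weibo record,
// with the record's event-time timestamp attached.
PCollection<Long> repostCounts = ...;
PCollection<Long> hourlyTotals = repostCounts
    // Slice the (bounded or unbounded) input into fixed one-hour event-time windows.
    .apply(Window.<Long>into(FixedWindows.of(Duration.standardHours(1))))
    // Sum all counts within each window; withoutDefaults() avoids emitting a default value for empty windows.
    .apply(Sum.longsGlobally().withoutDefaults());

The same two transforms work whether repostCounts was read from a file (bounded) or from a message queue (unbounded), which is exactly the point made above.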
The time at which a record enters the distributed processing framework (processing time) vs. the time at which the record was produced (event time):
These two usually differ. For example, in a streaming job over Weibo data, a post published at 2016-06-01-12:00:00 may, because of network and other delays, only enter the stream processing system at 2016-06-01-12:01:30. A batch job normally computes over the complete data set and pays little attention to the time attribute of the data, but a streaming job cannot compute over an endless stream in full, so it usually computes over windows of data, and for most streaming jobs those windows are defined by time.
The records in a stream do not necessarily arrive in strict event-time order. If windows are defined on processing time, the arrival order is the order of the data, so there is no out-of-order problem. For windows defined on event time, however, a record with an earlier timestamp may arrive after one with a later timestamp, which is very common with distributed data sources. Deciding which records count as late, and what to do with them, is then usually a thorny problem.
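One common way to express this in Beam, sketched here for illustration (not from the original post; the window size, lateness bound and accumulation mode are arbitrary choices), is to combine an event-time window with a watermark trigger and an explicit allowed lateness, building on the hourly-sum sketch above:

PCollection<Long> repostCounts = ...;
PCollection<Long> hourlyTotals = repostCounts
    .apply(Window.<Long>into(FixedWindows.of(Duration.standardHours(1)))
        // Emit a result once the watermark says the hour should be complete ...
        .triggering(AfterWatermark.pastEndOfWindow())
        // ... keep the window open another 10 minutes to absorb late records ...
        .withAllowedLateness(Duration.standardMinutes(10))
        // ... and fold any late records into the already-emitted total.
        .accumulatingFiredPanes())
    .apply(Sum.longsGlobally().withoutDefaults());

These are exactly the knobs the Beam Model exposes for the late-data problem described above.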
The Beam Model sums up the questions a user has to answer when processing data along four dimensions: What results are being computed? Where in event time are they computed? When in processing time are results emitted? How do refinements of emitted results (e.g. for late data) relate to each other?
As the official documentation puts it: to use Beam, you first create a driver program using the classes in one of the Beam SDKs.
A few concepts to understand:
* Pipeline
A Pipeline encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline. When you create the Pipeline, you must also specify the execution options that tell the Pipeline where and how to run.
In plain terms: it plays the same role as a SparkContext, carrying the pipeline's execution context.
* PCollection
A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs for each step in your pipeline.
In plain terms: it is the analogue of an RDD. A PCollection holds a potentially unbounded collection of data that comes from an input source and is then fed through the transforms.
* Transform
A Transform represents a data processing operation, or a step, in your pipeline. Every Transform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces one or more output PCollection objects.
A Transform is a processing step that operates on PCollections. A typical pipeline applies several transforms to the data coming from one input source (for example, converting the incoming strings of a set of log entries into key/value pairs where the key is the IP address and the value is the log message). Transforms can be assembled from the standard aggregations shipped with the Beam SDK, or defined from scratch to match your own processing needs.
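A minimal sketch of that log example (not from the original post; the assumed line format, an IP address followed by the message, is made up for illustration):

// Input: raw log lines of the form "<ip> <message>".
PCollection<String> logLines = ...;
PCollection<KV<String, String>> ipToMessage = logLines.apply(
    "SplitLogLine",
    ParDo.of(new DoFn<String, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String[] parts = c.element().split(" ", 2);
        if (parts.length == 2) {
          c.output(KV.of(parts[0], parts[1]));  // key = IP address, value = log message
        }
      }
    }));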
* I/O Source and Sink
Beam provides Source and Sink APIs to represent reading and writing data, respectively. Source encapsulates the code necessary to read data into your Beam pipeline from some external source, such as cloud file storage or a subscription to a streaming data source. Sink likewise encapsulates the code necessary to write the elements of a PCollection to an external data sink.
Create a Pipeline object –> set its options –> create an initial PCollection (using the Source API to read data from an external source, or a Create transform to build a PCollection from in-memory data) –> apply Transforms to each PCollection –> output the final, transformed PCollection(s) (Sink API) –> run the pipeline using the designated Pipeline Runner.
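Putting those steps together, a minimal end-to-end sketch might look like the following (illustrative only; the paths are placeholders and, as in the other snippets in this post, imports are omitted):

public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline p = Pipeline.create(options);

  p.apply("ReadLines", TextIO.Read.from("/tmp/input.txt"))           // Source
   .apply("ToUpperCase", MapElements.via(new SimpleFunction<String, String>() {
       @Override
       public String apply(String line) {
         return line.toUpperCase();                                  // Transform
       }
     }))
   .apply("WriteLines", TextIO.Write.to("/tmp/output"));             // Sink

  p.run().waitUntilFinish();                                         // hand off to the chosen Runner
}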
Maven dependencies:
<!-- Apache Beam-->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.5.0</version>
<scope>runtime</scope>
</dependency>
Demo walk-through:
public static void main(String[] args) {
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply(
"ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
}
When using Beam, the driver program must first create an instance of the Pipeline class (usually in the main method). To create it, you build a PipelineOptions object holding the configuration and pass it to Pipeline.create();
PCollection<String> lines = p.apply(
"ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
// Apply Create, passing the list and the coder, to create the PCollection.
p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
In the Beam SDK each transform has a generic apply method. To use a transform you call apply on the input PCollection, passing the transform as the argument, and it returns the output PCollection, i.e.
[Output PCollection] = [Input PCollection].apply([Transform])
When several transforms are chained, it looks like this:
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
Creating two PCollections from one PCollection:
[Output PCollection 1] = [Input PCollection].apply([Transform 1])
[Output PCollection 2] = [Input PCollection].apply([Transform 2])
ParDo is a parallel processing transform. Its logic resembles the Map phase of a Map/Shuffle/Reduce-style job: it runs user-defined code over every element of the input PCollection and, for each element, emits zero, one, or more elements to the output PCollection.
Usage:
// The input PCollection of Strings.
PCollection<String> words = ...;
// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
ParDo
.of(new ComputeWordLengthFn())); // The DoFn to perform on each element.
Implementing a DoFn subclass follows a set of conventions, for example:
See the official programming guide: https://beam.apache.org/documentation/programming-guide/
https://beam.apache.org/documentation/programming-guide/#transforms-usercodereqs
static class ComputeWordLengthFn extends DoFn<String, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
// Get the input element from ProcessContext.
String word = c.element();
// Use ProcessContext.output to emit the output element.
c.output(word.length());
}
}
The DoFn can also be written as an anonymous inner class, for example:
// The input PCollection.
PCollection<String> words = ...;
// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
"ComputeWordLengths", // the transform name
ParDo.of(new DoFn<String, Integer>() { // a DoFn as an anonymous inner class instance
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().length());
}
}));
GroupByKey processes the key/value pairs in a PCollection, much like the shuffle phase of a Map/Shuffle/Reduce-style job.
Usage: omitted in the original post; see the sketch below.
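As a minimal sketch of GroupByKey (not from the original post; the word/line-number pairing is made up for illustration):

// Input: e.g. (word, line number) pairs.
PCollection<KV<String, Integer>> wordsAndLines = ...;
// Output: one entry per distinct word, with all of its line numbers grouped together.
PCollection<KV<String, Iterable<Integer>>> groupedWords =
    wordsAndLines.apply(GroupByKey.<String, Integer>create());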
Flatten operates on PCollections of the same element type: it merges several PCollections into one. Partition does the opposite and splits one PCollection into several smaller PCollections.
Flatten example:
// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
Partition example:
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
// In this example, we define the PartitionFn in-line.
// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
students.apply(Partition.of(10, new PartitionFn<Student>() {
public int partitionFor(Student student, int numPartitions) {
return student.getPercentile() // 0..99
* numPartitions / 100;
}}));
WordCount pipeline:
WordCount source code walk-through:
public class WordCount {
/**
* 1. a. Programming the pipeline with DoFns keeps the code concise. b. Splits each input line of text into words and emits them.
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element().trim().isEmpty()) {
emptyLines.addValue(1L);
}
// Split the line of text into individual words
String[] words = c.element().split("[^a-zA-Z']+");
// Emit each non-empty word to the output PCollection
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
/**
* 2. Formats a word/count pair into a printable string.
*/
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
/**
* 3. Word counting: a PTransform (PCollection Transform) that converts a PCollection of text lines into formatted word counts.
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert the lines of text into individual words
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of occurrences of each word
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
/**
* 4. Custom options, e.g. the input and output file paths.
*/
public interface WordCountOptions extends PipelineOptions {
/**
* Input file option; the path can be passed on the command line and defaults to gs://apache-beam-samples/shakespeare/kinglear.txt
*/
@Description("Path of the file to read from")
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
/**
* Output path for the result file; set it in the IntelliJ IDEA run configuration or on the command line, e.g. ./pom.xml
*/
@Description("Path of the file to write to")
@Required
String getOutput();
void setOutput(String value);
}
/**
* 5. Run the pipeline.
*/
public static void main(String[] args) {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WordCountOptions.class);
// options.setRunner(FlinkRunner.class);
// dataflowOptions.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.Read.from(args[0]))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(args[1]));
p.run().waitUntilFinish();
}
}
The runner is selected with the corresponding Maven profile:
-Pdirect-runner
-Pspark-runner
-Papex-runner
-Pflink-runner
and the pipeline arguments are passed as, e.g.:
--inputFile=pom.xml --output=counts
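For example, to run locally with the direct runner, a typical invocation looks like this (a sketch assuming the exec-maven-plugin is available and that the main class is com.hypers.demo.WordCount, as in the spark-submit command further below):

mvn compile exec:java -Dexec.mainClass=com.hypers.demo.WordCount \
    -Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner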
To submit the job to a Spark cluster, the pom needs the following dependencies:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>shaded</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>
Packaging with mvn package produces
beam-examples-1.0.0-shaded.jar
which is then submitted to the Spark cluster. When packaging for cluster execution, the direct-runner dependency needs to be commented out in the pom:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.5.0</version>
<scope>runtime</scope>
</dependency>
Submit command:
/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/bin/spark-submit --class com.hypers.demo.WordCount --master local[2] vin.beam-1.0-SNAPSHOT-jar-with-dependencies.jar --runner=SparkRunner --inputFile=wordcount.txt --output=counts
Test results:
When a user plays a mobile game, events are generated that contain the user, the user's team, a score, and a timestamp (see GameActionInfo below).
UserScore's basic pipeline flow does the following: read the raw events from a text file, parse each line into a GameActionInfo object, extract and sum the scores per user, and write the per-user totals to BigQuery.
public class UserScore {
/**
* Class to hold info about a game event.
*/
@DefaultCoder(AvroCoder.class)
static class GameActionInfo {
@Nullable String user;
@Nullable String team;
@Nullable Integer score;
@Nullable Long timestamp;
public GameActionInfo() {}
public GameActionInfo(String user, String team, Integer score, Long timestamp) {
this.user = user;
this.team = team;
this.score = score;
this.timestamp = timestamp;
}
public String getUser() {
return this.user;
}
public String getTeam() {
return this.team;
}
public Integer getScore() {
return this.score;
}
public String getKey(String keyname) {
if (keyname.equals("team")) {
return this.team;
} else { // return username as default
return this.user;
}
}
public Long getTimestamp() {
return this.timestamp;
}
}
/**
* Parses the raw game event info into GameActionInfo objects. Each event line has the following
* format: username,teamname,score,timestamp_in_ms,readable_time
* e.g.:
* user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
* The human-readable time string is not used here.
*/
static class ParseEventFn extends DoFn<String, GameActionInfo> {
// Log and count parse errors.
private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
private final Aggregator<Long, Long> numParseErrors =
createAggregator("ParseErrors", Sum.ofLongs());
@ProcessElement
public void processElement(ProcessContext c) {
String[] components = c.element().split(",");
try {
String user = components[0].trim();
String team = components[1].trim();
Integer score = Integer.parseInt(components[2].trim());
Long timestamp = Long.parseLong(components[3].trim());
GameActionInfo gInfo = new GameActionInfo(user, team, score, timestamp);
c.output(gInfo);
} catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
numParseErrors.addValue(1L);
LOG.info("Parse error on " + c.element() + ", " + e.getMessage());
}
}
}
/**
* A transform to extract key/score information from GameActionInfo, and sum the scores. The
* constructor arg determines whether 'team' or 'user' info is extracted.
*/
// [START DocInclude_USExtractXform]
public static class ExtractAndSumScore
extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
private final String field;
ExtractAndSumScore(String field) {
this.field = field;
}
@Override
public PCollection<KV<String, Integer>> expand(
PCollection<GameActionInfo> gameInfo) {
return gameInfo
.apply(MapElements
.via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
.withOutputType(
TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
.apply(Sum.<String>integersPerKey());
}
}
// [END DocInclude_USExtractXform]
/**
* Options supported by {@link UserScore}.
*/
public interface Options extends PipelineOptions {
@Description("Path to the data file(s) containing game data.")
// The default maps to two large Google Cloud Storage files (each ~12GB) holding two subsequent
// day's worth (roughly) of data.
@Default.String("gs://apache-beam-samples/game/gaming_data*.csv")
String getInput();
void setInput(String value);
@Description("BigQuery Dataset to write tables to. Must already exist.")
@Validation.Required
String getDataset();
void setDataset(String value);
@Description("The BigQuery table name. Should not already exist.")
@Default.String("user_score")
String getUserScoreTableName();
void setUserScoreTableName(String value);
}
/**
* Create a map of information that describes how to write pipeline output to BigQuery. This map
* is passed to the {@link WriteToBigQuery} constructor to write user score sums.
*/
protected static Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>
configureBigQueryWrite() {
Map<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure =
new HashMap<String, WriteToBigQuery.FieldInfo<KV<String, Integer>>>();
tableConfigure.put(
"user",
new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
"STRING", (c, w) -> c.element().getKey()));
tableConfigure.put(
"total_score",
new WriteToBigQuery.FieldInfo<KV<String, Integer>>(
"INTEGER", (c, w) -> c.element().getValue()));
return tableConfigure;
}
/**
* Run a batch pipeline.
*/
// [START DocInclude_USMain]
public static void main(String[] args) throws Exception {
// Begin constructing a pipeline configured by commandline flags.
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
// Read events from a text file and parse them.
pipeline.apply(TextIO.Read.from(options.getInput()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"))
.apply("WriteUserScoreSums",
new WriteToBigQuery<KV<String, Integer>>(options.getUserScoreTableName(),
configureBigQueryWrite()));
// Run the batch pipeline.
pipeline.run().waitUntilFinish();
}
// [END DocInclude_USMain]
}