赞
踩
一、介绍:
Apache Beam
Apache Beam 用于实现可在任何执行引擎上运行的批处理和流式数据处理作业。它为批处理和流处理这两种数据处理模式提供了一套统一的 API,使我们只需专注于数据处理算法本身,而无需再花时间去维护两种模式之间的差异。
二、架构:
三、开源社区
文档地址:https://beam.apache.org/get-started/quickstart-java/
开源地址:https://github.com/apache/beam
四、应用
依赖:
<!-- 本地运行Runners -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.13.0</version>
</dependency>
<!-- 运行Runners核心jar -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-core-java</artifactId>
    <version>2.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.13.0</version>
</dependency>
<!-- kafka -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>2.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
代码案例:
package com.citydo.faceadd.beam; import java.util.Arrays; import java.util.List; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; /** * @author nick */ public class PipelineTest { public static void main(String[] args) { getPipelineOne(); getPipelineTwo(); } private static void getPipelineOne(){ // 创建管道工厂 PipelineOptions options = PipelineOptionsFactory.create(); // 显式指定PipelineRunner:FlinkRunner必须指定如果不制定则为本地 // 显式指定PipelineRunner:FlinkRunner(Local模式)//必须指定如果不制定则为本地 options.setRunner(DirectRunner.class); // options.setRunner(FlinkRunner.class); // // 显式指定PipelineRunner:FlinkRunner(Local模式)//必须指定如果不制定则为本地 Pipeline pipeline = Pipeline.create(options);// 设置相关管道 //为了演示显示内存数据集 final List<String> LINES = Arrays.asList( "Aggressive",//有进取心的 "Bold",//大胆的,勇敢的 "Apprehensive",//有理解能力的 "Brilliant");//才华横溢的 //设置管道的数据集 PCollection<String> dbRowCollection =pipeline.apply( Create.of(LINES)).setCoder(StringUtf8Coder.of()); PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){ @ProcessElement public void processElement(ProcessContext c) { if(c.element().startsWith("A")){//查找以"A"开头的数据 c.output(c.element()); System.out.append("A开头的单词有:"+c.element()+"\r"); } } })); PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){ @ProcessElement public void 
processElement(ProcessContext c) { if(c.element().startsWith("B")){//查找以"B"开头的数据 c.output(c.element()); System.out.append("B开头的单词有:"+c.element()+"\r"); } } })); pipeline.run().waitUntilFinish(); } private static void getPipelineTwo(){ // 创建管道工厂 PipelineOptions options = PipelineOptionsFactory.create(); // 显式指定PipelineRunner:FlinkRunner必须指定如果不制定则为本地 options.setRunner(DirectRunner.class); // 生产环境关闭 // options.setRunner(FlinkRunner.class); //生成环境打开 Pipeline pipeline = Pipeline.create(options);// 设置相关管道 // 为了演示显示内存数据集 // 叫号数据 final List<KV<String, String>> txtnoticelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", "101号顾客请到3号柜台"), KV.of("DS-2CD2T26FDWDA3-IS", "102号顾客请到1号柜台"), KV.of("DS-2CD6984F-IHS", "103号顾客请到4号柜台"), KV.of("DS-2CD7627HWD-LZS", "104号顾客请到2号柜台")); //AI行为分析消息 final List<KV<String, String>> aimessagelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", "CMOS智能半球网络摄像机,山东省济南市解放路支行3号柜台,type=2,display_image=no"), KV.of("DS-2CD2T26FDWDA3-IS", "CMOS智能筒型网络摄像机,山东省济南市甸柳庄支行1号柜台,type=2,display_image=no"), KV.of("DS-2CD6984F-IHS", "星光级全景拼接网络摄像机,山东省济南市市中区支行4号柜台,type=2,display_image=no"), KV.of("DS-2CD7627HWD-LZS", "全结构化摄像机,山东省济南市市中区支行2号柜台,type=2,display_image=no")); PCollection<KV<String, String>> notice = pipeline.apply("CreateEmails", Create.of(txtnoticelist)); PCollection<KV<String, String>> message = pipeline.apply("CreatePhones", Create.of(aimessagelist)); final TupleTag<String> noticeTag = new TupleTag<>(); final TupleTag<String> messageTag = new TupleTag<>(); PCollection<KV<String, CoGbkResult>> results = KeyedPCollectionTuple.of(noticeTag, notice).and(messageTag, message).apply(CoGroupByKey.create()); System.out.append("合并分组后的结果:\r"); PCollection<String> contactLines = results.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() { private static final long serialVersionUID = 1L; @ProcessElement public void processElement(ProcessContext c) { KV<String, CoGbkResult> e = c.element(); String name = e.getKey(); Iterable<String> emailsIter = 
e.getValue().getAll(noticeTag); Iterable<String> phonesIter = e.getValue().getAll(messageTag); System.out.append("" + name + ";" + emailsIter + ";" + phonesIter + ";" + "\r"); } })); pipeline.run().waitUntilFinish(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。