当前位置:   article > 正文

Apache Beam_apache beam api 地址

apache beam api 地址

一、介绍:

Apache Beam 用于定义可在任何执行引擎上运行的批处理和流式数据处理作业。它为批处理和流处理这两种数据处理模式提供了一套统一的 API,让我们只需专注于数据处理算法本身,而不必再花时间处理两种模式之间的差异。

二、架构:

在这里插入图片描述

三、开源社区

文档地址:https://beam.apache.org/get-started/quickstart-java/
开源地址:https://github.com/apache/beam

四、应用

依赖:

 <!-- Direct runner: runs Beam pipelines locally -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>2.13.0</version>
        </dependency>
        <!-- Core jar used by runners -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-core-java</artifactId>
            <version>2.13.0</version>
        </dependency>
        <!-- Beam Java SDK core -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>2.13.0</version>
        </dependency>
        <!-- Kafka: Beam Kafka IO connector plus the Kafka client library -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-kafka</artifactId>
            <version>2.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

代码案例:

package com.citydo.faceadd.beam;


import java.util.Arrays;
import java.util.List;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

/**
 * @author nick
 */
public class PipelineTest {

    public static void main(String[] args) {
        getPipelineOne();
        getPipelineTwo();
    }



    private static void getPipelineOne(){
        // 创建管道工厂
        PipelineOptions options = PipelineOptionsFactory.create();
        // 显式指定PipelineRunner:FlinkRunner必须指定如果不制定则为本地
        // 显式指定PipelineRunner:FlinkRunner(Local模式)//必须指定如果不制定则为本地
        options.setRunner(DirectRunner.class);
        // options.setRunner(FlinkRunner.class); //
        // 显式指定PipelineRunner:FlinkRunner(Local模式)//必须指定如果不制定则为本地
        Pipeline pipeline = Pipeline.create(options);// 设置相关管道
        //为了演示显示内存数据集
        final List<String> LINES = Arrays.asList(
                "Aggressive",//有进取心的
                "Bold",//大胆的,勇敢的
                "Apprehensive",//有理解能力的
                "Brilliant");//才华横溢的

        //设置管道的数据集
        PCollection<String> dbRowCollection =pipeline.apply(
                Create.of(LINES)).setCoder(StringUtf8Coder.of());
        PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>(){
            @ProcessElement
            public void processElement(ProcessContext c) {
                if(c.element().startsWith("A")){//查找以"A"开头的数据
                    c.output(c.element());
                    System.out.append("A开头的单词有:"+c.element()+"\r");
                }
            }
        }));
        PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>(){
            @ProcessElement
            public void processElement(ProcessContext c) {
                if(c.element().startsWith("B")){//查找以"B"开头的数据
                    c.output(c.element());
                    System.out.append("B开头的单词有:"+c.element()+"\r");
                }
            }
        }));
        pipeline.run().waitUntilFinish();
    }


    private static void getPipelineTwo(){
        // 创建管道工厂
        PipelineOptions options = PipelineOptionsFactory.create();
        // 显式指定PipelineRunner:FlinkRunner必须指定如果不制定则为本地
        options.setRunner(DirectRunner.class); // 生产环境关闭
        // options.setRunner(FlinkRunner.class); //生成环境打开

        Pipeline pipeline = Pipeline.create(options);// 设置相关管道
        // 为了演示显示内存数据集
        // 叫号数据
        final List<KV<String, String>> txtnoticelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", "101号顾客请到3号柜台"), KV.of("DS-2CD2T26FDWDA3-IS", "102号顾客请到1号柜台"),
                KV.of("DS-2CD6984F-IHS", "103号顾客请到4号柜台"), KV.of("DS-2CD7627HWD-LZS", "104号顾客请到2号柜台"));
        //AI行为分析消息
        final List<KV<String, String>> aimessagelist = Arrays.asList(KV.of("DS-2CD2326FWDA3-I", "CMOS智能半球网络摄像机,山东省济南市解放路支行3号柜台,type=2,display_image=no"),
                KV.of("DS-2CD2T26FDWDA3-IS", "CMOS智能筒型网络摄像机,山东省济南市甸柳庄支行1号柜台,type=2,display_image=no"),
                KV.of("DS-2CD6984F-IHS", "星光级全景拼接网络摄像机,山东省济南市市中区支行4号柜台,type=2,display_image=no"), KV.of("DS-2CD7627HWD-LZS", "全结构化摄像机,山东省济南市市中区支行2号柜台,type=2,display_image=no"));
        PCollection<KV<String, String>> notice = pipeline.apply("CreateEmails", Create.of(txtnoticelist));
        PCollection<KV<String, String>> message = pipeline.apply("CreatePhones", Create.of(aimessagelist));
        final TupleTag<String> noticeTag = new TupleTag<>();
        final TupleTag<String> messageTag = new TupleTag<>();
        PCollection<KV<String, CoGbkResult>> results = KeyedPCollectionTuple.of(noticeTag, notice).and(messageTag, message).apply(CoGroupByKey.create());
        System.out.append("合并分组后的结果:\r");
        PCollection<String> contactLines = results.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
            private static final long serialVersionUID = 1L;
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV<String, CoGbkResult> e = c.element();
                String name = e.getKey();
                Iterable<String> emailsIter = e.getValue().getAll(noticeTag);
                Iterable<String> phonesIter = e.getValue().getAll(messageTag);
                System.out.append("" + name + ";" + emailsIter + ";" + phonesIter + ";" + "\r");
            }
        }));
        pipeline.run().waitUntilFinish();
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112

参考:https://github.com/xsm110/Apache-Beam-Example

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/969812
推荐阅读
相关标签
  

闽ICP备14008679号