当前位置:   article > 正文

Pipeline并行处理模型

pipeline并行

前言


在我们平时的程序处理过程中,在效率上而言,串行处理的效率不如并行处理的效率,从线程层面而言,即多线程效率不如单线程。但是尽管说并行处理效率确实会比较高,但是它在处理拥有数据结果依赖关系的逻辑时,需要额外的同步管控。例如我的输出怎么临时被存放,然后被下游程序收到处理等等。倘若我们设计的并行处理程序能很好地解决,逻辑依赖关系,那么无疑并行处理的方式将会大大提速我们实际系统中的执行效率。本文笔者来聊聊其中一种被称为Pipeline(流水线)模式的并行处理作业模式,相信此并行处理模式在实际工作中还是有所应用场景的。

Pipeline并行处理模式概要


首先一个问题,什么叫做Pipeline并行处理模式呢?Pipeline并行处理模式,首先它具有Pipeline属性,它有一条完整的依赖关系处理流程。比如一道工序总共分3个流程,A,B和C, 并且有严格的先后顺序执行要求。B流程的执行必须依赖A流程的执行结果,同样C流程需要依赖B的。其实这么来看,串行处理模式是天然适用于Pipeline处理模式的。

不过本文我们要讨论的是并行处理Pipeline模式作业。如果按照惯常使用的多线程依赖同步的处理方法,主要有以下两种:

  • 将程序中间结果写出到第三方存储介质内,然后并行处理线程与此第三方存储进行数据交互,同步。
  • 程序中间结果保存到本地变量内,通过线程安全的做法将中间结果进行赋值输出,灵活一点的,还可以进行同步的控制。

接下来笔者将要讨论的方法是第二种方法。

车厢模拟式的Pipeline并行处理模式


这里我们将用模拟车厢的方式来进行Pipeline work的执行,每个车厢代表一个执行单元,车厢具有Pipeline work的独有特征:

  • 车厢具有连接关系,每个车厢有它的前车厢和后车厢。
  • 头车厢无前车厢,它的处理无须依赖前车厢执行结果。
  • 尾车厢无后车厢,它的处理完毕即意味着总执行过程的结束。

另外在每个车厢内,它还具有以下变量:

  • 多执行线程
  • 中间结果置换列表

因此,基于车厢模型的Pipeline处理模式如下图所示:
在这里插入图片描述
上图中Middle Carriage(中部车厢)可能会有很多节,每个线程都有对应的中间结果输出列表。

Pipeline并行处理模式代码实现


此部分笔者来分享一段Pipeline并行处理模式的多线实现代码,引用自Hadoop社区JIRA的一个patch。

在这个patch代码中,还是沿用了上节车厢的概念,另外它的整个过程可拆分为以下几个步骤:

  1. Pipeline work的定义以及Pipeline task的构建
  2. 根据Pipeline task,来构建车厢
  3. 车厢内部逻辑处理
    3.1) 头车厢(起始线程)处理过程
    3.2) 中部车(中部线程)厢处理过程
    3.3) 尾部车厢(末尾线程)处理过程

针对上述子步骤,我们逐一来阐述。首先是Pipeline work的定义,此work的是每个车厢的线程的具体执行逻辑。

    PipelineWork header = new PipelineWork<Object, TestItem>() {
   
      int idx = 0;

      @Override
      public TestItem doWork(Object obj) throws IOException {
   
        if (idx < items.size()) {
   
          TestItem item = items.get(idx);
          item.setVal(item.getVal() * 2);
          if (exp && (idx == items.size() - 1)) {
   
            throw ioe;
          }
          idx++;
          
          LOG.info(Thread.currentThread().getName() +
              ": Head worker produce item: " + item.getVal());
          return item;
        } else {
   
          LOG.info("Head worker finsihed produce item.");
          return null;
        }
      }
    };
    PipelineWork middle = new PipelineWork<TestItem, TestItem>() {
   
      @Override
      public TestItem doWork(TestItem item) {
   
        item.setVal(item.getVal() * 2);
        LOG.info(Thread.currentThread().getName() +
            ": Middle worker set value: " + item.getVal());

        return item;
      }
    };
    PipelineWork trailer = new PipelineWork<TestItem, Object>() {
   
      @Override
      public Object doWork(TestItem item) {
   
        item.setVal(item.getVal() * 2);
        LOG.info(Thread.currentThread().getName() +
            ": Trailer woker set value: " + item.getVal());

        return EMPTY;
      }
    };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

然后根据上面的work,构建Pipeline task,

    PipelineTask task = new PipelineTask(conf, "pipeline.testcarriage");
    task.appendWork(header, "header");
    task.appendWork(middle, "middle" + i);
    task.appendWork(trailer, "trailer");
    task.kickOff();
    task.join();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在Pipeline的task的构建过程中,会进行车厢的组织,


                
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/967301
推荐阅读
相关标签
  

闽ICP备14008679号