赞
踩
在我们平时的程序处理过程中,在效率上而言,串行处理的效率不如并行处理的效率,从线程层面而言,即多线程效率不如单线程。但是尽管说并行处理效率确实会比较高,但是它在处理拥有数据结果依赖关系的逻辑时,需要额外的同步管控。例如我的输出怎么临时被存放,然后被下游程序收到处理等等。倘若我们设计的并行处理程序能很好地解决,逻辑依赖关系,那么无疑并行处理的方式将会大大提速我们实际系统中的执行效率。本文笔者来聊聊其中一种被称为Pipeline(流水线)模式的并行处理作业模式,相信此并行处理模式在实际工作中还是有所应用场景的。
首先一个问题,什么叫做Pipeline并行处理模式呢?Pipeline并行处理模式,首先它具有Pipeline属性,它有一条完整的依赖关系处理流程。比如一道工序总共分3个流程,A,B和C, 并且有严格的先后顺序执行要求。B流程的执行必须依赖A流程的执行结果,同样C流程需要依赖B的。其实这么来看,串行处理模式是天然适用于Pipeline处理模式的。
不过本文我们要讨论的是并行处理Pipeline模式作业。如果按照惯常使用的多线程依赖同步的处理方法,主要有以下两种:
接下来笔者将要讨论的方法是第二种方法。
这里我们将用模拟车厢的方式来进行Pipeline work的执行,每个车厢代表一个执行单元,车厢具有Pipeline work的独有特征:
另外在每个车厢内,它还具有以下变量:
因此,基于车厢模型的Pipeline处理模式如下图所示:
上图中Middle Carriage(中部车厢)可能会有很多节,每个线程都有对应的中间结果输出列表。
此部分笔者来分享一段Pipeline并行处理模式的多线实现代码,引用自Hadoop社区JIRA的一个patch。
在这个patch代码中,还是沿用了上节车厢的概念,另外它的整个过程可拆分为以下几个步骤:
针对上述子步骤,我们逐一来阐述。首先是Pipeline work的定义,此work的是每个车厢的线程的具体执行逻辑。
PipelineWork header = new PipelineWork<Object, TestItem>() { int idx = 0; @Override public TestItem doWork(Object obj) throws IOException { if (idx < items.size()) { TestItem item = items.get(idx); item.setVal(item.getVal() * 2); if (exp && (idx == items.size() - 1)) { throw ioe; } idx++; LOG.info(Thread.currentThread().getName() + ": Head worker produce item: " + item.getVal()); return item; } else { LOG.info("Head worker finsihed produce item."); return null; } } }; PipelineWork middle = new PipelineWork<TestItem, TestItem>() { @Override public TestItem doWork(TestItem item) { item.setVal(item.getVal() * 2); LOG.info(Thread.currentThread().getName() + ": Middle worker set value: " + item.getVal()); return item; } }; PipelineWork trailer = new PipelineWork<TestItem, Object>() { @Override public Object doWork(TestItem item) { item.setVal(item.getVal() * 2); LOG.info(Thread.currentThread().getName() + ": Trailer woker set value: " + item.getVal()); return EMPTY; } };
然后根据上面的work,构建Pipeline task,
PipelineTask task = new PipelineTask(conf, "pipeline.testcarriage");
task.appendWork(header, "header");
task.appendWork(middle, "middle" + i);
task.appendWork(trailer, "trailer");
task.kickOff();
task.join();
在Pipeline的task的构建过程中,会进行车厢的组织,
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。