赞
踩
Worker 的意思是工作的人。在Worker Thread 模式里,工人线程将逐个取回任务并进行处理。当所有工作全部完成后,工人线程会等待新的工作到来;
如果从“工人聚集”的来看,该模式也可以称为“线程池”模式;
//Worker线程池 public class Channel { private static final int MAX_REQUEST=100; private final Request[] requestQueue; private int tail; private int head; private int count; private final WorkerThread[] threadsPool; public Channel(int threads){ this.requestQueue=new Request[MAX_REQUEST]; this.head=0; this.tail=0; this.count=0; threadsPool=new WorkerThread[threads]; for(int i=0;i<threadsPool.length;i++){ threadsPool[i]=new WorkerThread("Worker-"+i,this); } } public void startWorkers(){ for(int i=0;i<threadsPool.length;i++){ threadsPool[i].start(); } } public synchronized void putRequest(Request request){ while(count>=requestQueue.length){ try{ wait(); }catch(InterruptedException e){ e.printStackTrace(); } } requestQueue[tail]=request; tail=(tail+1)%requestQueue.length; count++; notifyAll(); } public synchronized Request takeRequest(){ while(count<=0){ try{ wait(); }catch(InterruptedException e){ e.printStackTrace(); } } Request request=requestQueue[head]; head=(head+1)%requestQueue.length; count--; notifyAll(); return request; } } //客户线程,发送请求 public class ClientThread extends Thread{ private final Channel channel; private static final Random random=new Random(); public ClientThread(String name,Channel channel){ super(name); this.channel=channel; } public void run(){ try{ for(int i=0;true;i++){ Request request=new Request(getName(),i); channel.putRequest(request); Thread.sleep(random.nextInt(1000)); } }catch(InterruptedException e){ e.printStackTrace(); } } } //工人线程,处理请求 public class WorkerThread extends Thread{ private final Channel channel; public WorkerThread(String name,Channel channel){ super(name); this.channel=channel; } public void run(){ while(true){ Request request=channel.takeRequest(); request.execute(); } } } //模拟请求 public class Request { private final String name; private final int number; private static final Random random=new Random(); public Request (String name,int number){ this.name=name; this.number=number; } public void execute(){ System.out.println(Thread.currentThread().getName()+" executes "+this); try{ Thread.sleep(random.nextInt(1000)); }catch(InterruptedException e){ e.printStackTrace(); } } @Override public String toString(){ return "[Request from "+name+" NO."+number+"]"; } } //启动类 public class Tester { public static void main(String[] args){ Channel channel=new Channel(5); channel.startWorkers(); new ClientThread("Alice",channel).start(); new ClientThread("Bobby",channel).start(); new ClientThread("Chris",channel).start(); } }
Worker Thread 模式实现了调用和执行分离;而Thread Per Message模式也实现了同样的效果;不同的是Thread Per Message对于每个请求会创建一个线程,而Worker Thread则事先创建了许多线程;相当于对线程进行了缓存和重用;
ClientThread相当于Producer,不断地生产Request;然后Channel相当于Table,而WorkerThread则相当于Consumer,不断地处理Request;不同的是Channel缓存了WorkerThread,相当于把Consumer和Table绑在了一起;从这个角度来看,Worker Thread 和Producer-Consumer模式很类似;只是角色各有不同:Worker Thread模式中,只出现了两个角色:请求者和缓冲池;而Producer-Consumer模式则有三个角色:Producer-Buffer-Consumer;Buffer是独立的;
前面在Thread Per Message模式中,提到“从处理方来看,则没有顺序了,不仅任务结束时无序的,甚至连任务的开始也是无序的”;实际上,这个“无序”是在请求者的角度来看。从Channel角度来看,只是对顺序没有做处理而已,实际上只要想控制,那么Channel是可以控制“执行”顺序的;因为“调用和执行”是分离的嘛;Client负责调用;Worker负责执行;而作为Worker的管理者——Channel自然可以对“执行”做出控制!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。