赞
踩
圈主 [Rocky编程日记] 学习多线程的设计模式笔记记录。希望我写得笔记你能够喜欢, 希望我写的笔记能够给你提供帮助。同时若笔记中存在不对的地方,那一定是圈主当时的理解还不够, 希望你能够及时指出嗷~
代码仓库地址名称:
code-multithread-pattern
有需要多线程设计模式PDF或者代码仓库地址的可以私我嗷~
需要引用 如下 pom文件依赖
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
意思是 "以一个线程执行 "。类似于独木桥同一时间内只允许一个人通行一样, 该模式用于设置限制, 以确保同一时间内只能让一个线程执行处理。
该模式有时候也被称为 “临界区” 或 “临界域” 这个名称侧重于执行处理的线程(过桥的人), 而临界区域临界域的名称则侧重于执行范围(人过的桥)。
案例简介: 模拟三位通行者频繁通过一个一次只允许一个人经过的门的情形。当通行者通过门的时候, 统计人数便会递增。并且程序还会记录通行者的 “姓名与出生地”。
通行者
public class User { private String name; private String address; public User(String name, String address) { this.name = name; this.address = address; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } }
用户任务
/** * 用户任务 */ public class UserThread extends Thread { private Logger log = LoggerFactory.getLogger(this.getClass()); /** * final 与 创建线程安全的实例 */ private final SafeGate safeGate; private final User user; public UserThread(SafeGate safeGate, User user) { this.safeGate = safeGate; this.user = user; } @Override public void run() { log.info("线程: {} 姓名:{} 经过安全门",Thread.currentThread().getName(),user.getName()); while (true) { safeGate.pass(user); } } }
安全门
public class SafeGate { private int number; private String name; private String address; public void pass(User user) { this.number++; this.name = user.getName(); // 休眠 try { Thread.sleep(1000); } catch (InterruptedException e) { } this.address = user.getAddress(); check(); } @Override public String toString() { return "No." + number + ": "+ name + " : " + address; } private void check() { if (name.charAt(0) != address.charAt(0)) { System.out.println("=== BROKEN ===" + toString()); } } }
测试
public class SafeGateTest {
public static void main(String[] args) {
SafeGate safeGate = new SafeGate();
new UserThread(safeGate,new User("rocky", "ro_Zhou")).start();
new UserThread(safeGate,new User("luo","mei_Zhou")).start();
new UserThread(safeGate,new User("chen","chen_Zhou")).start();
}
}
结果
=== BROKEN ===No.3: rocky : mei_Zhou
=== BROKEN ===No.3: rocky : mei_Zhou
=== BROKEN ===No.6: chen : ro_Zhou
=== BROKEN ===No.6: chen : ro_Zhou
=== BROKEN ===No.6: chen : ro_Zhou
=== BROKEN ===No.10: rocky : chen_Zhou
=== BROKEN ===No.10: rocky : chen_Zhou
=== BROKEN ===No.12: chen : mei_Zhou
=== BROKEN ===No.12: chen : mei_Zhou
=== BROKEN ===No.15: luo : mei_Zhou
=== BROKEN ===No.16: luo : ro_Zhou
=== BROKEN ===No.17: rocky : chen_Zhou
休眠了一段时间 , 控制台日志显示与预期结果不一致。不知道为啥会出现原因的可以 搜一搜 缓存一致性协议(MESI)。
改造使之安全
/** * 安全门 */ public class SafeGate { private int number; private String name; private String address; public synchronized void pass(User user) { this.number++; this.name = user.getName(); // 休眠 try { Thread.sleep(1000); } catch (InterruptedException e) { } this.address = user.getAddress(); check(); } @Override public synchronized String toString() { return "No." + number + ": "+ name + " : " + address; } private void check() { if (name.charAt(0) != address.charAt(0)) { System.out.println("=== BROKEN ===" + toString()); } } }
SharedResource(共享资源)
Single Threaded Execution 模式中出现了一个发挥 SharedResource
(共享资源) 作用的类。由 SafeGate
扮演 SharedResource
角色。
SharedResource
角色是可被多个线程访问的类, 包含很多方法, 但这些方法主要分为如下两类。
safeMethod
: 多个线程同时调用也不会发生问题的方法unsafeMethod:
多个线程同时会调用会发生问题, 因此必须加以保护的方法。Single Threaded Execution模式会保护 unsafeMethod
, 使其同时只能由一个线程访问。Java则是通过 unsafeMethod
声明 为 synchronzied
方法来保护。我们将只允许单个线程执行的程序范围称为临界区。
synchronzied
方法。当然, 在 synchronzied
方法并不会破坏程序的安全性。但 synchronzied
方法要比调用一般方法花费时间, 这会稍微降低程序性能。ShareResource
角色的状态会变化。在 Single Thread Execution
模式中, 满足下列条件时, 死锁就会发生。
SharedResource
角色的锁的同时, 还想获取其他 SharedResource
角色的锁SharedResource
角色的锁的顺序并不固定(SharedResource
角色是对称的)假设我们现在要编写 一个 SharedResource
角色的子类。如果子类能够访问 SharedResource
角色的字段, 那么编写子类的开发人员就可能会不小心编写出无保护的 unsafeMethod
。即使能够确保好不容易编写的 SharedResource
角色的安全性, 在子类化时还是有可能会失去安全性。如果不将包含子类在内的所有 unsafeMethod
都声名为 synchronized
方法, 就无法确定 SharedResource
角色的安全性。
在 面向对象的程序设计中, 伴随子类化而出现的 “继承” 起着非常重要的作用。但对于多线程程序设计来说, 继承会引起一些麻烦的问题。我们通常称之为 继承反常。
Single Threaded Execution 模式会降低程序性能, 原因有如下两个方面。
获取锁花费时间 : 进入 synchronized
方法时, 线程需要获取对象的锁, 该处理会花费时间。
如果 SharedResource
角色的数量减少了, 那么要获取的锁的数量也会相应地减少, 从而就能够抑制性能的下降了。
线程冲突引起的等待 : 当线程执行临界区内的处理时, 其他想要进入临界区的线程会阻塞。这种状况称为线程冲突。发生冲突时, 程序的整体性能会随着线程等待时间的增加而下降。
如果尽可能地缩小临界区的范围, 降低线程冲突的概率, 那么就能够抑制性能的下降
Java.lang.String
类是用于表示字符串, String 类中并没有修改字符串内容的方法。也就是说, String 的实例所表示的字符串内容绝对不会发生变化。正因为如此, String类中的方法无需声明为 synchronized。 因为实例的内部状态不会发生改变, 所以无论 String 实例被多少个线程访问,也无需执行线程的互斥处理。
Immutable就是不变的、不发生改变。
Immutable模式中存在着确保实例状态不发生改变的类。在访问这些实例时不需要执行耗时的互斥处理。如果能用好该模式,就可以提高程序性能。
如String就是一个不可变(immutable)类。
User类
public class User { private final String name; private final String address; public User(String name, String address) { this.name = name; this.address = address; } @Override public String toString() { return "User{" + "name='" + name + '\'' + ", address='" + address + '\'' + '}'; } public String getName() { return name; } public String getAddress() { return address; } }
PrintPersonThread
类
public class PrintPersonThread extends Thread {
private User user;
public PrintPersonThread(User user) {
this.user = user;
}
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + " prints " + user);
}
}
}
PrintPersonThreadTest
public class PrintPersonThreadTest {
public static void main(String[] args) {
User alice = new User("Rocky编程日记", "Rocky编程日记");
new PrintPersonThread(alice).start();
new PrintPersonThread(alice).start();
new PrintPersonThread(alice).start();
}
}
现在回想一下 Single Threaded Execution模式。该模式会将修改或引用实例状态的地方设置为临界区, 使这个区域只能由一个线程同时执行。但像 User
类这样, 实例的状态绝对不会发生改变时, 情况就不一样啦。即使多个线程同时对该实例执行处理, 实例也不会出错, 因为实例的状态肯定不会发生改变。既然实例的状态肯定不会发生改变,那么也就无须使用 synchronized 来保护实例。因为即使想破坏实例, 也破坏不了。
思考: 假设存在于一个类, 由于该类会被多个线程访问, 所以我们使用 synchronized
进行了保护。这里, 如果该类中存在 setting 方法, 那么 Immutable模式就不成立啦。
假设 我们查看程序后发现实际上这个 setter 方法并未被使用, 那么就可以将字段声明为 final, 删除setter 方法, 并注意遵守不可变性,这样或许就能将其改造为可适用于 Immutable 模式了。
实际上可能使用了 setter 方法。这时候, 该类就不适用于 Immutable 模式了, 但这还不是放弃的时候, 我们来仔细查看了一下整个程序是如何使用该类的, 看是不是可以分为使用 setter 方法的情况下与不使用 setter 方法的情况。如果可以明确分为这两种情况, 那我们是不是可以将这个类拆分为 mutable 类 和 immutable 类, 然后再设计成可以根据 mutable 实例创建 immutable 实例, 并可以反过来根据 immutable 实例创建 mutable 实例呢? 这样, immutable 类的部分就可以应用 Immutable 模式了。
Java 的标准类库中就有成对的 mutable 类 和 immutable 类, 例如:
java.lang.StringBuffer
类和java.lang.String
类。StringBuffer
类是表示字符串的 mutable 类。表示的字符串能够随便改写, 为了确保安全, 改写时需要妥善使用 synchronized
。而 String 类是表示字符串的 immutable 类。String 实例表示的字符串不可以改写。
StringBuffer
类中有一个以 String 为参数的构造函数, 而 String
类中有一个以 StringBuffer
为参数的构造函数。也就是说, StringBuffer
的实例和 String
的实例可以互相转换。
不可变性是一个很微妙的性质, 代码稍微一修改, 程序可能就会失去不可变性。如果从使用了 Immutable 模式的程序中删除了 synchronized, 那么当失去不可变性时,程序的安全性就会完全丧失, 所以一定要注意。
是“被保护着的”、“被防卫着的”意思,suspension则是“暂停”的意思。当现在并不适合马上执行某个操作时,就要求想要执行该操作的线程等待,这就是Guarded Suspension Pattern。
Guarded Suspension Pattern 会要求线程等候,以保障实例的安全性,其它类似的称呼还有guarded wait、spin lock等。
Request
/** * 一个请求的类 */ public class Request { private final String name; public Request(String name) { this.name = name; } public String getName() { return name; } public String toString() { return "[ Request " + name + " ]"; } }
RequestQueue
/** * 依次存放请求的类 */ public class RequestQueue { private final Queue<Request> queue = new LinkedList<Request>(); /** * 1. 施加守护条件进行保护 * 2. 不等待和等待的情况 * 3. 执行 wait, 等待条件发生变化 [明确线程在等待什么?应该何时执行notify/notifyAll] * 4. 执行到while的下一条语句时一定能确定的事情 * @return */ public synchronized Request getRequest() { while (queue.peek() == null) { // 守护条件的逻辑非运算 try { System.out.println(Thread.currentThread().getName() + ": wait() begins, queue = " + queue); wait(); System.out.println(Thread.currentThread().getName() + ": wait() ends, queue = " + queue); } catch (InterruptedException e) { } } // != null 守护条件 return queue.remove(); } public synchronized void putRequest(Request request) { queue.offer(request); System.out.println(Thread.currentThread().getName() + ": notifyAll() begins, queue = " + queue); notifyAll(); System.out.println(Thread.currentThread().getName() + ": notifyAll() ends, queue = " + queue); } }
ClientThread
/** * 发送请求的类 */ public class ClientThread extends Thread { private final Random random; private final RequestQueue requestQueue; public ClientThread(RequestQueue requestQueue, String name, long seed) { super(name); this.requestQueue = requestQueue; this.random = new Random(seed); } public void run() { for (int i = 0; i < 10000; i++) { Request request = new Request("No." + i); System.out.println(Thread.currentThread().getName() + " requests " + request); requestQueue.putRequest(request); try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } } }
ServerThread
/** * 接收请求的类 */ public class ServerThread extends Thread { private final Random random; private final RequestQueue requestQueue; public ServerThread(RequestQueue requestQueue, String name, long seed) { super(name); this.requestQueue = requestQueue; this.random = new Random(seed); } public void run() { for (int i = 0; i < 10000; i++) { Request request = requestQueue.getRequest(); System.out.println(Thread.currentThread().getName() + " handles " + request); try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } } }
SuspensionTest
/**
* 测试程序的类
*/
public class SuspensionTest {
public static void main(String[] args) {
RequestQueue requestQueue = new RequestQueue();
new ClientThread(requestQueue, "Rocky", 3141592L).start();
new ServerThread(requestQueue, "Rocky_BCRJ", 6535897L).start();
}
}
GuardedObject
(被守护的对象)
GuardedObject
角色是一个持有被守护的方法 (guardeMethod
) 的类。当线程执行 guardeMethod
方法时, 若守护条件成立, 则可以立即执行; 当守护条件不成立时, 就要进行等待。守护条件的成立与否会随着 GuardedObject
角色的状态不同而发生变化。
还有可能持有其他改变实例状态(特别是改变守护条件)的方法(stateChangingMethod
)。
在 Java 中 guardeMethod
通过 while语句和wait方法来实现, stateChangingMethod
则通过 notify/notifyAll
方法来实现
synchronized
Single Threaded Execution
模式中, 只要有一个线程进入临界区, 其他线程就无法进入, 只能等待。Guarded Suspension
模式中, 线程是否等待取决于守护条件。Guarded Suspension
模式是在 Single Threaded Execution
模式的基础上附加了条件而形成的。也就是说, Guarded Suspension
模式是类似于附加条件的 synchronized 这样的模式。if
wait
的线程每次被 notify / notifyAll
时都会检查守护条件。不管被 notify / notifyAll
多少次, 如果守护条件不成立, 线程都会随着 while
再次 wait
。wait
与 notify / notifyAll
的责任
wait / notifyAll
只出现在 RequestQueue
类中, 而并未出现在 ClientThread、ServerThread
类中。Guarded Suspension
模式的实现封装在 RequestQueue
类中。wait / notifyAll
隐藏起来的做法对 RequestQueue
类的复用性来说是非常重要的。其他类无需 考虑 wait / notifyAll
的问题, 只需要调用就可以了。guarded suspension
: 被守护而暂停执行的含义guarded wait
: 被守护而等待, 其实现方法为线程使用 wait
进行等待, 被notify / notifyAll
后, 再次检查条件是否成立。由于线程在使用 wait
进行等待的期间是待在等待队列中停止执行的。所以并不会浪费虚拟机的处理时间。busy wait
: 忙于等待, 其实现方法并未使用 wait
进行等待, 而是执行 yield
的同时检查守护条件。由于等待端的线程是在持续运行的。所以会浪费虚拟机的处理时间。spin lock
: 通过旋转来锁定, 在条件成立之前, 通过 while
循环 旋转等待的情形。polling
: 反复检查某个事件是否发生, 若发生, 则执行相应处理的方式 。Balking模式: 如果现在不适合执行这个操作, 或者没必要执行这个操作, 就停止处理, 直接返回。
Data
/** * 可以修改并保存的数据的类 */ public class Data { private final String filename; // 保存的文件名 private String content; // 数据内容 private boolean changed; // 修改后的内容若未保存,则为true public Data(String filename, String content) { this.filename = filename; this.content = content; this.changed = true; } // 修改数据内容 public synchronized void change(String newContent) { content = newContent; changed = true; } // 若数据内容修改过,则保存到文件中 public synchronized void save() throws IOException { if (!changed) { System.out.println(Thread.currentThread().getName() + " balks"); return; } doSave(); changed = false; } // 将数据内容实际保存到文件中 private void doSave() throws IOException { System.out.println(Thread.currentThread().getName() + " calls doSave, content = " + content); Writer writer = new FileWriter(filename); writer.write(content); writer.close(); } }
SaverThread
public class SaverThread extends Thread { private final Data data; public SaverThread(String name, Data data) { super(name); this.data = data; } public void run() { try { while (true) { data.save(); // 要求保存数据 Thread.sleep(1000); // 休眠约1秒 } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
ChangerThread
public class ChangerThread extends Thread { private final Data data; private final Random random = new Random(); public ChangerThread(String name, Data data) { super(name); this.data = data; } public void run() { try { for (int i = 0; true; i++) { data.change("No." + i); // 修改数据 Thread.sleep(random.nextInt(1000)); // 执行其他操作 data.save(); // 显式地保存 } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Test
public class Test {
public static void main(String[] args) {
Data data = new Data("data.txt", "(empty)");
new ChangerThread("ChangerThread", data).start();
new SaverThread("SaverThread", data).start();
}
}
GuardedObject
(被守护的对象)
GuardedObject
角色是一个拥有被防护的方法(guardedMethod
)的类。当线程执行 guardedMethod
方法时, 若守护条件成立, 则执行实际的处理。而当守护条件不成立时, 则不执行实际的处理, 直接返回。守护条件的成立与否, 会随着 GuardedObject
角色的状态变化而发生变化。
除了 guardedMethod
之外, GuardedObject
角色还有可能有其他来改变状态的方法(stateChangingMethod
)。
就是生产者-消费者模式。生产者和消费者在为不同的处理线程,生产者必须将数据安全地交给消费者,消费者进行消费时,如果生产者还没有建立数据,则消费者需要等待。
一般来说,可能存在多个生产者和消费者,不过也有可能生产者和消费者都只有一个,当双方都只有一个时,我们也称之为Pipe Pattern。
MakerThread
用于制作蛋糕, 并将其放置到桌子上, 也就是糕点师 。
该类会先暂停一段随机时长, 然后再调用 Table 类的 put 方法将制作好的蛋糕放置到桌子上。暂停的这段时间模拟的是 “制作蛋糕所花费的时间”。
public class MakerThread extends Thread { private final Random random; private final Table table; private static int id = 0; // 蛋糕的流水号(所有糕点师共用) public MakerThread(String name, Table table, long seed) { super(name); this.table = table; this.random = new Random(seed); } public void run() { try { while (true) { Thread.sleep(random.nextInt(1000)); String cake = "[ Cake No." + nextId() + " by " + getName() + " ]"; table.put(cake); } } catch (InterruptedException e) { } } private static synchronized int nextId() { return id++; } }
EaterThread
用于表示从桌子上取蛋糕吃的客人。
客人通过 Table 类的take 方法取桌子上的蛋糕。然后 , 与
MakerThread
类一样,EaterThread
也会暂停一段随机长的时间。这段暂停时间模拟的是 “吃蛋糕花费的时间”。
public class EaterThread extends Thread { private final Random random; private final Table table; public EaterThread(String name, Table table, long seed) { super(name); this.table = table; this.random = new Random(seed); } public void run() { try { while (true) { String cake = table.take(); Thread.sleep(random.nextInt(1000)); } } catch (InterruptedException e) { } } }
Table
```java
public class Table {
private final String[] buffer;
private int tail; // 下次put的位置
private int head; // 下次take的位置
private int count; // buffer中的蛋糕个数
public Table(int count) { this.buffer = new String[count]; this.head = 0; this.tail = 0; this.count = 0; } public synchronized void put(String cake) throws InterruptedException { System.out.println(Thread.currentThread().getName() + " puts " + cake); while (count >= buffer.length) { System.out.println(Thread.currentThread().getName() + " wait BEGIN"); wait(); System.out.println(Thread.currentThread().getName() + " wait END"); } buffer[tail] = cake; tail = (tail + 1) % buffer.length; count++; notifyAll(); } public synchronized String take() throws InterruptedException { while (count <= 0) { System.out.println(Thread.currentThread().getName() + " wait BEGIN"); wait(); System.out.println(Thread.currentThread().getName() + " wait END"); } String cake = buffer[head]; head = (head + 1) % buffer.length; count--; notifyAll(); System.out.println(Thread.currentThread().getName() + " takes " + cake); return cake; }
}
```
Data
: Producer
角色生成 Data
角色, 并将其传递给 Channel
。
Producer
(生产者) : 从 Channel
角色获取 Data
角色并使用 。
Consumer
(消费者) : 从 Channel
角色获取 Data
角色并使用。
Channel
(通道) : Channel
角色保管从 Producer
角色来获取 Data
角色, 还会响应 Consumer
角色的请求, 传递 Data
角色。为了确保安全性, Channel
角色会对Producer
角色和 Consumer
角色的访问执行互斥处理。
当Producer
角色将 Data
角色传递给 Channel
角色时, 如果 Channel
角色的状态不适合接收 Data
角色, 那么 Producer
角色将一直等待, 直至 Channel
角色的状态变为可以接收为止。
当 Consumer
角色从Channel
角色获取 Data
角色时, 如果 Channel
角色中没有可以传递的 Data
角色, 那么 Consumer
角色将一直等待, 直至 Channel
角色的状态变为可以传递 Data
角色为止。
当存在多个 Producer
角色 和 Consumer
角色时, 为了避免各处理互相影响, Channel
角色需要执行互斥处理。
Channel
角色相当于 Producer
角色 和 Consumer
角色之间, 承担用于传递 Data
角色的中转站、通道的任务。
守护安全性的 Channel
角色
在该模式下, 承担安全守护责任的是 Channel
角色。Channel
角色执行线程间的互斥处理, 确保 Producer
角色正确地将 Data
角色传递给 Consumer
角色。
Producer
角色不可以直接调用 Consumer
方法么?
直接调用方法
Consumer
角色想要获取Data
角色, 通常都是因为想使用这些 Data
角色来执行某些处理。如果 Producer
角色直接调用 Consumer
角色的方法, 那么执行处理的就不是 Consumer
角色的线程, 而是 Producer
角色的线程了。
这样一来, 执行处理花费的时间必须由 Producer
角色的线程来承担, 准备下一个数据的处理也会相应发生延迟。这样子会使程序的响应性变得很差。
插入 Channel 角色
Producer
角色将 Data
角色传递给 Channel
角色之后, 无需等待 Consumer
角色对 Data
角色进行处理。可以立即开始准备下一个 Data
角色。也就是说, Producer
角色可以持续不断地创建Data
角色。Producer
角色不会受到 Consumer
角色的处理进展状况的影响。
Channel 角色的剩余空间所导致的问题
在示例程序中, 桌子上最多可以放置3个蛋糕。当糕点师放置蛋糕时,如果桌子上的蛋糕个数在3个以内, 则可以顺利放置, 但如果是4个及4个以上时, 就必须等待客人取走蛋糕才行。如果客人吃得慢, 糕点师就必须等待很久。
如果桌子上可以放置的蛋糕个数增多会怎么样呢? 这时, 就算客人吃得慢, 糕点师也无需等待, 可以直接制作蛋糕并放到桌子上。桌子上可以放置的蛋糕个数(buffer
字段的元素个数)是用于缓冲MakerThread
和 EaterThread
之间的处理速度差的。
当然, 如果客人吃蛋糕的平均速度小于糕点师制作蛋糕的速度, 那么桌子上的蛋糕会逐渐增多, 一段时间后还是会达到 buffer
字段的元素个数上限。
如果使用 java.util.LinkedList
类, 那么创建的 Channel
角色能储存的实例个数就不会存在上限。但这时, 如果 EaterThread
的平均速度较慢, 一段时间之后(或许要过很长一段时间) 内存就会不足, 也就无法再创建表示蛋糕的实例。
以什么顺序传递 Data 角色呢
"存在中间角色"的意义
Consumer角色只有一个使会怎么样呢
该模式考虑多个Producer
给多个 Consumer
传递数据的情况。这里, 我们来思考一下 Consumer
角色只有一个时程序会怎么样。也就是 "多个 Producer
角色和一个 Consumer
角色"的情况。
如果 Consumer
角色有一个, 也就是说处理 Channel
角色中储存的Data
角色的线程只有一个, 不需要注意Consumer
角色的线程之间互相影响。如果 Consumer
角色有多个, 我们就要注意不能让 Consumer
角色的线程之间互相影响。[多生产者单消费者模型类似于事件分发机制]
###Read-Write Lock 模式 简介
将读取与写入分开处理,在读取数据之前必须获取用来读取的锁定,而写入的时候必须获取用来写入的锁定。
因为读取时实例的状态不会改变,所以多个线程可以同时读取;
但是,写入会改变实例的状态,所以当有一个线程写入的时候,其它线程既不能读取与不能写入。
Data
package code.rocky.readWriteLock; public class Data { private final char[] buffer; private final ReadWriteLock lock = new ReadWriteLock(); public Data(int size) { this.buffer = new char[size]; for (int i = 0; i < buffer.length; i++) { buffer[i] = '*'; } } public char[] read() throws InterruptedException { lock.readLock(); try { return doRead(); } finally { // 释放用于读取的锁 lock.readUnlock(); } } public void write(char c) throws InterruptedException { lock.writeLock(); try { doWrite(c); } finally { // 确保一定会调用 采用 before/after模式 lock.writeUnlock(); } } // 用于创建一个新的 char 数组,来复制 buffer 字段的内容 ,并返回 newbuf private char[] doRead() { char[] newbuf = new char[buffer.length]; for (int i = 0; i < buffer.length; i++) { newbuf[i] = buffer[i]; } slowly(); return newbuf; } // 用于执行实际的写入操作。该方法会以参数传入的字符c来填满 buffer字段。"以传入的字符填满数组" 这个操作并没有什么特别的意义, 只是为了让运行结果容易理解而已。注意: slowly(), 这里假定了 写入操作的时间比读取操作的时间长 private void doWrite(char c) { for (int i = 0; i < buffer.length; i++) { buffer[i] = c; slowly(); } } private void slowly() { try { Thread.sleep(50); } catch (InterruptedException e) { } } }
WriteThread
package code.rocky.readWriteLock; import java.util.Random; public class WriterThread extends Thread { private static final Random random = new Random(); private final Data data; private final String filler; private int index = 0; public WriterThread(Data data, String filler) { this.data = data; this.filler = filler; } public void run() { try { while (true) { char c = nextchar(); data.write(c); // 0~3000毫秒随机休眠 Thread.sleep(random.nextInt(3000)); } } catch (InterruptedException e) { } } // 用于获取下一次应该写入的字符 private char nextchar() { char c = filler.charAt(index); index++; if (index >= filler.length()) { index = 0; } return c; } }
ReaderThread
package code.rocky.readWriteLock; public class ReaderThread extends Thread { private final Data data; public ReaderThread(Data data) { this.data = data; } public void run() { try { long begin = System.currentTimeMillis(); for (int i = 0; i < 20; i++) { char[] readBuf = data.read(); System.out.println(Thread.currentThread().getName() + " reads " + String.valueOf(readBuf)); } long time = System.currentTimeMillis() - begin; System.out.println(Thread.currentThread().getName() + ": time = " + time); } catch (InterruptedException e) { } } }
ReadWriteLock
package code.rocky.readWriteLock; public final class ReadWriteLock { private int readingReaders = 0; // (A)…实际正在读取中的线程个数 private int waitingWriters = 0; // (B)…正在等待写入的线程个数 private int writingWriters = 0; // (C)…实际正在写入中的线程个数 private boolean preferWriter = true; // 若写入优先,则为true public synchronized void readLock() throws InterruptedException { while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) { wait(); } readingReaders++; // (A) 实际正在读取的线程个数加1 } public synchronized void readUnlock() { readingReaders--; // (A) 实际正在读取的线程个数减1 preferWriter = true; notifyAll(); } public synchronized void writeLock() throws InterruptedException { waitingWriters++; // (B) 正在等待写入的线程个数加1 try { while (readingReaders > 0 || writingWriters > 0) { wait(); } } finally { waitingWriters--; // (B) 正在等待写入的线程个数减1 } writingWriters++; // (C) 实际正在写入的线程个数加1 } public synchronized void writeUnlock() { writingWriters--; // (C) 实际正在写入的线程个数减1 preferWriter = false; notifyAll(); } }
Reader
(读者): Reader
角色对 SharedResource
角色执行 read 操作。Writer
(写者): Writer
角色对 SharedResource
角色执行 write 操作。SharedResource
(共享资源) : SharedResource
角色表示的是 Reader
角色和 Write
角色二者共享的资源。SharedResource
角色提供不修改内部状态的操作(read) 和修改内部状态的操作(write)。ReadWriterLock
(读写锁) : ReadWriterLock
角色提供了 SharedResource
角色实现 read 操作和 write操作所需的锁。利用 "读取"操作的线程之间不会冲突的特性来提高程序性能
Read-Write Lock
模式利用了读取操作的线程之间不会冲突的特性。由于读取操作不会修改 SharedResource
角色的状态, 所以彼此之间无需执行互斥处理。因此多个 Reader
角色可以同时执行 read
操作, 从而提高程序性能。
性能的提升还需要考虑 “适合读取操作繁重时” 和 “适合读取频率比写入频率高时” 两个大方针。
适合读取操作繁重时
在单纯使用 Single Threaded Execution
模式的情况下, 就算是 read操作, 每次也只能运行一个线程。如果 read 的操作很繁重, 那么使用 Read-Write Lock模式比使用 Single Threaded Execution
模式更加合适
但是, 因为Read-Write Lock模式的处理比 Single Thread Execution
模式复杂, 所以当 read 的操作很简单(不耗费时间)时, Single Thread Execution
模式反而会更加合适。
适合读取频率比写入频率高时
Read-Write Lock模式的优点是 Reader 角色之间不会发生冲突, 但是, 如果写入处理(write) 的频率很高, write 角色便会频繁停止 Reader角色的处理, 这样就无法体现出 Read-Write Lock 模式的优点了。
锁的含义
synchronized
可以用于获取实例的锁。Java的每个实例都持有一个"锁", 但同一个锁不可以由两个以上的线程同时获取。这种结构是 Java 编程规范规定的, Java虚拟机也是这样实现的。这也是Java语言一开始就提供的所谓的物理锁, Java程序并不能改变这种锁的运行。synchronized
获取的锁是不一样的。这并不是 Java编程规范规定的结构, 而是开发人员自己实现的一种结构。逻辑锁Thread-Per-Message 模式是指每个message一个线程,message可以理解成“消息”、“命令”或者“请求”。
每一个message都会分配一个线程,由这个线程执行工作,使用Thread-Per-Message Pattern时,“委托消息的一端”与“执行消息的一端”回会是不同的线程。
Helper
public class Helper { public void handle(int count, char c) { System.out.println(" handle(" + count + ", " + c + ") BEGIN"); for (int i = 0; i < count; i++) { slowly(); System.out.print(c); } System.out.println(""); System.out.println(" handle(" + count + ", " + c + ") END"); } private void slowly() { try { Thread.sleep(100); } catch (InterruptedException e) { } } }
Host
public class Host {
private final Helper helper = new Helper();
public void request(final int count, final char c) {
System.out.println(" request(" + count + ", " + c + ") BEGIN");
helper.handle(count, c);
System.out.println(" request(" + count + ", " + c + ") END");
}
}
HostTest
public class HostTest {
public static void main(String[] args) {
System.out.println("main BEGIN");
Host host = new Host();
host.request(10, 'A');
host.request(20, 'B');
host.request(30, 'C');
System.out.println("main END");
}
}
Helper
角色来处理请求。提高响应性, 缩短延迟时间
Thread-Per-Message 模式能够提高与 Client 角色 对应的 Host角色的响应性, 降低延迟时间。尤其是当 handle 操作非常耗时, 或者 handle 操作需要等待 输入/输出时, 效果非常明显。
在 Thread-Per-Message 模式下, Host 角色会启动新的线程。由于启动线程也会花费时间, 所以想要提高响应性时, 是否使用 Thread-Per-Message 模型 取决于 “handle操作花费的时间” 和 “线程启动花费的时间” 之间的均衡。
为了缩短线程启动花费的时间, 我们可以使用 worker Thread模式
适用于操作顺序没有要求时
在 Thread-Per-Message模式中, handle 方法并不一定是按 request 方法的调用顺序来执行的。
适用于不需要返回值时
在 Thread-Per-Message 模式中, request 方法并不会等待 handle 方法执行结束。所以 request 得不到 handle 的运行结果。因此, Thread-Per-Message模式适用于不需要获取返回值的情况。
应用于服务器
为了使服务器可以处理多个请求, 我们可以使用 Thread-Per-Message模式。服务器本身的线程接收客户端的请求, 而这些请求的实际处理则交由其他线程来执行, 服务器本身的线程则返回, 去等待客户端的其他请求。
调用方法 + 启动线程 —> 发送消息
通常情况下, 当调用方法时, 该方法中的所有处理都被执行完之后, 控制权才会返回。Thread-Per-Message 模式的 request 方法也是一个普通方法, 所以当该方法中的处理被执行完毕后, 控制权就会返回。
“request 方法真正想要的操作(显示字符串)执行了吗?” 虽然控制权从 request 返回了, 但这并等于显示 字符串的操作也就执行完了。虽然 request 触发了目标操作的开始(触发器), 但并不等待处理结束。
这是一个来自工作车间的故事, 在这里, 工人们负责组装塑料模型。
客人会将很多装有塑料模型的箱子带到工作车间来, 然后摆放在桌子上。
工人必须将客户送过来的塑料模型一个一个组装起来。他们会先取回放在桌子上的装有塑料模型的箱子, 然后在阅读了箱子中的说明书后开始组装。当一箱模型组装完成后, 工人们会继续去取下一个箱子。当所有模型全部组装完成后, 工人们会等待新的模型被送过来。
也称为 Thread Pool(线程池) 模式
ClientThread
类的线程会向Channel
类发送工作请求(委托) (说是工作, 其实只是显示出委托者的名字和委托编号)
Channel
类的实例雇佣了五个工人线程(WorkerThread
) 进行工作。所有工人线程都在等待工作请求的到来。工作请求到来后, 工人线程会从
Channel
那里获取一项工作请求并开始工作。工作完成后, 工人线程会回到Channel
那里等待下一项工作请求。
Channel
public class Channel { private static final int MAX_REQUEST = 100; private final Request[] requestQueue; private int tail; // 下次putRequest的位置 private int head; // 下次takeRequest的位置 private int count; // Request的数量 private final WorkerThread[] threadPool; public Channel(int threads) { this.requestQueue = new Request[MAX_REQUEST]; this.head = 0; this.tail = 0; this.count = 0; threadPool = new WorkerThread[threads]; for (int i = 0; i < threadPool.length; i++) { threadPool[i] = new WorkerThread("Worker-" + i, this); } } public void startWorkers() { for (int i = 0; i < threadPool.length; i++) { threadPool[i].start(); } } public synchronized void putRequest(Request request) { while (count >= requestQueue.length) { try { wait(); } catch (InterruptedException e) { } } requestQueue[tail] = request; tail = (tail + 1) % requestQueue.length; count++; notifyAll(); } public synchronized Request takeRequest() { while (count <= 0) { try { wait(); } catch (InterruptedException e) { } } Request request = requestQueue[head]; head = (head + 1) % requestQueue.length; count--; notifyAll(); return request; } }
Request
public class Request { private final String name; private final int number; public Request(String name, int number) { this.name = name; this.number = number; } public void execute() { System.out.println(Thread.currentThread().getName() + " executes " + this); } public String toString() { return "[ Request from " + name + " No." + number + " ]"; } }
ClientThread
public class ClientThread extends Thread {
private final Channel channel;
public ClientThread(String name, Channel channel) {
super(name);
this.channel = channel;
}
public void run() {
for (int i = 0; true; i++) {
Request request = new Request(getName(), i);
channel.putRequest(request);
}
}
}
WorkerThread
public class WorkerThread extends Thread {
private final Channel channel;
public WorkerThread(String name, Channel channel) {
super(name);
this.channel = channel;
}
public void run() {
while (true) {
Request request = channel.takeRequest();
request.execute();
}
}
}
ChannelTest
public class ChannelTest { public static void main(String[] args) { Channel channel = new Channel(5); // 工人线程的个数 channel.startWorkers(); new ClientThread("Alice", channel).start(); new ClientThread("Bobby", channel).start(); new ClientThread("Chris", channel).start(); try { Thread.sleep(30000); } catch (InterruptedException e) { } System.exit(0); } }
Channel
角色。Channel
角色接收来自于 Client
角色的 Request
角色, 并将其传递给 Worker
角色。Worker
角色从 Channel
角色, 并进行工作。当一项工作完成后, 它会继续去获取另外的 Request
角色。Request
角色是表示工作的角色, 并进行工作。当一项工作完成后, 它会继续去获取另外的 Request
角色。提高吞吐量
如果可以将自己的工作交给其他人, 那么自己就可以做下一项工作。线程也是一样的。如果将工作交给其他线程, 自己就可以做下一项工作(Thread-Per-Message)。
由于启动新线程需要花费时间, 所以 Worker Thread 模式的主题之一就是通过轮流地和反复地使用线程来提高吞吐量。
容量控制
Worker Thread模式还有另外一个主题, 那就是可以同时提供的服务的数量, 即容量的控制。
Worker 角色的数量
Worker角色的数量是可以自由地定义的。在示例程序中, 传递给 Channel
的构造函数的参数 threads 即表示这个数值。Worker 角色会创建 threads 个 WorkerThread
的实例。
Worker角色的数量越多, 可以并发进行的处理也越多。但是, 即使 Worker角色的数量超过了 同时被请求的工作数量,也不会对提高程序处理效率有什么帮助。因为多余的 Worker
角色不但不会工作, 还会占有内存。增加容量就会增加消耗的资源, 所以必须根据程序实际运行的环境来相应地调整 Worker
角色的数量。
Worker
角色Worker
角色增加)时, 就要逐渐减少 Worker
角色Request 角色的数量
Channel
角色中保存着 Request
角色。 只要Worker
角色不断地进行工作, 在 Channel
角色中保存的 Request
角色就不会增加很多。不过, 当接收到的工作的数量超出了 Worker
角色的处理能力后, Channel
角色中就会积累很多 Request
角色。这时, Client
角色必须等待一段时间才能将 Request
角色发送给 Channel
角色。
如果 Channel
角色可以保存很多 Request
角色, 那么就可以填补(缓冲) Client
角色与 Worker
角色之间的处理速度差异。
调用与执行的分离
Client
角色负责发送工作请求。它会将工作内容封装为 Request
角色, 然后传递给 Channel
角色。在普通的方法调用中, 这部分相当于 “设置参数并调用方法”。其中, “设置参数” 与 "创建 Request
角色"相对应, 而 “传递给 Channel
角色” 大致与 "调用方法"相对应。
Worker
角色负责进行工作。它使用从 Channel
角色接收到的 Request
角色来执行实际的处理。在普通的方法调用中, 这部分相当于 “执行方法”。
在进行普通的方法调用时, “调用方法” 和 “执行方法” 是连续进行的。因为调用方法后, 方法会立即执行。在普通的方法调用中, 调用与执行是无法分开的。
在 Worker Thread
模式 和 Thread-Per-Message
模式中, 方法的调用和方法的执行是特意被分开的。方法的调用被称为 invocation
,方法的执行被称为 execution
。因此, 可以说 Worker Thread
模式和 Thread-Per-Message
模式将方法的调用和执行 分离开了。调用与执行的分离同时也是 Command
模式。
提高响应速度
如果调用和执行不可分离, 那么当执行需要花费很长时间时, 就会拖调用处理的后腿。但是如果将调用和执行分离, 那么即使执行需要花费很长时间也没有什么关系, 因为执行完调用处理的一方可以先继续执行其他处理, 这样就可以提高响应速度。
控制执行顺序 (调度)
如果调用和执行不可分离, 那么在调用后就必须开始执行。
但是如果将调用和执行分离, 执行就可以不再受调用顺序的制约。我们可以通过设置 Request
角色的优先级, 并控制 Channel
角色 将 Request
角色传递给Worker
角色的顺序来实现上述处理。
可以取消和反复执行
将调用和执行分离后, 还可以实现 “即使调用了也可以取消执行” 这种功能。
由于调用的结果是 Request
角色对象, 所以既可以将 Request
角色保存, 又可以反复地执行。
通往分布式之路
将调用和执行分离后, 可以将负责调用的计算机与负责执行的计算机分离开来, 然后通过网络将扮演 Request
角色的对象从一台计算机传递至另外一台计算机。
Runnable 接口的意义
java.lang.Runnable
接口有时会被用作 Worker Thread
模式中的 Request
角色。也就是说, 该模式会创建一个实现了 Runnable
接口的类的实例对象(Runnable
对象) 来表示工作内容, 然后将它传递给 Channel
角色,让其完成这项工作。
多态 Request 角色
在示例程序中, ClientThread
传递给 Channel
的只是 Request
的实例。但是, WorkThread
并不知道 Request
类的详细信息。WorkerThread
只是单纯的接收 Request
的实例, 然后调用它的 execute
方法而已。
也就是说 , 即使我们编写了一个 Request
类的子类并将它的实例传递给了 Channel
, WorkThread
也可以正常地调用 execute
方法。用面向对象的术语来说, 就是这里使用了多态性。
Request
类是表示工作的类, 编写 Request
类的子类相当于增加工作的种类。我们就实现了具有多态的 Request
角色。
Request
角色中包含了完成工作所必需的全部信息。因此, 即使我们实现了多态的 Request
角色并增加了工作的种类, 也无需修改 Channel
角色和 Worker
角色。这是因为即使工作种类增加了, worker
角色依然只是调用 execute
方法而已。
独自一人的 Worker 角色
请大家想象一下工人线程(Worker
角色只有一个的情况。)当工人线程只有一个时, 由于工人线程进行处理的范围变成了单线程, 所以会有互斥处理可以省略的可能性。
Future 的意思是 未来、期货。假设有一个方法需要花费很长时间才能获取运行结果。那么, 与其一直等待结果, 不如先拿一张 “提货单”。获取提货单并不耗费时间。这里的 “提货单” 我们称为
Future
角色。获取
Future
角色的线程会在稍后使用Future
角色来获取运行结果。这与凭着提货单去取蛋糕非常相似。如果运行结果已经出来了,那么直接领取即可; 如果运行结果还没有出来, 那么需要等待结果出来。
Data
public interface Data {
public abstract String getContent();
}
FutureData
public class FutureData extends FutureTask<RealData> implements Data { public FutureData(Callable<RealData> callable) { super(callable); } public String getContent() { String string = null; try { string = get().getContent(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return string; } }
Host
public class Host { public FutureData request(final int count, final char c) { System.out.println(" request(" + count + ", " + c + ") BEGIN"); // (1) 创建FutureData的实例 // (向构造函数中传递 Callable<RealData>) FutureData future = new FutureData( new Callable<RealData>() { public RealData call() { return new RealData(count, c); } } ); // (2) 启动一个新线程,用于创建RealData的实例 new Thread(future).start(); System.out.println(" request(" + count + ", " + c + ") END"); // (3) 返回FutureData的实例 return future; } }
RealData
public class RealData implements Data { private final String content; public RealData(int count, char c) { System.out.println(" making RealData(" + count + ", " + c + ") BEGIN"); char[] buffer = new char[count]; for (int i = 0; i < count; i++) { buffer[i] = c; try { Thread.sleep(100); } catch (InterruptedException e) { } } System.out.println(" making RealData(" + count + ", " + c + ") END"); this.content = new String(buffer); } public String getContent() { return content; } }
HostTest
public class HostTest { public static void main(String[] args) { System.out.println("main BEGIN"); Host host = new Host(); Data data1 = host.request(10, 'A'); Data data2 = host.request(20, 'B'); Data data3 = host.request(30, 'C'); System.out.println("main otherJob BEGIN"); try { Thread.sleep(2000); } catch (InterruptedException e) { } System.out.println("main otherJob END"); System.out.println("data1 = " + data1.getContent()); System.out.println("data2 = " + data2.getContent()); System.out.println("data3 = " + data3.getContent()); System.out.println("main END"); } }
Client
: Client
角色向 Host
角色发出请求(request) , 并会立即接收到请求的处理结果(返回值) —VirtualData
角色Host
: Host角色会创建新的线程, 并开始在新线程中创建 RealData
角色。同时, 它会将 Future
角色返回给 Client
角色。VirtualData
(虚拟数据) : VirtualData
角色是让 Future
角色与 RealData
角色具有一致性的角色。RealData
(真实数据): 表示真实数据的角色。创建该对象需要花费很多时间。Future
: Future
角色是 RealData
角色的 “提货单”, 由 Host
角色传递给 Client
角色。Future
角色就是 VirtualData
角色。实际上, 当 Client
角色操作Future
角色时, 线程会调用 wait
方法等待, 直至 RealData
角色创建完成。但是, 一旦 RealData
角色创建完成, 线程就不会再继续等待。Future
角色会将 Client
角色的操作委托给 RealData
角色。小孩子在玩玩具时经常会将玩具弄得满房间都是。晚上到了睡觉时间, 妈妈就会对小孩子说: “先收拾房间再睡觉噢”, 这时, 小孩子会开始打扫房间。
它是一种先执行完终止处理再终止线程的模式。
CountUpThread
package code.rocky.twoPhaseTermination; public class CountUpThread extends Thread { // 计数值 private long counter = 0; // 终止请求 public void shutdownRequest() { interrupt(); } // 线程体 public void run() { try { while (!isInterrupted()) { doWork(); } } catch (InterruptedException e) { } finally { doShutdown(); } } // 操作 private void doWork() throws InterruptedException { counter++; System.out.println("doWork: counter = " + counter); try { Thread.sleep(500); } catch (InterruptedException e) { } } // 终止处理 private void doShutdown() { System.out.println("doShutdown: counter = " + counter); } }
CountUpThreadTest
package code.rocky.twoPhaseTermination; public class CountUpThreadTest { public static void main(String[] args) { System.out.println("main: BEGIN"); try { // 启动线程 CountUpThread t = new CountUpThread(); t.start(); // 稍微间隔一段时间 Thread.sleep(10000); // 线程的终止请求 System.out.println("main: shutdownRequest"); t.shutdownRequest(); System.out.println("main: join"); // 等待线程终止 t.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main: END"); } }
TerminationRequester
(终止请求发出者) : TerminationRequester
角色负责向 Terminator
角色发出终止请求。Terminator
(终止者) : Terminator
角色负责接收终止请求, 并实际执行终止处理。它提供了表示终止请求的 shutdownRequest
方法不需要使用 Single Threaded Execution
不能使用 Thread
类的 stop 方法
java.lang.Thread
类提供了用于强制终止线程的 stop 方法。但是 stop 是 “不推荐使用的方法”,我们不应当使用它。
因为如果使用 stop 方法, 实例的安全性就无法确保。使用 stop 方法后, 线程会在抛出 java.lang.ThreadDeath
异常后终止。即使该线程正处于访问临界区的过程中也会终止。
仅仅检查标志是不够的
因为当想要终止线程时, 该线程可能正在 sleep。而当线程正在 sleep 时, 即使将 shutdownRequest
标志设置为 true, 线程也不会开始终止处理。等到 sleep 时间过后, 线程可能会在某个时间点开始终止处理, 但是这样程序的响应性就下降了。如果使用 interrupt
方法的话, 就可以中断 sleep。
另外, 线程当时也可能正在 wait。而当线程正在 wait 时, 即使将 shutdownRequested
标志设为true, 线程也不会从等待队列中出来, 所以我们必须使用 interrupt方法对线程下达 中断wait
的指标。
仅仅检查中断状态是不够的
调用interrupt方法后,如果线程正在 sleep或是 wait, 那么会抛出 InterruptedExecption
异常,而如果不抛出异常, 线程就会变为中断状态。也就是说, 没有必要特意准备一个新的 shutdownRequested
标志。只要捕获 InterruptedException
, 使用 isInterrupted
方法来检查线程是否处于中断状态不就可以了么
在长时间处理前检查终止请求
为了能够在接受到终止请求后立即开始终止处理, 我们应当在执行长时间处理前检查 shutdownRequested
标志或是调用 isShutdownRequested
方法。
join
方法 和 isAlive
方法
isAlive
方法来确定指定的线程是否已经终止。如果返回值是true, 则表示该线程还活着; 如果返回值是false, 则表示该线程已经终止。使用 java.lang.Thread
的getState
方法也可以获取线程的状态, 不过如果只是检查线程是否已经终止, 使用 isAlive
会更好。
java.util.concurrent.ExecutorService
接口 与 Two-Phase Termination
模式
要捕获程序整体的终止时
优雅地终止线程
“线程优雅地执行终止处理, 然后终止运行” 这种状态用英语单词来形容的话, 就是Graceful(优雅的、高贵的、得体的)
。这种状态相当于工作的结束并不是慌慌张张地放下已经着手的工作不管, 而是在进行必要的整理后才正式终止。
安全地终止(安全性)
即使接收到终止请求, 线程也不会立即终止。首先表示是否已经接收到终止请求的 shutdownRequested
标志会被设置为 true
。然后, 仅在线程运行至不会破坏对象安全性的位置时, 程序才会开始终止处理。
必定会进行终止处理(生存性)
线程在接收到终止请求后, 会中断可以中断的wait, 转入终止处理。为此, shutdownRequest
方法会调用 interrupt
方法。
另外, 为了确保在抛出异常后程序也会执行终止处理, 我们使用了 try… finally 语句块。
发出终止请求后尽快进入终止线程(响应性)
线程在接收到终止请求后, 会中断可以中断的sleep, 尽快进入终止处理。为此, shutdownRequest
方法会调用 interrupt 方法。
另外, 在执行长时间处理前需要检查 shutdownRequested
标志。
就是“线程独有的存储库”,该模式会对每个线程提供独有的内存空间。
java.lang.ThreadLocal
类提供了该模式的实现,ThreadLocal
的实例是一种集合(collection)架构,该实例管理了很多对象,可以想象成一个保管有大量保险箱的房间。
ThreadLocal
简介见 我的 ThreadLocal
解读
Log
public class Log { private static PrintWriter writer = null; // 初始化writer字段 static { try { writer = new PrintWriter(new FileWriter("log.txt")); } catch (IOException e) { e.printStackTrace(); } } // 写日志 public static void println(String s) { writer.println(s); } // 关闭日志 public static void close() { writer.println("==== End of log ===="); writer.close(); } }
LogTest
public class LogTest {
public static void main(String[] args) {
System.out.println("BEGIN");
for (int i = 0; i < 10; i++) {
Log.println("main: i = " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
Log.close();
System.out.println("END");
}
}
ClientThread
public class ClientThread extends Thread { public ClientThread(String name) { super(name); } public void run() { System.out.println(getName() + " BEGIN"); for (int i = 0; i < 10; i++) { Log.println("i = " + i); try { Thread.sleep(100); } catch (InterruptedException e) { } } Log.close(); System.out.println(getName() + " END"); } }
Log
public class Log { private static final ThreadLocal<TSLog> tsLogCollection = new ThreadLocal<TSLog>(); // 写日志 public static void println(String s) { getTSLog().println(s); } // 关闭日志 public static void close() { getTSLog().close(); } // 获取线程特有的日志 private static TSLog getTSLog() { TSLog tsLog = tsLogCollection.get(); // 如果该线程是第一次调用本方法,就新生成并注册一个日志 if (tsLog == null) { tsLog = new TSLog(Thread.currentThread().getName() + "-log.txt"); tsLogCollection.set(tsLog); } return tsLog; } }
TSLog
public class TSLog { private PrintWriter writer = null; // 初始化writer字段 public TSLog(String filename) { try { writer = new PrintWriter(new FileWriter(filename)); } catch (IOException e) { e.printStackTrace(); } } // 写日志 public void println(String s) { writer.println(s); } // 关闭日志 public void close() { writer.println("==== End of log ===="); writer.close(); } }
ClientThreadTest
public class ClientThreadTest {
public static void main(String[] args) {
new ClientThread("Alice").start();
new ClientThread("Bobby").start();
new ClientThread("Chris").start();
}
}
Client
(委托者) : Client
角色将处理委托给TSObjectProxy
角色。一个 TSObjectProxy
角色会被多个Client
角色使用。TSObjectProxy
(线程特有的对象的代理人): TSObjectProxy
角色会执行多个Client
角色委托给它的处理。首先, TSObjectProxy
角色使用 TSObjectCollection
角色获取与Client
角色对应的 TSObjet
角色。TSObjectCollection
(线程特有的对象的集合):TSObjectCollection
角色有一张Client
角色与TSObject
角色之间的对应表。 当getTSObject
方法被调用后, 它会去查看对应表, 返回与 Client
角色相对应的 TSObject
角色。另外, 当 setTSObject
方法被调用后, 它会将 Client
角色与 TSObject
角色之间的键值对应关系设置到对应表中。TSObject
(线程特有对象): TSObject
角色中保存着线程特有的信息。TSObject
角色由TSObjectCollection
角色管理。TSObject
角色的方法只会被单线程调用。局部变量与 java.lang.ThreadLocal
类
线程本来都是有自己特有的存储空间的, 即用于保存方法的局部变量的栈。方法中定义的局部变量属于该线程特有, 其他线程无法访问它们。但是,这些变量在方法调用结束后就会消失。而ThreadLocal
则与方法调用无关,它是一个用于为线程分配特有的存储空间的类。
保存线程特有的信息的位置
线程特有的信息的"保存位置"有以下两种。
在线程外保存线程特有的信息
TSLOG
的实例都被保存在Log
类中的java.lang.ThreadLocal
的实例中。
TreadLocal
的实例就是存储间, 各个线程的储物间内。线程并不会背着储物柜四处走动。像这样, 将线程持有的信息保存在线程外部的方法称为 “线程外”。
在线程内保存线程特有的信息
编写一个 Thread
类的子类 – MyThread
。如果在 MyThread
中声明字段, 该字段就是线程特有的信息。这就是线程内保存线程特有的信息。
不必担心其他线程访问
Thread-Specific Storage
是"线程特有的存储"的意思。
不会被其他线程随意访问 这一特性非常重要。这是因为,在多线程编程中,互斥处理非常重要,但是优雅地执行互斥处理却非常困难。
Thread-Specific Storage
模式为我们提供了一种以线程作为键,让每个线程只能访问它特有的对象的机制。该对象是以线程为单位来保存的,绝对不用担心其他线程会访问该对象。
吞吐量的提高很大程度上取决于实现方式
Thread-Specific Storage
模式并没有执行互斥处理。因此,这很容易让人误解为与使用 Single Threaded Execution模式相比, 此时的吞吐量会有所提高。但是, 事实并非一定如此。原因如上文所述, 可能 TSObjectCollection
角色中执行了隐藏的互斥处理。此外, 每次通过 TSObjectProxy
角色调用方法时, 使用 TSObjectCollection
角色都会产生额外的性能开销。
Thread-Specific Storage
更看重如下所示的可复用性。
上下文的危险性
在 Thread-Specific Storage
模式中, TSObjectCollection
角色会自动判断当前的线程。也就是说, 我们没有必要将线程的相关信息通过参数传递给 TSObjectCollection
角色。这相当于在程序中引入了上下文。上下文虽然很方便,但是也有一定的危险性。因为开发人员看不到处理中所使用的信息。
基于角色的方式即在表示线程的实例中保存进行工作所必需的信息(上下文、状态)。这样可以减少和减轻线程之间的交互信息量。一个线程会使用从其他线程接收到的信息来执行处理, 改变自己的内部状态。通常,我们称这样的线程为角色。
基于任务的方式不在线程中保存信息(上下文、状态)。在这种方式下, 这些信息不保存在线程中,而是保存在线程之间交互的实例中。而且, 不仅是数据, 连用于执行请求的方法都定义在其中。像这样在线程之间交互的实例可以称为消息、请求或是命令。这里我们暂且称其为任务。
Active 是 “主动的"的意思, 因此 Active Object 就是"主动对象"的意思。所谓"主动的”, 一般指 “有自己特有的线程”。因此,举例来说,java 的
java.lang.Thread
类的实例就是一种主动对象。不过,在 Active Object 模式中出场的主动对象可不仅仅 “有自己特有的线程”。它同时还具有可以从外部接收和处理异步消息并根据需要返回处理结果的特征。
Active Object 模式中的主动对象会通过自己特有的线程在合适的时机处理从外部接收到的异步消息。
见code-multithread-pattern
Client
(委托者) : Client
角色调用 ActiveObject
角色的方法来委托处理。它能够调用的只有 ActiveObject
角色提供的方法。调用这些方法, (如果 ActivationQueue
角色没有满) 程序控制权会立即返回。
虽然 Client
角色只知道 ActiveObject
角色,但它实际调用的是Proxy
角色。
Client
角色在获取处理结果时, 会调用 VirtualResult
角色的 getResultValue
方法。这里使用了 Future
模式。
ActiveObject
(主动对象) : Active Object角色定义了主动对象向Client角色提供的接口(API )。
ActiveObject
角色定义了主动对象向 Client
角色提供的接口(API)。
Proxy
(代理人) : Proxy
角色负责将方法调用转换为 MethodRequest
角色的对象。转换后的 MethodRequest
角色会被传递给 Scheduler
角色。
Proxy
角色实现了 ActiveObject
角色提供的接口(API)。
调用 Proxy
角色的方法的就是 Client
角色。将方法调用转换为 MethodRequest
角色, 并传递给 Scheduler
角色的操作都是使用 Client
角色的线程进行的。
Scheduler
: Scheduler
角色负责将 Proxy
角色传递来的 MethodRequest
角色传递给 ActivationQueue
角色, 以及从 ActivationQueue
角色取出并执行 MethodRequest
角色这两项工作。
Client
角色的线程负责将 MethodRequest
传递给 ActivationQueue
角色。
而从ActivationQueue角色取出并执行MethodRequest角色这项工作则是使用Scheduler角色自己的线程进行的。在Active Object模式中,只有使用 Client 角色和 Scheduler 角色时才会启动新线程。
Scheduler角色会把 MethodRequest角色放入 ActivationQueue 角色或者从ActivationQueue角色取出 MethodRequest 角色。
因此,Scheduler 角色可以判断下次要执行哪个请求。如果想实现请求调度的判断逻辑,可以将它们实现在Scheduler角色中。也正是因为如此,我们才将其命名为Scheduler。
MethodRequest
MethodRequest角色是与来自Client角色的请求对应的角色。MethodRequest定义了负责执行处理的 Servant角色,以及负责设置返回值的Future角色和负责执行请求的方法(everto)
MethodRequest角色为主动对象的接口(API)赋予了对象的表象形式。
ContreteMethodRequest
ConcreteMethodRequest角色是使MethodRequest角色与具体的方法相对应的角色。Active Object角色中定义的每个方法,会有各个类与之对应,比如MethodAlphaRequest。
Servant(仆人)
Servant角色负责实际地处理请求。
调用Servant角色的是Scheduler角色的线程。Scheduler角色会从ActivationQueue角色取出一个MethodRequest角色(实际上是ConcreteMethodRequest角色)并执行它。此时,Scheduler 角色调用的就是Servant角色的方法。
Servant角色实现了ActiveObject角色定义的接口(API )。
Proxy角色会将请求转换为MethodRequest角色,而Servant角色则会实际地执行该请求。Scheduler角色介于Proxy角色和Servant 角色之间,负责管理按照什么顺序执行请求。
ActivationQueue (主动队列)
ActivationQueue角色是保存MethodRequest角色的类。
调用putRequest方法的是Client角色的线程,而调用takeRequest方法的是Scheduler角色的线程。这里使用了Producer-Consumer模式。
VirtualResult(虚拟结果)
VirtualResult角色与Future角色、RealResult角色共同构成了Future模式。
Client角色在获取处理结果时会调用VirtualResult角色(实际上是Future角色)的getResultvalue方法。
Future(期货)
Future角色是 Client角色在获取处理结果时实际调用的角色。当处理结果还没有出来的时候,它会使用Guarded Suspension模式让 Client角色的线程等待结果出来。
RealResult(真实结果)
RealResult角色是表示处理结果的角色。Servant角色会创建一个RealResult角色作为处理结果,然后调用Future角色的setRealResult方法将其设置到Future角色中。
到底做了些什么事情
运用模式时需要考虑问题的粒度
Active Object模式的组成要素众多,是一个非常庞大的模式。因此,在运用该模式时,必须注意问题的粒度。所谓问题的粒度,是指问题的大小,也就是用于解决问题的每个处理到底有多大。
关于并发性
Proxy 角色即使被多个线程调用也没有问题({ concurrent } )
Servant角色只能被一个线程调用( { sequential } )
增加方法
Scheduler 角色的作用
在[POSA2]中,Scheduler 角色如下。首先,各ConcreteMethodRequest 角色会定义guard方法。接着,如果可以执行ConcreteMethodRequest 角色,就让guard方法返回true。仅当guard方法的返回值是true时,Scheduler角色才会调用ConcreteMethodRequest角色的execute方法”。这样,ConcreteMethodRequest角色的守护条件就可以整合在它们各自的方法中了。.
主动对象之间的交互
可以编写多个主动对象,然后让它们之间互相交互。也就是说,Servant角色会调用其他 ActiveObject 角色的方法。
通往分布式-从跨越线程界线变为跨越计算机界线
这个方法运行于哪个线程呢? 在Active Object模式中,“方法的调用”的部分运行于Client角色的线程中,“方法的执行”部分运行于Scheduler角色的线程中。
这里其实也是“调用与执行的分离”: 执行invocation的线程(Client角色)与执行execution 的线程(Scheduler角色)被分离开了。
如果将线程分离开来,那么就可以很容易地将线程运行于的计算机也分离开来,即将执invocation的机器与执行execution的计算机分离开,然后用网络将它们连接起来。那么网络之间互相传输的是什么呢?对,就是 MethodRequest角色和Result角色。
由于方法的调用和设置返回值都已经被转换为了对象这种“有形的东西”,所以可以通过网络交互。这可以说是“从跨越线程界线变为了跨越计算机界线”。
http: / / docs.oracle.com/javase/8/docs/technotes/qguides/rmi/
看到的一些其他的多线程设计模式, 罗列在下面。欢迎小伙伴们反馈噢~
Promise (承诺)模式
Serial Thread Confinement(串行线程封闭) 模式
Master-Slave(主仆) 模式
Pipeline(流水线) 模式
Half-sync/ Half-async(半同步/半异步)模式
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。