赞
踩
即某个正在执行的操作不能中断,不能被分割,要么就不执行,要么就执行完毕。
正如我们所知道的synchronized代码块,这块里面的东西要么就是被执行完毕,要么就不执行。
还有就是我们所熟知的基本数据类型的读写,long 和 double除外,也都是原子性的。
当某个线程修改了共享变量的值,其他线程能够立刻得知这个修改。但是根本原因在于:
多个线程之间是不能互相传递数据通信的,他们之间沟通只能通过共享变量来进行。Java内存模型规定了jvm有主内存,主内存是多个线程共享的,当new一个对象的时候,也是被分配在主内存中,每个线程都有自己独立的 工作内存,工作内存存储了主存的某些对象的副本,所以单个线程与线程的工作内存之间就有了相互隔离的效果,这就是“可见性问题”
线程在引用变量时不能从主存中直接引用,如果线程工作内存中没有该变量,则会从主内存中拷贝一个副本到工作内存中,这个过程为read-load,完成后线程会引用该副本,当同一线程再度引用该字段时,有可能重新从主存中获取变量副本(read-load-use),也可能直接引用原来的副本,也就是说read/load/use的顺序可以有JVM实现系统决定。这个时候线程与线程之间的操作的先后顺序,会决定程序对主存区最后的修改是不是正确的,这就是“时序性问题”。
Atomic是 java.util.concurrent包下的,专门为线程安全而设计的,基本特性就是在多线程环境下,当有多个线程同时操作这些方法,具有原子性、排他性。即当某个线程进入某个方法,执行其中的指令,不会被其他线程打断。这个是借助硬件的相关指令的来完成的,不会阻塞线程,只是硬件级别的阻塞。
我们先来看一个例子:计数器(Counter),采用Java里比较方便的锁机制synchronized关键字,代码如下:
classCounter {
private int value;
public synchronized int getValue() {
return value;
}
public synchronized int increment() {
return ++value;
}
public synchronized int decrement() {
return --value;
}
}
如果这样做,满足我们的需求是没有问题的,但有时候我们需要更加效率,更加灵活的方式。Synchronized是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其他线程需要等待,直到该线程释放锁:
这里就有几个问题:
l 如果被阻塞的线程优先级很高,怎么办?
l 获得所得线程一直不释放锁,怎么办?
l 如果有大量的线程来竞争这个资源,效率严重影响,怎么办?
l 死锁了,怎么办?
其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重,因此,对于这种需求我们期待一种更合适、更高效的线程安全机制。
做法就是Compareand Swap(CAS)+volatile:
当前的处理器基本都支持CAS,只不过每个厂家所实现的算法并不一样罢了,每一个CAS操作过程都包含三个运算符:一个内存地址V,一个期望的值A和一个新值B,操作的时候如果这个地址上存放的值等于这个期望的值A,则将地址上的值赋为新值B,否则不做任何操作。CAS的基本思路就是,如果这个地址上的值和期望的值相等,则给其赋予新值,否则不做任何事儿,但是要返回原值是多少。
注意:虽然基于CAS的线程安全机制很好很高效,但要说的是,并非所有线程安全都可以用这样的方法来实现,这只适合一些粒度比较小,型如计数器这样的需求用起来才有效,否则也不会有锁的存在了。
JMM规定volatile变量的操作规则,use之前必须readand load,也就是所谓的刷新;
在assign一个值之后,必须storeand write,即必须同步回主内存。以保证其他线程立马可见。
一般使用场景:
2.2.1如果你的代码希望在多线程下,禁止CPU指令重排序,即按照代码顺序执行指令
2.2.2在多线程下,如果这儿有一个关于状态的共享变量,也可以使用volatile,因为他不参与计算,就基本是线程安全的,如果要参与运算,则不能保证是线程安全的
我们访问线程的时候,对于共享可变的数据,通常需要同步;还有一种避免同步的方式,就是不共享,只是在一个线程内访问数据,这种技术成为线程封闭。
2.3.1栈封闭
也就是所谓的方法里面的变量,只要不让他们溢出,也就是不return这个变量给其他调用的地方,那么他就是线程每个线程私有。
2.3.3ThreadLocal
线程共享变量,只是针对每一个线程都有一份这样的变量,不存在其他线程会修改这个值。一般如维持一个全局的数据库连接。
首先,我们必须理解什么是发布,什么是安全发布:
发布:使对象能够在外部代码中使用
public class Nicky {
privateHashMap<String, List<Integer>> map = null;
publicHashMap<String, List<Integer>>getMap() {
return map;
}
public void setMap(HashMap<String, List<Integer>> map) {
this.map = map;
}
public Nicky(){
map = new HashMap<String, List<Integer>>();
}
}
比如 getMap()方法,他是public的,所以能够能被外部调用,而他又返回了一个map对象,那么这个map对象就是属于被发布了。如果被发布的对象,引用了其他的对象,那么其他的对象也将会被发布。
逸出:一个不该发布的对象或者没有准备好的对象被发布了。
public class Escape {
private int data = 0;
private String account = null;
public Escape(){
new Thread(new Test()).start();
new Thread(new Test()).start();
try {
Thread.sleep(10);
} catch(InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
data = 200;
account="nicky";
}
private class Test implements Runnable {
@Override
public void run() {
System.out.println(Escape.this.data);
System.out.println(Escape.this.account);
}
}
public static void main(String[] args) {
newEscape();
}
}
如以上代码所示:在构造Escape实例的时候,先产生了2个线,并且启动了,但是这时候,data 和 account还没有实例化。所以打印出的结果就是0和 null.这就是所谓的逸出。在这里就是Escape还没有构造好,内部类就在使用了。
public class UnsafeStates {
private String[]states = {"AK","AL"};
public String[]getStates() {
return states;
}
public static void main(String[] args) {
UnsafeStates states = new UnsafeStates();
for(int i = 0 ; i < 10;i++){
new Thread(new Test1(states)).start();
new Thread(new Test2(states)).start();
}
}
}
class Test1 implementsRunnable{
privateUnsafeStates unsafeStates;
publicTest1(UnsafeStates unsafeStates) {
this.unsafeStates = unsafeStates;
}
@Override
public void run() {
String[] states = unsafeStates.getStates();
states[0]="BK";
System.out.println(Arrays.toString(states));
}
}
class Test2 implementsRunnable{
privateUnsafeStates unsafeStates;
publicTest2(UnsafeStates unsafeStates) {
this.unsafeStates = unsafeStates;
}
@Override
public void run() {
String[] states = unsafeStates.getStates();
states[0]="CK";
System.out.println(Arrays.toString(states));
}
}
输出结果:
在多线程的环境下,我们把这个数组对象给发布了,但是又没有给他最任何保护措施,肯定会有问题。
那我们应该怎样安全的发布可变对象呢?在发布和使用该对象的线程需要做一些同步。
3.1在静态初始化函数中初始化一个对象引用
private staticEscape escape = new Escape();
同步是容器类都是线程安全的,相对而言。因为在某些时候,比如迭代反复访问元素,直到遍历完容器中的元素以及条件运算。
4.1ConcurrentHashMap
串行:程序顺序执行,按照程序先后顺序执行,比价浪费CPU资源,比如一个服务器每一次只能处理一个请求。
并行:多个程序段或任务能同时执行,互相不影响,能充分利用CPU,也能提升效率。
Code Example:
串行:
public class SerialExecute {
public static void main(String[] args) {
ServerSocket socket = null;
try {
socket = new ServerSocket();
socket.bind(new InetSocketAddress("127.0.0.1", 10005));
while(true){
Socket conn = socket.accept();
handleRequest(conn);
}
} catch(IOException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void handleRequest(Socket socket){
try {
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
while(true){
//读取数据(阻塞)
int read = inputStream.read(bytes);
if(read != -1){
System.out.println(new String(bytes, 0, read));
}else{
break;
}
}
} catch(Exception e){
e.printStackTrace();
}finally{
try {
System.out.println("socket关闭");
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
并发:
public class ParallelExecute {
public static void main(String[] args) {
ServerSocket socket = null;
try {
socket = new ServerSocket();
socket.bind(new InetSocketAddress("127.0.0.1", 10005));
while(true){
final Socket conn = socket.accept();
Runnable target = new Runnable() {
@Override
public void run() {
handleRequest(conn);
}
};
new Thread(target).start();
}
} catch(IOException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void handleRequest(Socket socket){
try {
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
while(true){
//读取数据(阻塞)
int read = inputStream.read(bytes);
if(read != -1){
System.out.println(new String(bytes, 0, read));
}else{
break;
}
}
} catch(Exception e){
e.printStackTrace();
}finally{
try {
System.out.println("socket关闭");
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
以上并发的代码,可以保证一个请求过来,就有一个线程处理,在这我们为每一个任务分配一个线程,但是很明显这是有缺点的,尤其是需要创建大量的线程。
原因:线程生命周期开销是很大的,本省创建线程就需要时间,并且还要配合JVM和操作系统提供些辅助操作;另外资源消耗,活跃的线程会消耗系统资源,尤其是内存,如果可运行的线程多于可处理的数量,那么有些线程将会闲置,而这些大量的空余线程会占用很多内存。所以给线程管理带来了的挑战。
线程池能够简化线程的管理操作,并且实现了对生命周期的管理,统计信息的收集以及监视机制等。
Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行线程的任务相当于消费者。
构建一个基于Executor的web 服务器:
CodeExample:
public class TranstionalIOServer {
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(3);
ServerSocket server=new ServerSocket(10101);
System.out.println("服务器启动!");
while(true){
//获取一个套接字(阻塞)
final Socket socket = server.accept();
System.out.println("来个一个新客户端!");
newCachedThreadPool.execute(new Runnable() {
public void run() {
//业务处理
handler(socket);
}
});
}
}
public static void handler(Socket socket){
try {
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
while(true){
//读取数据(阻塞)
int read = inputStream.read(bytes);
if(read != -1){
System.out.println(new String(bytes, 0, read));
}else{
break;
}
}
} catch(Exception e){
e.printStackTrace();
}finally{
try {
System.out.println("socket关闭");
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
通过将任务的提交与执行分别开来,很方便我们去修改执行策略。
在什么线程中执行任务?
任务按照什么顺序执行?FIFO,LIFO,优先级
在多少个任务能并发执行
在队列中有多少个任务在等待执行
如果系统由于过载需要拒绝一个任务,应该选择哪一个任务
在一个任务执行前后,应该进行哪些动作
就是一个能够管一组工作线程的资源池。它是和工作队列密切相关的,工作队列保存了所有等待执行的任务。工作者线程线程负责工作队列获取任务,然后执行任务。然后返回线程池,等待下一个任务。
最明显的好处就是:可以重用现有的线程,而不是每一次都创建一个新的线程,从而降低了资源(内存)消耗和提升了响应速度,也提升线程的可管理。
线程池的类型:
5.5.1newFixedThreadPool: 创建一个固定大小的线程池,每当提交一个任务时候,就创建一个。直到线程数量达到线程池设置的线程数量。此时,线程规模不再扩大。如果某一个线程由于发生未预期的异常,那么线程池会补充一个新的线程。
5.5.2newCachedThreadPool: 创建一个可缓存的线程池,如果线程池的规模超过了处理需求时,那么将回收空闲线程,而当需求增加时,可以添加新的线程,线程池的规模不存在什么限制
5.5.3newSingleThreadPool: 创建单个线程,如果这个线程结束,那么会创建另一个线程来代替。
5.5.4newScheduledThreadPool: 创建一个固定大小的线程池,并且以延时或者定时的方式来执行任务。
我们知道如何创建一个Execuotor,但是该如何关闭呢?
由于Executor以异步的方式来执行任务,因此在任何时刻,之前提交的任务的状态不是立即可见的,有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。关闭应用程序时候,可以以优雅的关闭,也可以粗暴的关闭。
为了解决Executor的声明周期问题,Executor扩展了ExecutorService接口,添加了一些生命周期管理方法:
初始创建的时候,处于运行状态
Shutdown:平缓的关闭,包括不再接受新的任务;等待已经提交的任务的执行完成,包括那些还未开始的任务
ShutdownNow:执行粗暴的关闭
awaitTermination: 等待ExecutorService达到终止状态,或者通过调用isTermination轮询ExecutorService是否终止。通常做法是调用
awaitTermination之后,立即调用shutdown.从而产生同步关闭ExecutorService的效果。
Timer类负责管理延迟任务以及周期任务。但是Timer存在一些缺点。
他只会创立一个线程,如果某个task执行任务时间过长,将会影响TimerTask定时的精确性。另外Timer如果抛出了一个异常,Timer线程并不捕获异常,Timer将终止定时线程。此种情况,Timer不会恢复线程的执行,而是错误的认为整个Timer都被取消了。新任务也执行不了。我们称之为线程泄露。
Women可以考虑使用SchedudledThreadPoolExecutor来替代它。
作为一个基本的任务表现形式,Runnable本身式由一定局限的,比如不能返回一个值或者抛出一个受检查的异常。但是实际上许多任务都存在延迟的计算,比如执行数据库查询,从网络上获取资源,或者计算一个复杂的功能。对于这些任务,Callable是一种更好的抽象,它能返回值并且可以抛出一个异常。
Future:表示一个任务的生命周期,并且来判断是否完成和去取消。以及获取任务的结果和取消任务。
向线程池提交一个任务后,线程池是如何处理这个任务呢?
首先,线程池主要是通过ThreadPoolExecutor来实现的,各种线程池策略都是基于ThreadPoolExecutor实现的。我们需要看一个线程池流程图:
第一步:ThreadPoolExecutor的execute执行一个任务,首先检查CorePool,如果CorePool内线程数 <corePoolSize则创建一个新的线程执行任务
第二步:如果CorePool内的线程数 >=corePoolSize,则查看Blocking
Queue是否已满,如果没有满,则将任务加入BlockingQueue。
第三步:如果BlockingQueue已经满了,则创建新的线程来处理任务
第四步:如果创建新线程使得当前运行的线程数超过maxinmumPoolSize,任务将被拒绝。并调用RejectedExecutionHadnler
.rejectedExecution方法。(也就是线程拒绝策略)
源码:
线程的几个状态:
runState是整个线程池的运行生命周期,有如下取值:
1. RUNNING:可以新加线程,同时可以处理queue中的线程。
2. SHUTDOWN:不增加新线程,但是处理queue中的线程。 3.STOP不增加新线程,同时不处理queue中的线程。
4.TIDYING 所有的线程都终止了(queue中),同时workerCount为0,那么此时进入TIDYING
5.terminated()方法结束,变为TERMINATED
具体解释一下上述参数:
corePoolSize 核心线程池大小
maximumPoolSize 线程池最大容量大小
keepAliveTime 线程池空闲时,线程存活的时间
TimeUnit 时间单位
ThreadFactory 线程工厂
BlockingQueue任务队列
RejectedExecutionHandler线程拒绝策略
如果外部代码在某个操作能在正常完成之前,将其置为完成状态,那么这个操作被称为可取消(Cancellable)
一般取消的原因有很多:
用户请求取消;有时间限制的操作;应用程序事件;错误;关闭等
Java中没有一种安全的抢占方式来停止线程,因此也没有安全的抢占方式来停止任务。只有一种协作的机制,使得请取消的任务和代码都遵循一种协商好的协议。
某种机制能设置已请求取消的标记,而任务定期查看该标志,如果设置了这个标志,那么任务提前结束。
public class PrimeGenerator implementsRunnable{
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
@Override
public void run() {
BigInteger p =BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel(){
cancelled =Boolean.TRUE;
}
public synchronized List<BigInteger> get(){
return new ArrayList<BigInteger>(primes);
}
public static void main(String[] args) {
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try {
Thread.sleep(5);
} catch(InterruptedException e) {
e.printStackTrace();
}finally {
generator.cancel();
}
List<BigInteger> ps =generator.get();
for(BigInteger i :ps){
System.out.println(i);
}
}
}
运行结果:
5毫秒之后,cancelled状态置为true,此时线程不在进入循环体。
如果线程池中,如果任务依赖于其他任务,那么可能产生死锁
任务阻塞时间过长,即使不出现死锁,线程响应也会变得糟糕。有一项技术可以缓解执行时间较长的任务造成的影响:限定任务等待资源的时间,而不要无限制的等待。平台类库大多数可阻塞方法中,都同时 定义了限时版本和不限时版本,比如Thread#join;
BlockingQueue#put,CountDownLatch#await,Selector#select等。如果等待超时,可以将任务设置为失败,然后终止任务或者将任务重新放回队列以便随后执行,这样无论任务最重结果成功与否,都能确保任务总能继续执行下去。
设置线程池应该避免过大和过小。过大导致内存升高,而且还可能耗尽资源;过小,导致许多空闲处理器无法工作,从而降低吞吐。
我们需要考虑有多少个CPU,多大内存,任务是计算密集型还是I/O密集型,他们是否需要像JDBC连接那样稀缺的资源。如果他们行为相差很大,应该考虑使用多个线程池。
线程数=CPU核数*期望CPU使用率*(1+等待时间/计算时间)
ThreadPoolExecutor为一些Executor提供了一些默认的实现,如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor等工厂方法返回的。
两个线程试图以不同的顺序来获取相同的锁,则有可能产生死锁。如果所有线程以相同顺序来获取锁,就不会出现锁顺序死锁问题。
Code Example:
这段程序怎么发生死锁呢?
所有的线程几乎都是按照参数顺序来获得锁,而这些参数取决于外部参数。加入A和B两个人:
A transferMoney(AAccount,BAccount,100);
B transferMoney(BAccount,AAccount,100);
如果执行顺序不当,A可能正在获得AAcount的锁并等待BAccount的锁,BAccount持有BAccount锁,正等待AAcount的锁
解决办法:
public class LeftRightDeadLock {
private static final Object tieLock =new Object();
public void transferMoney(Account from, Accountto,Double money){
class Helper {
public void transfer(Account from, Accountto,Double money){
from.debit(money);
to.credit(money);
}
}
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
if (fromHash < toHash) {
synchronized (from) {
synchronized (to) {
new Helper().transfer(from, to, money);
}
}
} else if (fromHash > toHash) {
synchronized (to){
synchronized (from) {
new Helper().transfer(from, to, money);
}
}
} else {
synchronized (tieLock) {
synchronized (from) {
synchronized (to) {
new Helper().transfer(from, to, money);
}
}
}
}
}
class Account{
public void debit(Double money){
}
public void credit(Double money){
}
}
}
某些获取多个锁的操作并不像在LeftRightDeadLock那么明显,这两个锁并不一定必须在同一个方法中被获取。比如:
SetLocation,首先持有Taxi的锁,然后满足条件,调用Dispatcher#
notifyAvailable方法,持有Dispatcher锁;那么getImage()先持有
Dispatcher锁,然后调用Taxi.getPoint()时持有Taxi锁。这与LeftRightDeadLock情况相同,只是表现的不明显而已。
所以,在持有锁的方法调用外部方法,我们需要格外警惕。
怎么解决呢,尽量避免在同步方法中或同步块中调用外部其他类的同步方法。
显示使用Lock类定时tryLock来代替内置锁机制。使用内置锁,只要没有获取锁,就会永远等下去;而显示锁则可以指定一个超时时限,超时之后,tryLock会返回一失败信息
JVM可以通过ThreadDump来帮助识别deadlock的发生。
单线程程序即不存在线程调度,也不存在线程的同步开销,也就不需要使用锁来保证数据结构的一致性。但是对于读线程来说,并行带来的性能提升必须大于并发导致的开销。
如果可运行的线程数大于CPU的数量,那么操作系统最终会将某个正在运行线程调度出来,是其他线程能够使用CPU,这将导致一次上下文切换。这个过程将保存当前运行的线程的执行上下文,并将新调度进来的线程的执行上下文设置为当前上下文。
切换上下文需要一定开销,而在调度过程中,需要访问操作系统和JVM共享的数据结构。应用程序、操作系统和JVM都是用同一组相同CPU。在JVM和操作系统的代码中消耗越多CPU,应用程序可分配的CPU就越少。
线程由于等待某个发生竞争的锁而被阻塞时,JVM通常会将这个线程挂起,并允许它被交换出去。如果线程频繁的发生阻塞,与CPU密集型的程序就会发生越多的上下文切换,从而增加调度开销,降低吞吐量。
非竞争的同步可以完全在JVM处理,比如volatile。但是竞争的同步就需要操作系统的介入,从而增加开销。当在锁上发生竞争时,竞争失败的线程肯定会阻塞,JVM在实现阻塞行为时,可以采用自旋等待(循环不断去获取锁,直到成功)或者通过操作系统挂起被阻塞的线程。如果等待的时间较短,则适合自旋等待,如果时间较长,则适合挂起。如果需要挂起,这个过程包含两次额外的上下文切换。
对于由某个独占锁保护的资源进行访问时,我们采用串行方式,每一次只有一个线程访问,但是带来的开销也是大的。锁得请求频率*每次持有持有该锁的时间,如果二者乘积很小,那么大多数获取锁的操作都不会发生。
有三种方式可以减低锁的竞争程度:
l 减少锁得持有时间
l 减低锁得请求频率
l 使用带有协调机制的独占锁
将一些与所无关的代码移出同步代码块
降低了线程请求锁的频率。可以通过锁分解和锁分段实现。将采用多个互相独立的锁来保护独立的状态变量。
锁分解:Code Example
锁分段:比如ConcurrentHashMap。
Code Example
在写操作时必须独占锁。
Ø ReentrantLock可以构造公平锁,线程按照他们发出的请求顺序来获取锁;内置锁都是无序的
Ø ReentrantLock需要程序员手动释放锁;内置锁是基于代码块的,不需要程序员手动释放锁
Ø 内置锁中,死锁是一个严重问题,恢复办法就是重启程序;ReentrantLock提供了可定时与可轮询的方式,避免死锁发生。
如果没能获取锁,它会重新尝试获取,至少会将这个失败记录到日志。
Ø 如果出现死锁,内置锁可以借助ThreadDump分析死锁原因,但是ReentarntLock颈部会起到帮助作用
ReentrantReadWriteLock也可以构造一个公平锁,不过他不是基于线程请求的顺序,而是基于等待时间最长的线程,优先获得锁。而且如果当前读线程持有锁,而另一个线程成请求写入锁,那么其他读线程都不能获取锁,直到写线程释放锁,其他读线程才可以。如果非公平可不是这样的,线程获取的顺序是不确定的+
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。