赞
踩
《Java Concurrency In Practice》的作者 Brian Goetz 对线程安全是这样理解的,当多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行问题,也不需要进行额外的同步,而调用这个对象的行为都可以获得正确的结果,那这个对象便是线程安全的。
按照线程操作共享数据的"安全程度"可以分为:不可变,绝对安全,相对安全,线程兼容和线程对立。
对于共享数据是基本类型,通过定义该基本变量为final就可以保证它不可变,这种不可变性是在编译时确定的。在编译时,对于final变量,是必须初始化值的,不然编译器会报错,并且Java编译器将会将使用final变量的地方进行值替换。这样能提高运行时效率。
如果共享数据是一个对象,需要保证这个对象的方法不对其变量进行重新赋值引用。java.lang.String就是一个典型的不可变对象,它的substring()、replace()和concat()这些方法都不会影响它原来的值,而是返回一个new String()的新对象。还有如枚举类、Number的部分子类(如BigDecimal)。
如Java中的Vector类,尽管它的get()、remove()、size()方法都采用了synchronized,但是在多线程环境下,还是需要调用方法做额外的同步措施才能保证绝对安全。
相对的线程安全就是我们通常意义上讲的线程安全。如Vector、HashTable等类。
线程兼容是指对象本身不是线程安全的。这种情况也就是我们平时说的线程不安全。可以通过调用端正确地使用同步手段实现对象在并发环境下正确访问。
这种情况应该尽量避免。如废弃的Thread类的suspend()和resume()方法。
这里总结了4种场景是需要注意线程安全的。
第一种场景是访问共享变量或共享资源的时候,典型的场景有访问共享对象的属性,访问 static 静态变量,访问共享的缓存,等等。最常见的情况是多个线程同时对未做同步处理的i++操作。将导致实际的值少于我们预想的值。
如下代码,如果多个线程访问,A线程执行完第一行,由于总线中断,A线程退出执行,交由B线程执行,B线程将map中的key给删除,然后总线又中断,交由A线程来执行第二行代码,就会导致对象不存在,移除报错。
if (map.containsKey(key)) {
map.remove(obj)
}
第三种需要我们注意的线程安全场景是不同数据之间存在相互绑定关系的情况。有时候,我们的不同数据之间是成组出现的,存在着相互对应或绑定的关系,最典型的就是 IP 和端口号。
举个例子,比如说我们定义了 ArrayList,它本身并不是线程安全的,如果此时多个线程同时对 ArrayList 进行并发读/写,那么就有可能会产生线程安全问题,造成数据出错。正于源码中如下所述。需要在外部手动用 synchronized 等方式保证并发安全。
Note that this implementation is not synchronized. If multiple threads
access an ArrayList instance concurrently, and at least one of the threads
modifies the list structurally, it must be synchronized externally.
这是因为单线程程序是独立工作的,不需要与其他线程进行交互,但多线程之间则需要调度以及合作,调度与合作就会带来性能开销从而产生性能问题。有研究表明,页面每多响应 1 秒,就会流失至少 7% 的用户,而超过 8 秒无法返回结果的话,几乎所有用户都不会选择继续等待。
主要有两个方面,一方面是线程调度,另一个方面是线程协作。
在实际开发中,线程数往往是大于 CPU 核心数的,比如 CPU 核心数可能是 8 核、16 核,等等,但线程数可能达到成百上千个。这种情况下,操作系统就会按照一定的调度算法,给每个线程分配时间片,让每个线程都有机会得到运行。所以就出现了总线中断器需要保存上一个线程的执行点,并唤醒下一个要执行的线程。如果在一个时间片周期内,我们不要再去做资源的同步处理,从CPU切出当前执行的线程,如果执行很简单,我们通常是做无锁CAS等待的方式处理。
由于程序有很大概率会再次访问刚才访问过的数据,所以为了加速整个程序的运行,会使用缓存,并且缓存块存储的数据是从磁盘的某个块进行一次性读取的(比如一次读取64KB)。这样我们在使用相同数据时就可以很快地获取数据。可一旦进行了线程调度,切换到其他线程,CPU就会去执行不同的代码,原有的缓存就很可能失效了,需要重新缓存新的数据。所以就需要减少线程切换。什么情况会导致密集的上下文切换呢?如果程序频繁地竞争锁,或者由于IO读写等原因导致频繁阻塞,那么这个程序就可能需要更多的上下文切换。
因为线程之间如果有共享数据,为了避免数据错乱,为了保证线程安全,就有可能禁止编译器和 CPU 对其进行重排序等优化,也可能出于同步的目的,反复把线程工作内存的数据 flush 到主存中,然后再从主内存 refresh 到其他线程的工作内存中。这些都是需要占用CPU资源的。
多线程对共享资源安全访问主要有3种实现方式。分别是互斥访问、非同步阻塞和无同步方案。
互斥同步是指多个线程并发访问共享数据时,保证共享数据在同一个时刻只被一个线程使用,即串行访问共享资源,是一种悲观的并发策略,无论共享数据是否会真的出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁,如通过偏向锁、自旋等方式)、用户态和核心态转换,维护锁计数器和检查是否有被阻塞的线程需要唤醒。它最主要的问题就是进行线程阻塞和唤醒所带来的性能问题。临界区(Critical Section)、互斥量(Mutex)和信号量(Semaphore)都是主要的互斥实现方式。
在Java中最基本的互斥同步是synchronized关键字。反编译后会在同步块前后生成monitorenter和monitorexit这两个字节码指令。这两个字节码都需要一个reference类型的参数来指明要锁定和解锁的对象。如果指明了对象参数,就用这个对象的reference;如没有明确指定,那就根据synchronized修饰的是实例方法还是类方法,去取对应的对象实例或Class对象来作为锁对象。更加详细讲解在第24节。
synchronized同步块对同一个线程来说是可重入的,不会出现自己把自己锁死的问题。同步块在已进入的线程执行完之前,会阻塞后面其他线程的进入。Java中采用的是轻量级进程支持多线程,所以要阻塞或者唤醒一个线程都需要涉及到用户态和核心态的切换。所以,这是个重量级的锁。虚拟机本身为了避免频繁地切入到核心态,在通知操作系统阻塞线程之前加入一段自旋等待锁释放的过程,如果锁及时释放,则抢占锁。
除了synchronized之外,我们还可以使用java.util.concurrent中的重入锁ReentrantLock来实现同步。ReentrantLock表现在API层的互斥锁(lock和unlock方法配合try/finally语句块来完成)。synchronized表现为原生语法字节码层的实现。之前听某些讲师认为ReentrantLock因为使用了自旋锁,全是Java级别调用,是用户态的操作。而synchronized关键字调用了c语言代码,涉及到了内核态和用户态的切换,所以,synchronized关键字是重量级锁。这种说法是错误的。是否涉及到内核态和用户态的操作是由是否调用操作系统申请和释放线程资源。如ReentrantLock需要挂起当前线程,调用native park方法,就是涉及到操作系统的资源管理,需要调用操作系统的os::Linux::safe_cond_timedwait方法就涉及到了内核调用。ReentrantLock更加高效的真正原因是通过自旋锁化解了很多不必要线程调度。而优化后的synchronized在虚拟机层面也是通过自旋的方式,性能与ReentrantLock不再相差很远。以下是park方法,其作用是挂起当前线程。其中_counter可理解为AQS中的state,大于0即该线程拥有许可。
void Parker::park(bool isAbsolute, jlong time) { //原子交换,如果_counter > 0,则将_counter置为0,直接返回,否则_counter为0 if (Atomic::xchg(0, &_counter) > 0) return; //获取当前线程 Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); //下转型为java线程,可理解为C语言线程是对java线程的封装,这里是获取java线程引用 JavaThread *jt = (JavaThread *)thread; //如果当前线程设置了中断标志,调用park则直接返回,所以如果在park之前调用了 //interrupt就会直接返回 if (Thread::is_interrupted(thread, false)) { return; } // 高精度绝对时间变量 timespec absTime; //如果time小于0,或者isAbsolute是true并且time等于0则直接返回 if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } //如果time大于0,则根据是否是高精度定时计算到需要挂起的时间 if (time > 0) { unpackTime(&absTime, isAbsolute, time); } //进入安全点避免死锁 ThreadBlockInVM tbivm(jt); //如果当前线程设置了中断标志,或者获取mutex互斥锁失败则直接返回 //由于Parker是每个线程都有的,所以_counter cond mutex都是每个线程都有的, //不是所有线程共享的所以加锁失败只有两种情况,第一unpark已经加锁这时只需要返回即可, //第二调用调用pthread_mutex_trylock出错。对于第一种情况就类似是unpark先调用的情况,所以 //直接返回。 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } int status ; //staus用于判断是否获取锁 //如果_counter大于0,说明unpark已经调用完成了将_counter置为了1, //现在只需将_counter置0,解锁,返回 if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); OrderAccess::fence(); return; } OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); //如果time等于0,说明是相对时间也就是isAbsolute是fasle(否则前面就直接返回了),则直接挂起 if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;//挂起当前线程 } else { //如果time非0 //判断isAbsolute是false还是true,false的话使用_cond[0],否则用_cond[1] _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; //使用条件变量使得当前线程挂起。 status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; //如果挂起失败则销毁当前的条件变量重新初始化。 if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } //如果pthread_cond_wait成功则以下代码都是线程被唤醒后执行的。 _cur_index = -1; assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif //将_counter变量重新置为1 _counter = 0 ; //解锁 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // 使用内存屏障使_counter对其它线程可见 OrderAccess::fence(); // 如果在park线程挂起的时候调用了stop或者suspend则还需要将线程挂起不能返回 if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } }
所以park和unpark和核心就是counter、cur_index、mutex、cond,通过使用条件变量对counter进行操作,在调用park时,如果counter是0则会去执行挂起的流程,否则返回,在挂起恢复后再将counter置为0。在unpark的时候如果counter是0则会执行唤醒的流程,否则不执行唤醒流程,并且不管什么情况始终将counter置为1。
而ReentrantLock增加了一些高级特性。如等待可中断、可实现公平锁,以及锁可以绑定多个条件。这些内容将在第30节进行讲解。
随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略,通俗的讲,就是先执行操作,如果没有其他线程争用共享数据,那操作就成功了;如果共享数据有争用,产生了冲突,就采取其他补偿措施(最常见的补偿措施为不断重试,直到成功为止)。这种方式不需要把线程挂起。称为非阻塞同步。
在IA64、x86指令集中有cmpxchg指令完成CAS功能。虚拟机只允许Bootstrap ClassLoader对Unsafe类进行加载。且对调用Unsafe类的compareAndSwapInt()几个CAS方法进行了编译处理,直接编译与平台相关的CAS指令。但CAS存在“ABA”问题,JUC中引入“AtomicStampedReference”类,通过控制变量值的版本来保证CAS的正确性。这种方式也用在数据库设计中,用于对业务进行防重。比如,更新时带版本version字段更新,防止商品超卖等现象。
要保证线程安全,并不是一定要进行同步。如果一个资源本来就不涉及多个线程共享,那它就不需要任何同步措施,所以ThreadLocal并不是解决资源共享问题的。
可重入代码。可重入代码有一些共同的特征。例如不依赖存储在堆上的数据和公用的系统资源、用到的状态量都由参数传入、不调用非可重入方法。
线程本地存储(Thread Local Storage)。如果一段代码中所需要的数据能保证在同一个线程中执行,我们就可以限制该数据的可见范围在该线程中即可。这样,无须同步也能保证线程之间不出现数据争用的问题。这种情况一个经典的应用实例是Web交互模型中“一个请求对应一个服务器线程”的处理方式。存放在ThreadLocal中的变量都是当前线程本身私有变量。其他线程本身就不能访问,存到ThreadLocal中只是为了方便在程序中同一个线程之间传递这个变量。多个线程在同一时刻访问或修改的并不是同一个对象,从而隔离了多个线程对共享数据的访问,如果是修改,则不会影响其他线程副本。如果一个变量要被多个线程访问,并且修改后要影响其他线程的行为,可以使用volatile关键字。
线程安全问题产生有两个前提,(1)存在数据共享,即多个线程访问同样的数据。(2)共享数据是可变的。多个线程对访问的共享数据做出来修改。显然ThreadLocal没有这样的前提条件,它也不是用来解决并发访问共享数据的。
当使用线程池的时候,由于ThreadLocal的设计原理是将一个ThreadLocalMap的引用作为Thread的一个属性,利用当前ThreadLocal作为key,保存的变量值作为value保存在当前线程的ThreadLocalMap中的。所以ThreadLocalMap是伴随的Thread本身的存在而存在的,只要Thread不被回收,ThreadLocalMap就存在。因此,对于线程池来讲,重复利用一个Thread就等于在重复利用Thread的ThreadLocalMap,所以ThreadLocalMap里面保存的数据可能会被多次使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。