赞
踩
状态 | 说明 |
---|---|
新建(NEW) | 初始状态,线程被创建但还没有调用start方法 |
运行(RUNNABLE) | 运行状态,就绪和运行统称为运行中。 |
阻塞(BLOCKED) | 阻塞状态,线程被锁 |
等待(WAITING) | 线程进入等待状态,需要其他线程通知或中断才能退出该状态。 |
超时等待(TIME_WAITING) | 指定时间后退出该状态 |
终止(TERMINATED) | 当前线程执行完毕。 |
1、继成Thread
public classs MyThread extends Thread {
@Override
public void run() {
System.out.println("this is the MyThread!");
}
}
2、实现Runnable 接口
class RunnableThread implements Runnable{
@Override
public void run() {
System.out.println("this is runnable thread");
}
}
3、实现Callable接口,相较于上面两种方式,这个有返回值
class CallableThread implements Callable<String> { @Override public String call() throws Exception { // do some job TimeUnit.SECONDS.sleep(10); return "complete the job"; } } //使用例子 public void test() throws Exception { FutureTask<String> futureTask = new FutureTask<>(new CallableThread()); Thread callable = new Thread(futureTask); callable.start(); boolean done = futureTask.isDone(); boolean cancelled = futureTask.isCancelled(); // while (!Thread.interrupted()),那么本次任务会一直执行,只有mayInterruptIfRunning=true futureTask.cancel(true); // 设置获取结果的等待时间,超时抛出timeOutException // String s = futureTask.get(1, TimeUnit.SECONDS); // 阻塞等待 String result = futureTask.get(); System.out.println(result); }
4、线程池创建
参数说明
public ThreadPoolExecutor(int corePoolSize, //线程池的核心线程数量
int maximumPoolSize, //线程池的最大线程数
long keepAliveTime, //当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue, //任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory, //线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler //拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
){
}
原理:
1、任务进来后,当运行线程少于核心线程则创建新的线程来处理任务,即使线程池的其他线程是空闲的;
2、运行线程大于核心线程小于最大线程,则进入工作队列。
3、当队列中满的时候并且线程小于最大线程,则创建线程处理任务。
4、当运行线程大于最大线程,则执行拒绝策略。
思考: coreSize = 0 会怎么处理?
答案
线程池拒绝策略
拒绝策略 | 说明 |
---|---|
AbortPolicy | 抛异常,拒绝新的任务 |
CallerRunsPolicy | 使用调用线程来执行任务 |
DiscardPolicy | 直接丢弃任务 |
DiscardOldestPolicy | 丢弃最早未处理的任务 |
jdk提供的几种线程池
线程池 | 说明 | 是否建议使用 |
---|---|---|
newFixedThreadPool | 创建固定数量线程的线程池,控制最大并发数 | 不建议,工作队列是new LinkedBlockingQueue(),是无界队列,高兴发下可能会导致OOM。(最大线程数参数无法生效) |
newSingleThreadExecutor | 单线程线程池 | 不建议,工作队列是new LinkedBlockingQueue(),是无界队列,高兴发下可能会导致OOM。(最大线程数参数无法生效) |
newCachedThreadPool | 可缓存线程池,工作队列是new SynchronousQueue() | 不建议,最大线程数设置为Integer.MAX_VALUE。高并发下可能会导致OOM |
newScheduledThreadPool | 支持定时和周期性执行的线程池,工作队列是new DelayedWorkQueue() ,其封装了一个 PriorityQueue | 可以使用 |
平常工作中建议 自定义线程池
new ThreadPoolExecutor(...)
线程池优点
因为线程的创建和销毁是很消耗资源的,利用线程池能够复用线程,减少线程创建销毁动作,来提高资源利用率和效率。
线程池的一些方法
execute() 和 submit()
execute(): 提交任务,无返回值。
submit(): 提交任务,返回Future对象,通过future可以判断任务执行情况。
shutdown() 和shutdownNow()
shutdown 关闭线程池,状态变为SHUTDOWN, 但是队列里面的任务得执行完毕。
shutdownNow 关闭线程池,状态变为STOP。线程会终止当前执行的任务,并停止处理队列中的任务。不建议使用。
isTerminated() 和 isShutdown()
isShutdown: 当调用shutdown() 返回true
isTerminated: 当调用shutdown(),等待所有任务完成后返回true
AQS是用来构建锁和同步器的框架,使用AQS可以简单高效的构造出应用广泛的大量同步器。
1、使用volatile 修饰state (private volatile int state;)变量标识共享资源的状态。volatile可以保证线程的可见性和有序性,这里主要利用可见性。
2、请求资源被占用,AQS使用CLH队列实现线程的阻塞和线程唤醒后锁分配机制。
3、CLH是一个虚拟双向队列,不存在队列实例,仅存在节点与节点之间关联关系。
4、通过CAS获取共享资源,如果获取失败调用native方法进入park状态。
子类需要实现的方法
boolean isHeldExclusively(): 判断该线程是否正在独占资源,需要condition才需要实现它。
boolean tryAcquire(int arg): 独占锁,尝试获取锁。
boolean tryRelease(int arg): 独占锁,释放锁。
int tryAcquireShared(int arg): 共享锁,尝试获取锁,负数表示失败,0标识成功,但没有可用资源,正数表示成功并且有剩余资源。
boolean tryReleaseShared(int arg): 共享锁,释放锁。
基于AQS实现独占锁和condition。可重入,可设置公平或者非公平锁,默认非公平锁。
基于AQS实现的共享锁,初始化的时候设置了AQS的state的数量。主要方法有 await和countdown。
await:
如果state为0,表示全部countdown了,不阻塞方法。
如果state不为0,新的任务添加到CLHz中。
countdown:
CAS+自旋扣减statue状态。当状态为0时,唤醒await等待的线程。
事例:
CountDownLatch countDownLatch = new CountDownLatch(2); for (int i = 0; i < 2; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "线程运行"); countDownLatch.countDown(); }, "线程" + i).start(); } System.out.println("等待上面线程结束"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("主线程运行"); 运行结果: 等待上面线程结束 线程0线程运行 线程1线程运行 主线程运行
内部使用ReentrantLock 非公平锁。当所有线程达到一定状态时,一起执行。
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> { System.out.println("最后执行"); }); for(int i = 0; i < 3; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + ":等待"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ":运行"); }, "线程" + i).start(); }
信号量,基于AQS,区分公平和非公平,默认非公平。
acquire() 方法:CAS 获取锁,如果资源为0,进入等待队列。
release() 方法:CAS释放锁,释放成功唤醒队列节点。
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "正在工作");
Thread.sleep(3000);
semaphore.release();
System.out.println(Thread.currentThread().getName() + "工作结束,离开");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程" + i).start();
}
ThreadLocal能为每个线程存储专属值。ThreadLocal类存储了ThreadLocal.ThreadLocalMap对象, map的key是ThreadLocal的弱引用,value是要放入的值。
public class ThreadLocal<T> { /** * ThreadLocal能为每个 Thread线程 绑定一个专属值的奥秘就是: * 每个Thread对象都持有一个 ThreadLocalMap类型的成员变量,其key为ThreadLocal对象, * value为绑定的值,所以每个线程调用 ThreadLocal对象 的set(T value)方法时,都会将 * 该ThreadLocal对象和绑定的值 以键值对的形式存入当前线程,这样,同一个ThreadLocal对象 * 就可以为每个线程绑定一个专属值咯。 * 每个线程调用 ThreadLocal对象的get()方法时,就可以根据 当前ThreadLocal对象 get到 绑定的值。 */ public void set(T value) { // 获取当前线程 Thread t = Thread.currentThread(); // 获取当前线程对象中持有的 ThreadLocalMap类型的成员变量 // ThreadLocalMap是一个Map类型的类 ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } ThreadLocalMap getMap(Thread t) { // Thread类中有一个 ThreadLocalMap 类型的threadLocals变量 return t.threadLocals; } void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { // 通过当前 ThreadLocal对象,获取绑定的值 ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } public void remove() { // 获取当前线程的ThreadLocalMap成员变量,不为空就将当前 ThreadLocal对象 // 对应的 键值对 remove掉 ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); } /** * 与大部分 Map 的实现相同,底层也是使用 动态数组来保存 键值对Entry,也有rehash、resize等 * 操作 */ static class ThreadLocalMap { /** * 存储键值对,key 为 ThreadLocal对象,value 为 与该ThreadLocal对象绑定的值 * Entry的key是对ThreadLocal的弱引用,当抛弃掉ThreadLocal对象时,垃圾收集器会 * 忽略这个key的引用而清理掉ThreadLocal对象,防止了内存泄漏 */ static class Entry extends WeakReference<ThreadLocal<?>> { Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } // 看过 HashMap 或 ConcurrentHashMap 源码的同学 一定下面对这些代码很眼熟 /** * 数组初始容量 */ private static final int INITIAL_CAPACITY = 16; /** * Entry数组,用于存储 <ThreadLocal<?> k, Object v>键值对 */ private Entry[] table; /** * Entry元素数量 */ private int size = 0; /** * 类似于 HashMap 扩容因子机制 */ private int threshold; // Default to 0 private void setThreshold(int len) { threshold = len * 2 / 3; } private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); } /** * 系列构造方法 */ ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); } private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } } /** * 根据 ThreadLocal对象 获取其对应的 Entry实例 */ private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); } /** * 常规Map实现类 的set()方法,只不过这里的 key被规定为 ThreadLocal类型 */ private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; // 根据哈希码和数组长度求元素放置的位置,如果该位置有其它元素,就依次尝试往后放 int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); // 如果key相等,覆盖value if (k == key) { e.value = value; return; } // 如果key为null,用新key、value覆盖,同时清理历史key=null的陈旧数据 if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; // 若超过阀值,则rehash if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } /** * Remove the entry for key. */ private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } } } /** * 调整当前table的容量。首先扫描整个容器,以删除过时的条目,如果这不能充分缩小表的大小, * 将进行扩容操作 */ private void rehash() { // 扫描整个容器,删除过时的条目 expungeStaleEntries(); // 若未能充分缩小表的大小,则进行扩容操作 if (size >= threshold - threshold / 4) resize(); } /** * 扩容为原容量的两倍 */ private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; // 遍历Entry[]数组 for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); // 如果key=null,把value也置null,有助于GC回收对象 if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } // 设置新的阈值 setThreshold(newLen); size = count; table = newTab; } } }
测试:
public class Test { ThreadLocal threadLocal = new ThreadLocal(); int a; public void set(int a) { this.a = a; } public int get() { return a; } public void setLocal(int a) { threadLocal.set(a); } public int getLocal() { return (Integer) threadLocal.get(); } public static void main(String[] args) { Test test = new Test(); new Thread(() -> { test.set(1); test.setLocal(1); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Thread thread = Thread.currentThread(); System.out.println(thread.getName() + "线程, a:" + test.get()); System.out.println(thread.getName() + "线程, loacl a:" + test.getLocal()); }, "A").start(); new Thread(() -> { test.set(2); test.setLocal(2); Thread thread = Thread.currentThread(); System.out.println(thread.getName() + "线程, a:" + test.get()); System.out.println(thread.getName() + "线程, loacl a:" + test.getLocal()); }, "B").start(); } } 打印结果: B线程, a:2 B线程, loacl a:2 A线程, a:2 A线程, loacl a:1
ThreadLocal实际应用
1、Spring事务: 用ThreadLocal保证同一个connection.
2、读写分离:使用theadLocal获取当前需要执行的数据源,结合AbstractDataSourceRouter执行需要执行的数据库。
3、SimpleDateFormat结合应用:
使用SimpleDateFormat的parse的方法会先调用Calender.clear()方法,然后调用Calender.add()。如果有一个线程A调用add,另外一个线程B调用clear,这时候A的parse()方法解析的时间就不对了。
可以用以下方式
private static ThreadLocal<SimpleDateFormat> simpleDateFormat = ThreadLocal.withInitial(() ->
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
);
或者直接用DateTimeFormatter (jdk8)
死锁条件:互斥、不可剥夺、循环等待、请求与保持。
避免死锁:1、加上锁持有过期时间
2、银行家算法:首先需要定义状态和安全状态的概念。系统的状态是当前给进程分配的资源情况。因此,状态包含两个向量Resource(系统中每种资源的总量)和Available(未分配给进程的每种资源的总量)及两个矩阵Claim(表示进程对资源的需求)和Allocation(表示当前分配给进程的资源)。安全状态是指至少有一个资源分配序列不会导致死锁。当进程请求一组资源时,假设同意该请求,从而改变了系统的状态,然后确定其结果是否还处于安全状态。如果是,同意这个请求;如果不是,阻塞该进程知道同意该请求后系统状态仍然是安全的。
3、多个资源保持一致的加锁顺序。
减少持有时间: 只在有线程安全的地方加锁。
减少锁的粒度: 分段锁
锁分离:读写锁
锁粗化:同步快过于细化,导致多次获取锁,可以扩大锁的范围。
锁消除:在即时编译时,如果发现不可以被共享的对象,则可以消除这些对象的锁操作
自旋锁:对于锁状态很短的线程,挂起和恢复很消耗性能,可以使用自旋锁技术。
synchronized关键字解决的是多线程之间访问资源的同步性,synchronized可以保证他修饰的方法或者代码块在任意时刻只会有一个线程执行。
作用于代码块原理:使用monitorenter 表示开始位置和 monitorexit 表示结束位置。
作用方法上:使用ACC_SYNCHRONIZED标识,该标识指明了该方法是同一个方法,JVM通过ACC_SYNCHRONIZED访问标志来辨别一个方法是否声明为同步方法,从而相应的同步调用。
锁的是谁?
作用于普通实例方法上,锁的是当前实例对象。
作用于静态方法上,锁的是class
作用于代码块,锁的是代码块内的对象
synchronized锁升级
无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁 锁只可以升级,不可降级。 重量级锁释放后变为无锁状态
偏向锁:
在大多数情况下,锁不存在竞争,总是由同一个线程获取,引入偏向锁减少锁获取的代价。
偏向锁原理以及升级过程:
1、当线程A获取锁对象时候,会在JAVA对象头和栈针中记录偏向锁的线程id。
2、偏向锁不会主动释放锁,当线程A再次获取锁对象的时候,会比较当前偏向锁id和线程id是否相等。
* 相等则直接执行,无需CAS操作。
* 如果不相等(如B线程来获取锁对象),查看线程A是否存活。
* 不存活则锁对象被置为无锁状态,线程B可以竞争设置偏向锁。
* 如果线程A还存活,查看线程A的栈帧信息。
* 如果线程A要继续持有该对象锁,则升级为轻量级锁
* 如果线程A不需要在持有,则把锁对象置为无锁状态,其他线程继续偏向锁竞争。
轻量级锁:
当锁的竞争不激烈,并且持有的时间不长的时候,此时引入轻量级锁。
轻量级锁原理以及升级过程:
CAS自旋。
长时间自旋,达到一定次数后升级为重量级锁。
线程A在执行,线程b自旋来获取锁,然后又进来线程c来争夺锁对象。
重量级锁:
多个线程同时在竞争锁对象的时候,引入重量级锁。
阻塞所有等得竞争的线程,
1、synchronized是jvm层面的,lock是jdk层面的
2、synchronized会自动释放锁,lock需要手动释放锁
3、synchronized是非公平锁,lock可以是公平锁也可以是非公平锁。
4、synchronized不可中断,lock可以实现中断。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。