当前位置:   article > 正文

java多线程略解及抢票程序_购票为什么要实现多线程模式

购票为什么要实现多线程模式

运用多线程只有一个目的,就是为了更好的利用cpu的资源。当然, 大部分多线程代码都可以用单线程实现,但是现实中多数的高并发场景(抢票、改名等)都需要通过多线程编程来进行模拟

线程和进程

  • 线程和进程都是一个时间段的描述,线程在进程下进行
  • 一个进程下可以有多个线程
  • 一个线程使用某些共享内存时,其他线程必须等它结束才能使用这一块内存
  • 不同进程间数据较难共享,统一进程下不同线程间数据共享较简单
  • (接上一条)同一进程下中的线程共享相同的地址空间,而不同的进程没有,因此线程可以读写同样的数据结构和变量,便于线程之间的通信,而不同进程间的通信很困难且消耗更多资源

线程池

尽管可以通过new Thread创建多个线程,但通过该方法创建线程时可能有以下问题

  • 每次创建和销毁线程时效率低
  • 创建的多个线程难以控制,可能存在相互竞争或者死锁
  • 手动创建线程难以实现规模化的操作

相比较之下,通过线程池创建多个线程更利于管理和操作,也能提高线程和系统资源的利用率

较为常用的有两大种方法,这两种方法都是通过创建 ExecutorService 接口类(实现了 ExecutorService 接口)的对象实现

创建 ThreadPoolExecutor 对象

通过 ThreadPoolExecutor 实现线程池最大的优势就是自定义线程池,ThreadPoolExecutor 继承于 AbstractExecutorService,其构造方法有四种重构

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,//前五项为基础参数
                              ThreadFactory threadFactory,//后两项通过排列组合构成三种重构
                              RejectedExecutionHandler handler) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

其中 corePoolSize 为线程池的基本大小,可以理解为*“可以同时进行的最大线程数”*,maximumPoolSize 线程池允许的最大线程数,keepAliveTime 线程最长存活时间,TimeUnit unit 设置存活时间单位

TimeUnit.DAYS     //天
TimeUnit.HOURS     //小时
TimeUnit.MINUTES    //分钟
TimeUnit.SECONDS    //秒
TimeUnit.MILLISECONDS //毫秒
  • 1
  • 2
  • 3
  • 4
  • 5

BlockingQueue 阻塞队列,常用的是 LinkedBlockingQueue<>(int len),一种基于链表实现的可选容量的阻塞队列,如果在初始化时没有指定容量,那么将会默认使用 int 的最大值作为容量

ThreadFactory 线程工厂,主要用于创建线程

RejectedExecutionHandler 当拒绝处理任务时的策略,有以下选项

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
  • 1
  • 2
  • 3
  • 4

创建 Executors 对象并使用其中的方法

  1. Executors.newFixThreadPool(int nThreads) :创建一个固定数量的线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    线程池无界队列中的线程可以被重用(并在需要时使用提供的ThreadFactory创建新线程),如果所有线程都在活动中时提交了额外任务,那么任务将在队列中等待直到有可用线程。如果某一线程在执行任务过程中失败而被终止,那么在执行后续任务时将用新的线程代替,池中线程将一直存在直到其被显式指定为 shutdown

  2. Executors.newSingleThreadExecutors() :创建一个单线程的执行程序

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
        
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    该执行程序在无界队列上操作单一线程(并在需要时使用提供的ThreadFactory创建新线程),如果某一线程在执行任务过程中失败而被终止,那么在执行后续任务时将用新的线程代替。线程将被保证按顺序进行,而且在给定时间内的活动线程不会大于一个。该方法保证返回的执行程序不会被配置去使用其他的线程

  3. Executors.newCachedThreadPool() :创建一个可根据需要创建新线程的线程池

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
        
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    该线程池可以按需创建新的线程(并在需要时使用提供的ThreadFactory创建新线程),但如果之前创建的线程可用,将优先使用之前已创建的线程,如果没有可用的现有线程,则创建一个新线程并将其加入至池中。60 s 内未被使用的线程将被终结并从缓存中删除

  4. Executors.newSingleThreadScheduledExecutor() :创建一个可以延时的单线程执行程序

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
        
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    该执行程序可以延迟至给定时间后或定期执行(并在需要时使用提供的ThreadFactory创建新线程),如果处于任务中的单线程由于任务失败而终止,那么在执行后续任务时将用新的线程代替。线程将被保证按顺序进行,而且在给定时间内的活动线程不会大于一个。该方法保证返回的执行程序不会被配置去使用其他的线程

为什么不推荐使用 Executors 创建线程池

在《阿里巴巴java开发手册》中明确禁止不允许使用 Executors 创建线程池

  1. 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

    说明:Executors 返回的线程池对象的弊端如下:

    1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

    2)CachedThreadPool和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

多线程抢票系统的实现

为加深对多线程的理解,尝试写了一个多线程抢票系统,该系统需满足以下要求:

  1. 多线程模拟客户进行的买票
  2. 每个顾客会买 [1, 10] 张票
  3. 限定票数,顾客购买数不能多于票数
  4. 当一个交易进行时其他交易等待
package tickets;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

class Client extends Thread{
    private int num;//剩余票数
    private int sold_num = 0;//已卖出票数
    private CountDownLatch latch;//---------1

    public Client(int tickets_left, CountDownLatch latch) {
        this.latch = latch;
        this.num = tickets_left;
    }

    @Override
    public synchronized void run() {//---------2
        Random rand = new Random();
        int buy_in = rand.nextInt(10) + 1;
        if(num >= buy_in) {
            num -= buy_in;
            sold_num += buy_in;
            System.out.println(Thread.currentThread().getName() + " thread buys in " + buy_in + 
                      " ticket(s)");
            System.out.println(num + " tickets left");
        }
        else {
            System.out.println("Tickets sold out");
        }
        latch.countDown();
    }

    public int sold() {
        return sold_num;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
package tickets;

import java.util.concurrent.*;

public class server {
    private static ExecutorService pool;
    private static int TICKETS = 5000;
    private static CountDownLatch latch;

    public static void main(String[] args){

        pool = new ThreadPoolExecutor(50, 1000, 300, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000));//---------3

        latch = new CountDownLatch(1000);

        Client task = new Client(TICKETS, latch);

        for(int i = 0; i < 1000; i++){
            pool.execute(task);
        }

        try {
            latch.await();
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.println(task.sold() + " tickets sold");
        pool.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
pool-1-thread-1 thread buys in 8 ticket(s)
4992 tickets left
pool-1-thread-50 thread buys in 3 ticket(s)
4989 tickets left
pool-1-thread-49 thread buys in 7 ticket(s)
4982 tickets left
......
pool-1-thread-36 thread buys in 7 ticket(s)
4 tickets left
pool-1-thread-36 thread buys in 4 ticket(s)
0 tickets left
Tickets sold out
......
5000 tickets sold
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  1. CountDownLatch

    可以理解为线程间的计数器,将 CountDownLatch 的计数器初始化为 new CountDownLatch(n),每当一个线程结束自己的任务计数器就会减一,当计数器的值为0时,在 CountDownLatch 上等待的线程便可以继续执行

    其缺点是,CountDownLatch 在构造方法中只能初始化一次,之后没有任何方法能设置它的值,在计数器归零后不能被再次使用

    在本例中,Client 类初始化了一个 CountDownLatch 计数器,每一个线程执行完任务后,latch 调用 latch.countDown() 方法使计数器减一,同时在 server 类中也构造了一个计数器,并将其初始化为1000(即假设的顾客人数),在后面调用 latch.await() 方法使其等待直到计数器减为0,即所有顾客完成购票后,再打印总的购票信息,完成抢票

  2. synchronized

    Java中的一种同步锁。Java中每一个对象都可以作为锁,这是 synchronized 实现同步的基础。由于在并发编程中存在线程安全问题(有共享数据,多线程同时操作共享数据等),同步锁的存在可以防止多个线程同时对某个共享数据进行读写,确保当一个线程调用该上锁方法时,其他线程必须处于阻塞中

    synchronized 的修饰对象有以下几种:

    1. 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象
    2. 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象
    3. 修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有对象
    4. 修改一个类,其作用的范围是 synchronized 后面括号括起来的部分,作用主的对象是这个类的所有对象。

    在本例中,因为在创建线程池时设置了 corePoolSize 为50,意味着同步进行的线程最多可以有50个,所以需要将重载的 run 方法加上 synchronized 修饰,限制了同一时间段只有一个线程能执行 run 方法,即实现排队售票。

    当然,最简单的解决方法就是我们可以将线程池中的 corePoolSize 设置为1,即直接在初始化线程池时设置最大并发数量为1,就可以不用给 run 加锁

  3. 参数详见创建线程池的第一大方法

更多文章详见个人网站ouz2hou.top,最新文章将第一时间在个人网站发表

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/632209
推荐阅读
相关标签
  

闽ICP备14008679号