当前位置:   article > 正文

多线程执行_多线程执行程序

多线程执行程序

一、交互执行abc

1、队列+park+volatile

  1. private static LinkedList<Thread> list = new LinkedList<>();
  2. private static volatile Boolean state = new Boolean(false);
  3. public static void main(String[] args) {
  4. Thread thread1 = new Thread(() -> {
  5. while (true) {
  6. LockSupport.park();
  7. System.out.println("a");
  8. state = new Boolean(true);
  9. }
  10. });
  11. thread1.start();
  12. list.add(thread1);
  13. Thread thread2 = new Thread(() -> {
  14. while (true) {
  15. LockSupport.park();
  16. System.out.println("b");
  17. state = new Boolean(true);
  18. }
  19. });
  20. thread2.start();
  21. list.add(thread2);
  22. Thread thread3 = new Thread(() -> {
  23. while (true) {
  24. LockSupport.park();
  25. System.out.println("c");
  26. state = new Boolean(true);
  27. }
  28. });
  29. thread3.start();
  30. list.add(thread3);
  31. while (true) {
  32. Thread take = list.poll();
  33. LockSupport.unpark(take);
  34. while (true) {
  35. if (state.booleanValue() == true) {
  36. state = new Boolean(false);
  37. list.add(take);
  38. break;
  39. }
  40. }
  41. }
  42. }

二、一个打印偶数一个打印奇数

1、synchronized

  1. private static final Object lock = new Object();
  2. private static int num = 0;
  3. public static void main(String[] args) throws InterruptedException {
  4. Thread thread1 = new Thread(() -> {
  5. while (true) {
  6. synchronized (lock) {
  7. try {
  8. System.out.println(Thread.currentThread().getName() + "=" + num++);
  9. lock.notifyAll();
  10. lock.wait();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }
  16. });
  17. Thread thread2 = new Thread(() -> {
  18. while (true) {
  19. synchronized (lock) {
  20. try {
  21. System.out.println(Thread.currentThread().getName() + "=" + num++);
  22. lock.notifyAll();
  23. lock.wait();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. }
  29. });
  30. thread1.start();
  31. thread2.start();

2、Exchanger(两个线程无序)

  1. private static Exchanger<Integer> exchanger = new Exchanger<>();
  2. public static void main(String[] args) throws InterruptedException {
  3. Thread thread1 = new Thread(() -> {
  4. Integer num = -1;
  5. while (true) {
  6. try {
  7. num = num + 2;
  8. Integer exchange = exchanger.exchange(num);
  9. System.out.println(Thread.currentThread().getName() + "=" + exchange);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. });
  15. Thread thread2 = new Thread(() -> {
  16. Integer num = -2;
  17. while (true) {
  18. try {
  19. num = num + 2;
  20. Integer exchange = exchanger.exchange(num);
  21. System.out.println(Thread.currentThread().getName() + "=" + exchange);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. });
  27. thread1.start();
  28. thread2.start();
  29. }

三、按照顺序,三个线程分别打印A5次,B10次,C15次

1、synchronized

  1. private static int flag = 1;
  2. private static Object lock = new Object();
  3. public static void main(String[] args) {
  4. Thread thread1 = new Thread(() -> {
  5. synchronized (lock) {
  6. while (true) {
  7. try {
  8. if (flag == 1) {
  9. for (int i = 0; i < 5; i++) {
  10. System.out.println("A");
  11. }
  12. flag++;
  13. }
  14. lock.notifyAll();
  15. lock.wait();
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. });
  22. Thread thread2 = new Thread(() -> {
  23. synchronized (lock) {
  24. while (true) {
  25. try {
  26. if (flag == 2) {
  27. for (int i = 0; i < 10; i++) {
  28. System.out.println("B");
  29. }
  30. flag++;
  31. }
  32. lock.notifyAll();
  33. lock.wait();
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. }
  39. });
  40. Thread thread3 = new Thread(() -> {
  41. synchronized (lock) {
  42. while (true) {
  43. try {
  44. if (flag == 3) {
  45. for (int i = 0; i < 15; i++) {
  46. System.out.println("C");
  47. }
  48. flag = 1;
  49. }
  50. lock.notifyAll();
  51. lock.wait();
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }
  57. });
  58. thread1.start();
  59. thread2.start();
  60. thread3.start();
  61. }

2、lock

  1. static volatile int flag = 1;
  2. public static void main(String[] args) {
  3. Lock lock = new ReentrantLock();
  4. Condition c1 = lock.newCondition();
  5. Condition c2 = lock.newCondition();
  6. Condition c3 = lock.newCondition();
  7. Thread thread1 = new Thread(() -> {
  8. while (true) {
  9. lock.lock();
  10. try {
  11. if (flag != 1) {
  12. c1.await();
  13. }
  14. for (int i = 0; i < 5; i++) {
  15. System.out.println("A");
  16. }
  17. flag = 2;
  18. c2.signal();
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. } finally {
  22. lock.unlock();
  23. }
  24. }
  25. });
  26. Thread thread2 = new Thread(() -> {
  27. while (true) {
  28. lock.lock();
  29. try {
  30. if (flag != 2) {
  31. c2.await();
  32. }
  33. for (int i = 0; i < 10; i++) {
  34. System.out.println("B");
  35. }
  36. flag = 3;
  37. c3.signal();
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. } finally {
  41. lock.unlock();
  42. }
  43. }
  44. });
  45. Thread thread3 = new Thread(() -> {
  46. while (true) {
  47. lock.lock();
  48. try {
  49. if (flag != 3) {
  50. c3.await();
  51. }
  52. for (int i = 0; i < 15; i++) {
  53. System.out.println("C");
  54. }
  55. flag = 1;
  56. c1.signal();
  57. } catch (Exception e) {
  58. e.printStackTrace();
  59. } finally {
  60. lock.unlock();
  61. }
  62. }
  63. });
  64. thread1.start();
  65. thread2.start();
  66. thread3.start();
  67. }

四、创建水分子

  1. private static Lock lock = new ReentrantLock();
  2. private static Condition c1 = lock.newCondition();
  3. private static Condition c2 = lock.newCondition();
  4. private static int num = 1;
  5. public static void main(String[] args) {
  6. Thread thread1 = new Thread(() -> {
  7. while (true) {
  8. lock.lock();
  9. try {
  10. if (num == 3) {
  11. c2.signal();
  12. c1.await();
  13. }
  14. System.out.print("H" + "--------" + num);
  15. num++;
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. } finally {
  19. lock.unlock();
  20. }
  21. }
  22. });
  23. Thread thread2 = new Thread(() -> {
  24. while (true) {
  25. lock.lock();
  26. try {
  27. if (num != 3) {
  28. c1.signal();
  29. c2.await();
  30. }
  31. System.out.println("O" + "--------" + num);
  32. num = 1;
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. } finally {
  36. lock.unlock();
  37. }
  38. }
  39. });
  40. thread1.start();
  41. thread2.start();
  42. }

五、生产者,消费者

1、Disruptor

  1. public static void main(String[] args) {
  2. EventFactory<Message> factory = new MessageFactory();
  3. int ringBufferSize = 1024 * 1024;
  4. Disruptor<Message> disruptor =
  5. new Disruptor<Message>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
  6. disruptor.handleEventsWith(new MessageHandler("handler-1")).then(new MessageHandler("handler-2"), new MessageHandler("handler-3"));
  7. disruptor.start();
  8. RingBuffer<Message> ringBuffer = disruptor.getRingBuffer();
  9. Producer producer = new Producer(ringBuffer);
  10. //单生产者,生产3条数据
  11. for (int l = 0; l < 3; l++) {
  12. producer.onData(l + "");
  13. }
  14. disruptor.shutdown();
  15. }
  16. static class Producer {
  17. private RingBuffer<Message> ringBuffer;
  18. public Producer(RingBuffer<Message> ringBuffer) {
  19. this.ringBuffer = ringBuffer;
  20. }
  21. public void onData(String text) {
  22. long next = this.ringBuffer.next();
  23. Message message = ringBuffer.get(next);
  24. message.setId(UUID.randomUUID().toString());
  25. message.setText("信息" + text);
  26. ringBuffer.publish(next);
  27. }
  28. }
  29. static class MessageHandler implements EventHandler<Message> {
  30. String handlerName;
  31. public MessageHandler(String handlerName) {
  32. this.handlerName = handlerName;
  33. }
  34. @Override
  35. public void onEvent(Message message, long l, boolean b) {
  36. message.addHandler(handlerName + "已经处理");
  37. System.out.println("消息:" + message.getText() + " " + handlerName + "已经处理");
  38. }
  39. }
  40. static class MessageFactory implements EventFactory<Message> {
  41. @Override
  42. public Message newInstance() {
  43. return new Message();
  44. }
  45. }
  46. static class Message {
  47. String id;
  48. String text;
  49. List<String> handler = new ArrayList<>();
  50. public String getId() {
  51. return id;
  52. }
  53. public void setId(String id) {
  54. this.id = id;
  55. }
  56. public String getText() {
  57. return text;
  58. }
  59. public void setText(String text) {
  60. this.text = text;
  61. }
  62. public List<String> getHandler() {
  63. return handler;
  64. }
  65. public void setHandler(List<String> handler) {
  66. this.handler = handler;
  67. }
  68. public void addHandler(String handler) {
  69. this.handler.add(handler);
  70. }
  71. }

2、LinkedBlockingQueue

  1. static Random random = new Random();
  2. static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<String>(5);
  3. public static void main(String[] args) {
  4. Thread thread1 = new Thread(() -> {
  5. try {
  6. while (true) {
  7. String value = String.valueOf(random.nextInt(10));
  8. System.out.println("生产者1生产了——————" + value);
  9. linkedBlockingQueue.put(value);
  10. }
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. });
  15. Thread thread2 = new Thread(() -> {
  16. try {
  17. while (true) {
  18. String take = linkedBlockingQueue.take();
  19. System.out.println("消费了" + take);
  20. }
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. thread1.start();
  26. thread2.start();
  27. }

3、synchronized 

  1. static int len = 10;
  2. static Random random = new Random();
  3. static Queue<String> queue = new LinkedList<String>();
  4. public static void main(String[] args) {
  5. Thread thread1 = new Thread(() -> {
  6. try {
  7. while (true) {
  8. synchronized (queue) {
  9. if (queue.size() == len) {
  10. System.out.println("当前队列满");
  11. queue.notifyAll();
  12. queue.wait();
  13. }
  14. queue.notifyAll();
  15. String value = String.valueOf(random.nextInt(10));
  16. System.out.println("生产者1生产了——————" + value);
  17. queue.add(value);
  18. Thread.yield();
  19. }
  20. }
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. Thread thread2 = new Thread(() -> {
  26. try {
  27. while (true) {
  28. synchronized (queue) {
  29. if (queue.size() == 0) {
  30. System.out.println("当前队列为空");
  31. queue.notifyAll();
  32. queue.wait();
  33. }
  34. queue.notifyAll();
  35. String take = queue.poll();
  36. System.out.println("消费了" + take);
  37. Thread.yield();
  38. }
  39. }
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. });
  44. thread1.start();
  45. thread2.start();
  46. }

4、Lock

  1. static int len = 10;
  2. static Lock lock = new ReentrantLock();
  3. static Condition condition = lock.newCondition();
  4. static Random random = new Random();
  5. static Queue<String> queue = new LinkedList<>();
  6. public static void main(String[] args) {
  7. Thread thread1 = new Thread(() -> {
  8. try {
  9. while (true) {
  10. lock.lock();
  11. try {
  12. if (queue.size() == len) {
  13. System.out.println("当前队列满");
  14. condition.await();
  15. }
  16. String value = String.valueOf(random.nextInt(10));
  17. System.out.println("生产者1生产了——————" + value);
  18. queue.add(value);
  19. condition.signal();
  20. } finally {
  21. lock.unlock();
  22. }
  23. }
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. }
  27. });
  28. Thread thread2 = new Thread(() -> {
  29. try {
  30. while (true) {
  31. lock.lock();
  32. try {
  33. if (queue.size() == 0) {
  34. System.out.println("当前队列为空");
  35. condition.await();
  36. }
  37. String take = queue.poll();
  38. System.out.println("消费了" + take);
  39. condition.signal();
  40. } finally {
  41. lock.unlock();
  42. }
  43. }
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. }
  47. });
  48. thread1.start();
  49. thread2.start();
  50. }

5、Semaphore

  1. static Semaphore notFull = new Semaphore(10);
  2. static Semaphore notEmpty = new Semaphore(0);
  3. static Random random = new Random();
  4. static Queue<String> queue = new ConcurrentLinkedQueue<>();
  5. public static void main(String[] args) {
  6. Thread thread1 = new Thread(() -> {
  7. while (true) {
  8. try {
  9. notFull.acquire();
  10. String value = String.valueOf(random.nextInt(10));
  11. System.out.println("生产者1生产了——————" + value);
  12. queue.add(value);
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }finally {
  16. notEmpty.release();
  17. }
  18. }
  19. });
  20. Thread thread2 = new Thread(() -> {
  21. try {
  22. while (true) {
  23. try {
  24. notEmpty.acquire();
  25. String take = queue.poll();
  26. System.out.println("消费了" + take);
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. }finally {
  30. notFull.release();
  31. }
  32. }
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. });
  37. thread1.start();
  38. thread2.start();
  39. }

6、管道

  1. static PipedOutputStream pipedOutputStream = new PipedOutputStream();
  2. static PipedInputStream pipedInputStream = new PipedInputStream();
  3. static Random random = new Random();
  4. public static void main(String[] args) throws IOException {
  5. Thread thread1 = new Thread(() -> {
  6. while (true) {
  7. try {
  8. String value = String.valueOf(random.nextInt(10));
  9. System.out.println("生产者1生产了——————" + value);
  10. pipedOutputStream.write(value.getBytes(StandardCharsets.UTF_8));
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. });
  16. Thread thread2 = new Thread(() -> {
  17. try {
  18. while (true) {
  19. try {
  20. int length = -1;
  21. byte[] buffer = new byte[64];
  22. while ((length = pipedInputStream.read(buffer)) != -1) {
  23. System.out.println("消费了" + new String(buffer, 0, length));
  24. }
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. });
  33. pipedOutputStream.connect(pipedInputStream);
  34. thread1.start();
  35. thread2.start();
  36. }

六、100个任务,分10批次,10批串行执行,10个任务并行执行

1、CyclicBarrier

批次之间结果无依赖,无返回值

  1. public class Test {
  2. static ExecutorService service = new ThreadPoolExecutor(200, 400, 30, TimeUnit.SECONDS,
  3. new LinkedBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
  4. static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
  5. public static void main(String[] args) {
  6. List<Runnable> lists = new ArrayList<>();
  7. for (int i = 0; i < 10; i++) {
  8. for (int j = 0; j < 10; j++) {
  9. String str = "批次:" + i + " " + "任务:" + j;
  10. Runnable runnable = () -> {
  11. try {
  12. System.out.println(str);
  13. cyclicBarrier.await();
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }
  17. };
  18. lists.add(runnable);
  19. }
  20. }
  21. lists.forEach(service::execute);
  22. }
  23. }

2、stream

  1. static ExecutorService executor = new ThreadPoolExecutor(200, 400, 30, TimeUnit.SECONDS,
  2. new LinkedBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
  3. public static void main(String[] args) {
  4. List<List<Callable<String>>> lists = new ArrayList<>();
  5. for (int i = 0; i < 10; i++) {
  6. List<Callable<String>> list = new ArrayList<>();
  7. for (int j = 0; j < 10; j++) {
  8. String str = "批次:" + i + " " + "任务:" + j;
  9. Callable<String> callable = () -> {
  10. System.out.println(str);
  11. return str;
  12. };
  13. list.add(callable);
  14. }
  15. lists.add(list);
  16. }
  17. List<String> listStream = lists.stream().map(s -> {
  18. try {
  19. return executor.invokeAll(s);
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. return null;
  23. }
  24. }).map(t -> t.parallelStream().map(fu -> {
  25. try {
  26. return fu.get();
  27. } catch (Exception e) {
  28. e.printStackTrace();
  29. return null;
  30. }
  31. }).collect(Collectors.toList())).flatMap(Collection::stream).collect(Collectors.toList());
  32. }

3、CompletableFuture

  1. public static void main(String[] args) {
  2. CompletableFuture<Void> future = new CompletableFuture<Void>();
  3. for (int i = 0; i < 10; i++) {
  4. CompletableFuture<Void>[] completableFutures = new CompletableFuture[10];
  5. for (int j = 0; j < 10; j++) {
  6. String str = "批次:" + i + " " + "任务:" + j;
  7. completableFutures[j] = CompletableFuture.runAsync(() -> System.out.println(str));
  8. }
  9. CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFutures);
  10. future.thenRun(() -> {
  11. try {
  12. allOf.get();
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. });
  17. }
  18. try {
  19. future.get();
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. }

4、Queue

  1. static ExecutorService executor = new ThreadPoolExecutor(200, 400, 30, TimeUnit.SECONDS,
  2. new LinkedBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
  3. public static void main(String[] args) {
  4. Queue<Queue<Callable<String>>> queues = new LinkedList<>();
  5. for (int i = 0; i < 10; i++) {
  6. Queue<Callable<String>> queue = new LinkedList<>();
  7. for (int j = 0; j < 10; j++) {
  8. String str = "批次:" + i + " " + "任务:" + j;
  9. Callable<String> callable = () -> {
  10. System.out.println(str);
  11. return str;
  12. };
  13. queue.add(callable);
  14. }
  15. queues.add(queue);
  16. }
  17. try {
  18. while (!queues.isEmpty()) {
  19. Queue<Callable<String>> poll = queues.poll();
  20. List<Future<String>> futureList = executor.invokeAll(poll);
  21. List<String> list = futureList.parallelStream().map(t -> {
  22. try {
  23. return t.get();
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. return null;
  27. }
  28. }).collect(Collectors.toList());
  29. }
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. }
  33. }

5、lmax.disruptor

  1. public class Test15 {
  2. static ExecutorService service = new ThreadPoolExecutor(200, 400, 30, TimeUnit.SECONDS,
  3. new LinkedBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
  4. public static void main(String[] args) {
  5. EventFactory<Queue<Runnable>> factory = new MessageFactory();
  6. int ringBufferSize = 1024 * 1024;
  7. Disruptor<Queue<Runnable>> disruptor =
  8. new Disruptor<Queue<Runnable>>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
  9. disruptor.handleEventsWith(new MessageHandler("handler-1"))
  10. .then(new MessageHandler("handler-2"))
  11. .then(new MessageHandler("handler-3"))
  12. .then(new MessageHandler("handler-4"))
  13. .then(new MessageHandler("handler-5"))
  14. .then(new MessageHandler("handler-6"))
  15. .then(new MessageHandler("handler-7"))
  16. .then(new MessageHandler("handler-8"))
  17. .then(new MessageHandler("handler-9"))
  18. .then(new MessageHandler("handler-10"));
  19. disruptor.start();
  20. RingBuffer<Queue<Runnable>> ringBuffer = disruptor.getRingBuffer();
  21. Producer producer = new Producer(ringBuffer);
  22. Queue<Runnable> queues = new LinkedList<>();
  23. for (int i = 0; i < 10; i++) {
  24. for (int j = 0; j < 10; j++) {
  25. String str = "批次:" + i + " " + "任务:" + j;
  26. Runnable runnable = () -> {
  27. System.out.println(str);
  28. };
  29. queues.add(runnable);
  30. }
  31. }
  32. producer.onData(queues);
  33. disruptor.shutdown();
  34. }
  35. static class Producer {
  36. private RingBuffer<Queue<Runnable>> ringBuffer;
  37. public Producer(RingBuffer<Queue<Runnable>> ringBuffer) {
  38. this.ringBuffer = ringBuffer;
  39. }
  40. public void onData(Queue<Runnable> runnable) {
  41. long next = this.ringBuffer.next();
  42. Queue<Runnable> runnables = ringBuffer.get(next);
  43. runnables.addAll(runnable);
  44. ringBuffer.publish(next);
  45. }
  46. }
  47. static class MessageHandler implements EventHandler<Queue<Runnable>> {
  48. String handlerName;
  49. public MessageHandler(String handlerName) {
  50. this.handlerName = handlerName;
  51. }
  52. @Override
  53. public void onEvent(Queue<Runnable> runnables, long l, boolean b) throws Exception {
  54. for (int i = 0; i < 10; i++) {
  55. service.submit(runnables.poll()).get();
  56. }
  57. }
  58. }
  59. static class MessageFactory implements EventFactory<Queue<Runnable>> {
  60. @Override
  61. public Queue<Runnable> newInstance() {
  62. return new LinkedList<>();
  63. }
  64. }
  65. }

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/197738
推荐阅读
相关标签
  

闽ICP备14008679号