赞
踩
1、队列+park+volatile
- private static LinkedList<Thread> list = new LinkedList<>();
- private static volatile Boolean state = new Boolean(false);
-
- public static void main(String[] args) {
- Thread thread1 = new Thread(() -> {
- while (true) {
- LockSupport.park();
- System.out.println("a");
- state = new Boolean(true);
- }
- });
- thread1.start();
- list.add(thread1);
-
- Thread thread2 = new Thread(() -> {
- while (true) {
- LockSupport.park();
- System.out.println("b");
- state = new Boolean(true);
- }
- });
- thread2.start();
- list.add(thread2);
-
- Thread thread3 = new Thread(() -> {
- while (true) {
- LockSupport.park();
- System.out.println("c");
- state = new Boolean(true);
- }
- });
- thread3.start();
- list.add(thread3);
-
- while (true) {
- Thread take = list.poll();
- LockSupport.unpark(take);
- while (true) {
- if (state.booleanValue() == true) {
- state = new Boolean(false);
- list.add(take);
- break;
- }
- }
- }
- }
1、synchronized
- private static final Object lock = new Object();
- private static int num = 0;
-
- public static void main(String[] args) throws InterruptedException {
- Thread thread1 = new Thread(() -> {
- while (true) {
- synchronized (lock) {
- try {
- System.out.println(Thread.currentThread().getName() + "=" + num++);
- lock.notifyAll();
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
-
-
- Thread thread2 = new Thread(() -> {
- while (true) {
- synchronized (lock) {
- try {
- System.out.println(Thread.currentThread().getName() + "=" + num++);
- lock.notifyAll();
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- thread1.start();
- thread2.start();
2、Exchanger(两个线程无序)
- private static Exchanger<Integer> exchanger = new Exchanger<>();
-
- public static void main(String[] args) throws InterruptedException {
- Thread thread1 = new Thread(() -> {
- Integer num = -1;
- while (true) {
- try {
- num = num + 2;
- Integer exchange = exchanger.exchange(num);
- System.out.println(Thread.currentThread().getName() + "=" + exchange);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
-
-
- Thread thread2 = new Thread(() -> {
- Integer num = -2;
- while (true) {
- try {
- num = num + 2;
- Integer exchange = exchanger.exchange(num);
- System.out.println(Thread.currentThread().getName() + "=" + exchange);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- thread1.start();
- thread2.start();
-
-
- }
1、synchronized
- private static int flag = 1;
- private static Object lock = new Object();
-
- public static void main(String[] args) {
-
-
- Thread thread1 = new Thread(() -> {
- synchronized (lock) {
- while (true) {
- try {
- if (flag == 1) {
- for (int i = 0; i < 5; i++) {
- System.out.println("A");
- }
- flag++;
- }
- lock.notifyAll();
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
-
- Thread thread2 = new Thread(() -> {
- synchronized (lock) {
- while (true) {
- try {
- if (flag == 2) {
- for (int i = 0; i < 10; i++) {
- System.out.println("B");
- }
- flag++;
- }
- lock.notifyAll();
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
-
- Thread thread3 = new Thread(() -> {
- synchronized (lock) {
- while (true) {
- try {
- if (flag == 3) {
- for (int i = 0; i < 15; i++) {
- System.out.println("C");
- }
- flag = 1;
- }
- lock.notifyAll();
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- });
- thread1.start();
- thread2.start();
- thread3.start();
- }
2、lock
- static volatile int flag = 1;
-
- public static void main(String[] args) {
-
- Lock lock = new ReentrantLock();
- Condition c1 = lock.newCondition();
- Condition c2 = lock.newCondition();
- Condition c3 = lock.newCondition();
-
-
- Thread thread1 = new Thread(() -> {
- while (true) {
- lock.lock();
- try {
- if (flag != 1) {
- c1.await();
- }
- for (int i = 0; i < 5; i++) {
- System.out.println("A");
- }
- flag = 2;
- c2.signal();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- });
-
- Thread thread2 = new Thread(() -> {
- while (true) {
- lock.lock();
- try {
- if (flag != 2) {
- c2.await();
- }
- for (int i = 0; i < 10; i++) {
- System.out.println("B");
- }
- flag = 3;
- c3.signal();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- });
-
- Thread thread3 = new Thread(() -> {
- while (true) {
- lock.lock();
- try {
- if (flag != 3) {
- c3.await();
- }
- for (int i = 0; i < 15; i++) {
- System.out.println("C");
- }
- flag = 1;
- c1.signal();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- });
- thread1.start();
- thread2.start();
- thread3.start();
- }
- private static Lock lock = new ReentrantLock();
- private static Condition c1 = lock.newCondition();
- private static Condition c2 = lock.newCondition();
- private static int num = 1;
-
- public static void main(String[] args) {
- Thread thread1 = new Thread(() -> {
- while (true) {
- lock.lock();
- try {
- if (num == 3) {
- c2.signal();
- c1.await();
- }
- System.out.print("H" + "--------" + num);
- num++;
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- });
- Thread thread2 = new Thread(() -> {
- while (true) {
- lock.lock();
- try {
- if (num != 3) {
- c1.signal();
- c2.await();
- }
- System.out.println("O" + "--------" + num);
- num = 1;
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- });
- thread1.start();
- thread2.start();
- }
1、Disruptor
- public static void main(String[] args) {
- EventFactory<Message> factory = new MessageFactory();
-
- int ringBufferSize = 1024 * 1024;
-
- Disruptor<Message> disruptor =
-
- new Disruptor<Message>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
-
- disruptor.handleEventsWith(new MessageHandler("handler-1")).then(new MessageHandler("handler-2"), new MessageHandler("handler-3"));
-
- disruptor.start();
-
- RingBuffer<Message> ringBuffer = disruptor.getRingBuffer();
-
- Producer producer = new Producer(ringBuffer);
-
- //单生产者,生产3条数据
-
- for (int l = 0; l < 3; l++) {
-
- producer.onData(l + "");
-
- }
-
-
- disruptor.shutdown();
-
- }
-
- static class Producer {
-
- private RingBuffer<Message> ringBuffer;
-
- public Producer(RingBuffer<Message> ringBuffer) {
- this.ringBuffer = ringBuffer;
- }
-
- public void onData(String text) {
- long next = this.ringBuffer.next();
- Message message = ringBuffer.get(next);
- message.setId(UUID.randomUUID().toString());
- message.setText("信息" + text);
- ringBuffer.publish(next);
- }
-
- }
-
-
- static class MessageHandler implements EventHandler<Message> {
-
- String handlerName;
-
- public MessageHandler(String handlerName) {
- this.handlerName = handlerName;
- }
-
- @Override
- public void onEvent(Message message, long l, boolean b) {
- message.addHandler(handlerName + "已经处理");
- System.out.println("消息:" + message.getText() + " " + handlerName + "已经处理");
- }
- }
-
- static class MessageFactory implements EventFactory<Message> {
-
- @Override
- public Message newInstance() {
- return new Message();
- }
- }
-
- static class Message {
- String id;
- String text;
- List<String> handler = new ArrayList<>();
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getText() {
- return text;
- }
-
- public void setText(String text) {
- this.text = text;
- }
-
- public List<String> getHandler() {
- return handler;
- }
-
- public void setHandler(List<String> handler) {
- this.handler = handler;
- }
-
- public void addHandler(String handler) {
- this.handler.add(handler);
- }
- }
2、LinkedBlockingQueue
- static Random random = new Random();
- static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<String>(5);
-
- public static void main(String[] args) {
- Thread thread1 = new Thread(() -> {
- try {
- while (true) {
- String value = String.valueOf(random.nextInt(10));
- System.out.println("生产者1生产了——————" + value);
- linkedBlockingQueue.put(value);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- Thread thread2 = new Thread(() -> {
- try {
- while (true) {
- String take = linkedBlockingQueue.take();
- System.out.println("消费了" + take);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- thread1.start();
- thread2.start();
- }
3、synchronized
- static int len = 10;
- static Random random = new Random();
- static Queue<String> queue = new LinkedList<String>();
-
-
- public static void main(String[] args) {
- Thread thread1 = new Thread(() -> {
- try {
- while (true) {
- synchronized (queue) {
- if (queue.size() == len) {
- System.out.println("当前队列满");
- queue.notifyAll();
- queue.wait();
- }
- queue.notifyAll();
- String value = String.valueOf(random.nextInt(10));
- System.out.println("生产者1生产了——————" + value);
- queue.add(value);
- Thread.yield();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- Thread thread2 = new Thread(() -> {
- try {
- while (true) {
- synchronized (queue) {
- if (queue.size() == 0) {
- System.out.println("当前队列为空");
- queue.notifyAll();
- queue.wait();
- }
- queue.notifyAll();
- String take = queue.poll();
- System.out.println("消费了" + take);
- Thread.yield();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- thread1.start();
- thread2.start();
- }
4、Lock
- static int len = 10;
- static Lock lock = new ReentrantLock();
- static Condition condition = lock.newCondition();
-
- static Random random = new Random();
- static Queue<String> queue = new LinkedList<>();
-
-
- public static void main(String[] args) {
- Thread thread1 = new Thread(() -> {
- try {
- while (true) {
- lock.lock();
- try {
- if (queue.size() == len) {
- System.out.println("当前队列满");
- condition.await();
- }
- String value = String.valueOf(random.nextInt(10));
- System.out.println("生产者1生产了——————" + value);
- queue.add(value);
- condition.signal();
- } finally {
- lock.unlock();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- Thread thread2 = new Thread(() -> {
- try {
- while (true) {
- lock.lock();
- try {
- if (queue.size() == 0) {
- System.out.println("当前队列为空");
- condition.await();
- }
- String take = queue.poll();
- System.out.println("消费了" + take);
- condition.signal();
- } finally {
- lock.unlock();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- thread1.start();
- thread2.start();
- }
5、Semaphore
- static Semaphore notFull = new Semaphore(10);
- static Semaphore notEmpty = new Semaphore(0);
-
- static Random random = new Random();
- static Queue<String> queue = new ConcurrentLinkedQueue<>();
-
-
- public static void main(String[] args) {
- Thread thread1 = new Thread(() -> {
- while (true) {
- try {
- notFull.acquire();
- String value = String.valueOf(random.nextInt(10));
- System.out.println("生产者1生产了——————" + value);
- queue.add(value);
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- notEmpty.release();
- }
- }
- });
-
- Thread thread2 = new Thread(() -> {
- try {
- while (true) {
- try {
- notEmpty.acquire();
- String take = queue.poll();
- System.out.println("消费了" + take);
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- notFull.release();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- thread1.start();
- thread2.start();
- }
6、管道
- static PipedOutputStream pipedOutputStream = new PipedOutputStream();
- static PipedInputStream pipedInputStream = new PipedInputStream();
-
- static Random random = new Random();
-
-
- public static void main(String[] args) throws IOException {
- Thread thread1 = new Thread(() -> {
- while (true) {
- try {
- String value = String.valueOf(random.nextInt(10));
- System.out.println("生产者1生产了——————" + value);
- pipedOutputStream.write(value.getBytes(StandardCharsets.UTF_8));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
- Thread thread2 = new Thread(() -> {
- try {
- while (true) {
- try {
- int length = -1;
- byte[] buffer = new byte[64];
- while ((length = pipedInputStream.read(buffer)) != -1) {
- System.out.println("消费了" + new String(buffer, 0, length));
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- pipedOutputStream.connect(pipedInputStream);
-
- thread1.start();
- thread2.start();
- }
1、CyclicBarrier
批次之间结果无依赖,无返回值
- public class Test {
-
- static ExecutorService service = new ThreadPoolExecutor(200, 400, 30, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
-
- static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
-
- public static void main(String[] args) {
-
- List<Runnable> lists = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- String str = "批次:" + i + " " + "任务:" + j;
- Runnable runnable = () -> {
- try {
- System.out.println(str);
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- };
- lists.add(runnable);
- }
- }
- lists.forEach(service::execute);
- }
- }
2、stream
- static ExecutorService executor = new ThreadPoolExecutor(200, 400, 30, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
-
- public static void main(String[] args) {
-
- List<List<Callable<String>>> lists = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- List<Callable<String>> list = new ArrayList<>();
- for (int j = 0; j < 10; j++) {
- String str = "批次:" + i + " " + "任务:" + j;
- Callable<String> callable = () -> {
- System.out.println(str);
- return str;
- };
- list.add(callable);
- }
- lists.add(list);
- }
- List<String> listStream = lists.stream().map(s -> {
- try {
- return executor.invokeAll(s);
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }).map(t -> t.parallelStream().map(fu -> {
- try {
- return fu.get();
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }).collect(Collectors.toList())).flatMap(Collection::stream).collect(Collectors.toList());
- }
3、CompletableFuture
- public static void main(String[] args) {
- CompletableFuture<Void> future = new CompletableFuture<Void>();
- for (int i = 0; i < 10; i++) {
- CompletableFuture<Void>[] completableFutures = new CompletableFuture[10];
- for (int j = 0; j < 10; j++) {
- String str = "批次:" + i + " " + "任务:" + j;
- completableFutures[j] = CompletableFuture.runAsync(() -> System.out.println(str));
- }
- CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFutures);
- future.thenRun(() -> {
- try {
- allOf.get();
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
- }
- try {
- future.get();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
4、Queue
- static ExecutorService executor = new ThreadPoolExecutor(200, 400, 30, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
-
- public static void main(String[] args) {
- Queue<Queue<Callable<String>>> queues = new LinkedList<>();
- for (int i = 0; i < 10; i++) {
- Queue<Callable<String>> queue = new LinkedList<>();
- for (int j = 0; j < 10; j++) {
- String str = "批次:" + i + " " + "任务:" + j;
- Callable<String> callable = () -> {
- System.out.println(str);
- return str;
- };
- queue.add(callable);
- }
- queues.add(queue);
- }
- try {
- while (!queues.isEmpty()) {
- Queue<Callable<String>> poll = queues.poll();
- List<Future<String>> futureList = executor.invokeAll(poll);
- List<String> list = futureList.parallelStream().map(t -> {
- try {
- return t.get();
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }).collect(Collectors.toList());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
5、lmax.disruptor
- public class Test15 {
-
- static ExecutorService service = new ThreadPoolExecutor(200, 400, 30, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(2000), new ThreadFactoryBuilder().setNameFormat("thread-%d").build());
-
- public static void main(String[] args) {
- EventFactory<Queue<Runnable>> factory = new MessageFactory();
-
- int ringBufferSize = 1024 * 1024;
-
- Disruptor<Queue<Runnable>> disruptor =
-
- new Disruptor<Queue<Runnable>>(factory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
-
-
- disruptor.handleEventsWith(new MessageHandler("handler-1"))
- .then(new MessageHandler("handler-2"))
- .then(new MessageHandler("handler-3"))
- .then(new MessageHandler("handler-4"))
- .then(new MessageHandler("handler-5"))
- .then(new MessageHandler("handler-6"))
- .then(new MessageHandler("handler-7"))
- .then(new MessageHandler("handler-8"))
- .then(new MessageHandler("handler-9"))
- .then(new MessageHandler("handler-10"));
-
- disruptor.start();
-
- RingBuffer<Queue<Runnable>> ringBuffer = disruptor.getRingBuffer();
-
- Producer producer = new Producer(ringBuffer);
-
-
- Queue<Runnable> queues = new LinkedList<>();
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- String str = "批次:" + i + " " + "任务:" + j;
- Runnable runnable = () -> {
- System.out.println(str);
- };
- queues.add(runnable);
- }
- }
- producer.onData(queues);
-
- disruptor.shutdown();
-
- }
-
- static class Producer {
-
- private RingBuffer<Queue<Runnable>> ringBuffer;
-
- public Producer(RingBuffer<Queue<Runnable>> ringBuffer) {
- this.ringBuffer = ringBuffer;
- }
-
- public void onData(Queue<Runnable> runnable) {
- long next = this.ringBuffer.next();
- Queue<Runnable> runnables = ringBuffer.get(next);
- runnables.addAll(runnable);
- ringBuffer.publish(next);
- }
-
- }
-
-
- static class MessageHandler implements EventHandler<Queue<Runnable>> {
-
- String handlerName;
-
- public MessageHandler(String handlerName) {
- this.handlerName = handlerName;
- }
-
- @Override
- public void onEvent(Queue<Runnable> runnables, long l, boolean b) throws Exception {
- for (int i = 0; i < 10; i++) {
- service.submit(runnables.poll()).get();
- }
- }
- }
-
- static class MessageFactory implements EventFactory<Queue<Runnable>> {
-
- @Override
- public Queue<Runnable> newInstance() {
- return new LinkedList<>();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。