赞
踩
Java&Go 并发编程比较
锁
使用一个2线程(协程)累计数的例子来展示
java
java中的锁是使用synchronized或者ReentrantLock,java中synchronized关键字对不同对象使用有不同的效果,可以对对象,实例方法,静态方法使用,分别表示给对象,实例,类加锁。
synchronized
public class AccountingSync implements Runnable {
static AccountingSync instance = new AccountingSync();
static int i = 0;
@Override
public void run() {
for(int j = 0; j < 500; j++) {
increase();
}
}
public synchronized void increase() {
i++;
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(instance);
Thread t2 = new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
ReentrantLock
import java.util.concurrent.locks.ReentrantLock;
public class ReenterLock implements Runnable {
public static int i = 0;
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
for(int j = 0; j < 5000; j++) {
lock.lock();
i++;
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock rl = new ReenterLock();
Thread t1 = new Thread(rl);
Thread t2 = new Thread(rl);
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println(i);
}
}
go
对于go来说锁只有一个:sync.Mutex
package main
import (
"fmt"
"sync"
)
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
)
func add() {
for i := 0; i < 5000; i++ {
lock.Lock()
x = x + 1
lock.Unlock()
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
比较
sync.Mutex是不可重入的锁,多次对sync.Mutex加锁会导致死锁
go并发的是协程,所以没有join(), wait(), notify()的操作,但是都有相关的替代方法。这里一组协程同步的方式使用了sync.WaitGroup(sync.WaitGroup类似于java之中的CountDownLatch)来等待所有协程结束。
对于java之中wait()/notify(), ReentrantLock/Condition来实现线程协同,Go也有相关的处理方式,这个之后的例子会给出。
读写锁
读写锁java和go的处理类似, java使用ReentrantReadWriteLock,go使用sync.RWMutex。
java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteDemo {
private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private static Lock writeLock = reentrantReadWriteLock.writeLock();
private static Lock readLock = reentrantReadWriteLock.readLock();
private static int value = 0;
......
}
go
package main
import (
"fmt"
"sync"
"time"
)
var (
x int64
wg sync.WaitGroup
rwlock sync.RWMutex
)
func write() {
rwlock.Lock() // 加写锁
x = x + 1
time.Sleep(100 * time.Millisecond)
rwlock.Unlock() // 解写锁
wg.Done()
}
func read() {
rwlock.RLock() // 加读锁
time.Sleep(100 * time.Millisecond)
rwlock.RUnlock() // 解读锁
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 100; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
程序输出的执行时间为:1.1067105s,符合预期。
并发同步
使用一个例子两个线(协)程并发执行,交替输出1-1000(一个线(协)程输出奇数,一个输出偶数)。
java
java可以使用wait/notify来实现:
public class TestT {
public static int cur = 0;
//不要使用Integer,Long,String等不可变类型作为wait/notify的类型,
//否则java.lang.IllegalMonitorStateException
//参考:https://blog.csdn.net/historyasamirror/article/details/6709693
public static Object wait = new Object();
public static void main(String[] args) throws Exception{
Thread t1 = new Thread(() -> {
while(cur < 1000) {
// wait/notify是获取同步对象的wait/notify
// 所以当然是在同步块之中才可以wait/notify
// 否则java.lang.IllegalMonitorStateException
synchronized (wait) {
if(cur % 2 == 0) {
System.out.println(cur+1);
cur++;
wait.notify();
} else {
try {
wait.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
Thread t2 = new Thread(() -> {
while(cur < 1000) {
synchronized (wait) {
if(cur % 2 == 1) {
System.out.println(cur+1);
cur++;
wait.notify();
} else {
try {
wait.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
t1.start(); t2.start();
t1.join(); t2.join();
}
}
当然上述的代码将synchronized、wait、notify替换为ReentrantLock、Condition操作:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestT2 {
public static int cur = 0;
public static ReentrantLock reentrantLock = new ReentrantLock();
public static Condition condition = reentrantLock.newCondition();
public static void main(String[] args) throws Exception{
Thread t1 = new Thread(() -> {
while(cur < 1000) {
try{
reentrantLock.lock();
if(cur % 2 == 0) {
System.out.println(cur+1);
cur++;
//唤起一个等待中的线程
condition.signal();
} else {
//await等待并释放锁,唤起时会重新获得锁
condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
});
Thread t2 = new Thread(() -> {
while(cur < 1000) {
try{
reentrantLock.lock();
if(cur % 2 == 1) {
System.out.println(cur+1);
cur++;
condition.signal();
} else {
condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
});
t1.start(); t2.start();
t1.join(); t2.join();
}
}
go
go可以使用chan来进行协程之间的通信:
package main
import (
"fmt"
"sync"
)
var ch1 = make(chan bool, 1)
var ch2 = make(chan bool)
var wg sync.WaitGroup
var i = 1
func main() {
go func() {
for i < 1000 {
if ok :=
fmt.Println("go A:", i)
i++
ch2
}
}
wg.Done()
}()
go func() {
for i < 1000 {
if ok :=
fmt.Println("go B:", i)
i++
ch1
}
}
wg.Done()
}()
wg.Add(2)
ch1
wg.Wait()
}
CAS
java之中的cas操作为AutomicXXX,比如AtomicInteger;go之中的cas为sync/atomic提供函数来进行操作。具体的参考相关文档。这里使用cas来改造之前的两个例子:
计数
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerDemo {
static AtomicInteger i = new AtomicInteger();
public static class AddThread implements Runnable{
@Override
public void run(){
for(int j = 0; j < 1000; j++) {
i.incrementAndGet();
}
}
}
public static void main(String[] args) throws Exception{
Thread[] ts = new Thread[10];
for(int k = 0; k < ts.length; k++) {
ts[k] = new Thread(new AddThread());
}
for(int k = 0; k < ts.length; k++) {
ts[k].start();
}
for(int k = 0; k < ts.length; k++) {
ts[k].join();
}
System.out.println(i);
}
}
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var x int64
var wg sync.WaitGroup
// 原子操作版加函数
func atomicAdd() {
for i := 0; i < 1000; i++ {
atomic.AddInt64(&x, 1)
}
wg.Done()
}
func main() {
wg.Add(10)
for i := 0; i < 10; i++ {
go atomicAdd()
}
wg.Wait()
fmt.Println(x)
}
java为面向对象编程,go面向函数编程;所以java的cas操作是AtomicXXX类的方法,go为sync/atomic包之中的函数。
协同
import java.util.concurrent.atomic.AtomicBoolean;
class Test4 {
//volatile需要加上
public static volatile int num = 0;
public static AtomicBoolean isShuang = new AtomicBoolean(false);
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
for (; 1000000 > num; ) {
if(!isShuang.get()){
System.out.println(num);
num++;
//isShuang.set(true);
//这里只用set就可以了,只是为了展示下
isShuang.compareAndSet(false, true)
}
}
}
);
Thread t2 = new Thread(() -> {
for (; 1000000 > num; ) {
if(isShuang.get()) {
System.out.println(num);
num++;
//isShuang.set(false);
isShuang.compareAndSet(true, false)
}
}
}
);
//性能约为加锁的1倍
long startTime = System.currentTimeMillis();
t1.start();
t2.start();
t1.join();
t2.join();
long endTime = System.currentTimeMillis();
long usedTime = (endTime-startTime)/1000;
System.out.println(usedTime);
}
}
约执行了3s
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
lock int32
num int
wg sync.WaitGroup
)
func main() {
wg.Add(2)
start := time.Now()
go func() {
for ;num < 1000000; {
if atomic.LoadInt32(&lock) == 0 {
num++
fmt.Println("A:", num)
atomic.StoreInt32(&lock, 1)
}
}
wg.Done()
}()
go func() {
for ;num < 1000000; {
if atomic.LoadInt32(&lock) == 1 {
num++
fmt.Println("B:", num)
atomic.StoreInt32(&lock, 0)
}
}
wg.Done()
}()
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
约执行2s
似乎协程切换是要高效不少啊。
生产者消费者模型
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。