当前位置:   article > 正文

Rust入门(十二):并发_thread::spawn

thread::spawn

并发编程,代表程序的不同部分相互独立的执行,而并行编程(parallel programming)代表程序不同部分于同时执行,这两个概念随着计算机越来越多的利用多处理器的优势时显得愈发重要。由于历史原因,在此类上下文中编程一直是困难且容易出错的:Rust 希望能改变这一点。因此,Rust 提供了多种工具,以符合实际情况和需求的方式来为问题建模。、

线程

在程序内部,也可以拥有多个同时运行的独立部分。运行这些独立部分的功能被称为线程。将程序中的计算拆分进多个线程可以改善性能,因为程序可以同时进行多个任务,不过这也会增加复杂性。这会导致诸如此类的问题:

  • 竞态条件(Race conditions),多个线程以不一致的顺序访问数据或资源
  • 死锁(Deadlocks),两个线程相互等待对方停止使用其所拥有的资源,这会阻止它们继续运行
  • 只会发生在特定情况且难以稳定重现和修复的 bug

为了创建一个新线程,需要调用 thread::spawn 函数并传递一个闭包,,并在其中包含希望在新线程运行的代码。注意这个函数编写的方式,当主线程结束时,新线程也会结束,而不管其是否执行完毕。

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

thread::sleep 调用强制线程停止执行一小段时间,这会允许其他不同的线程运行。这些线程可能会轮流运行,不过并不保证如此:这依赖操作系统如何调度线程。

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

JOIN

可以通过将 thread::spawn 的返回值储存在变量中来修复新建线程部分没有执行或者完全没有执行的问题。thread::spawn 的返回值类型是 JoinHandleJoinHandle 是一个拥有所有权的值,当对其调用 join 方法时,它会等待其线程结束。

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

通过调用 handle 的 join 会阻塞当前线程直到 handle 所代表的线程结束,如果将 handle.join() 移动到 mainfor 循环之前,主线程会等待直到新建线程执行完毕之后才开始执行 for 循环。

MOVE

move 关键字经常用于传递给 thread::spawn 的闭包,因为闭包会获取从环境中取得的值的所有权,为了在新建线程中使用来自于主线程的数据,需要新建线程的闭包获取它需要的值,如下的例子不能运行,因为Rust 会推断如何捕获 v,因为 println! 只需要 v 的引用,闭包尝试借用 v。然而这有一个问题:Rust 不知道这个新建线程会执行多久,所以无法知晓 v 的引用是否一直有效。

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

通过在闭包之前增加 move 关键字,我们强制闭包获取其使用的值的所有权,而不是任由 Rust 推断它应该借用值。

use std::thread;
fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });
    handle.join().unwrap();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

进程通信

Rust 中一个实现消息传递并发的主要工具是信道(channel),Rust 标准库提供了其实现的编程概念。编程中的信息渠道(信道)有两部分组成,一个发送者(transmitter)和一个接收者(receiver),代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。当发送者或接收者任一被丢弃时可以认为信道被关闭(closed)了。

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}
  • 1
  • 2
  • 3
  • 4
  • 5

这里使用 mpsc::channel 函数创建一个新的信道;mpsc多个生产者,单个消费者multiple producer, single consumer)的缩写。Rust 标准库实现信道的方式意味着一个信道可以有多个产生值的 发送sending)端,但只能有一个消费这些值的 接收receiving)端。

mpsc::channel 函数返回一个元组:第一个元素是发送端,而第二个元素是接收端。这里再次使用 thread::spawn 来创建一个新线程并使用 movetx 移动到闭包中这样新建线程就拥有 tx 了。新建线程需要拥有信道的发送端以便能向信道发送消息。信道的发送端有一个 send 方法用来获取需要放入信道的值。send 方法返回一个 Result<T, E> 类型,所以如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作会返回错误。

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

信道的接收端有两个有用的方法:recvtry_recv

  • recv这个方法会阻塞主线程执行直到从信道中接收一个值。一旦发送了一个值,recv 会在一个 Result<T, E> 中返回它。
  • try_recv 不会阻塞,相反它立刻返回一个 Result<T, E>Ok 值包含可用的信息,而 Err 值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv 很有用

信道所有权

所有权规则在消息传递中扮演了重要角色,其有助于我们编写安全的并发代码。一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。因为其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。如下的例子会报错,因为一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃,其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

发送多个值

在新建线程中有一个字符串 vector 希望发送到主线程。我们遍历他们,单独的发送每一个字符串并通过一个 Duration 值调用 thread::sleep 函数来暂停一秒。在主线程中,不再显式调用 recv 函数:而是将 rx 当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当信道被关闭时,迭代器也将结束。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

多个接收者

可以运用 mpsc 克隆信道来创建向同一接收者发送值的多个线程。

    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

在创建新线程之前,我们对信道的发送端调用了 clone 方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的信道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向信道的接收端发送不同的消息。但是这样你可能会看到这些值以不同的顺序出现,因为我们无法控制两个线程发送信息的先后顺序。

状态共享

信道类似于单所有权,因为一旦将一个值传送到信道中,将无法再使用这个值。共享内存类似于多所有权:多个线程可以同时访问相同的内存位置。

互斥器mutex)允许一个线程访问某些数据。为了访问互斥器中的数据,线程首先需要通过获取互斥器的 lock)来表明其希望访问数据。锁是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥器为通过锁系统 保护guarding)其数据。

我们使用关联函数 new 来创建一个 Mutex<T>。使用 lock 方法获取锁,以访问互斥器中的数据。这个调用会阻塞当前线程,直到我们拥有锁为止。

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

一旦获取了锁,就可以将返回值(在这里是num)视为一个其内部数据的可变引用了。类型系统确保了我们在使用 m 中的值之前获取锁:Mutex<i32> 并不是一个 i32,所以 必须 获取锁才能使用这个 i32 值。

我们将启动十个线程,并在各个线程中对同一个计数器值加一,这样计数器将从 0 变为 10。但是这个例子会报错,Rust 告诉我们不能将 counter 锁的所有权移动到多个线程中。

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Arc<T> 是一个类似 Rc<T> 且用于并发环境的类型。线程安全带有性能惩罚,我们希望只在必要时才为此买单。如果只是在单线程中对值进行操作,原子性提供的保证并无必要,代码可以因此运行的更快:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Sync和Send

由于不需要语言提供并发相关的基础设施,并发方案不受标准库或语言所限:我们可以编写自己的或使用别人编写的并发功能。有两个并发概念是内嵌于语言中的:std::marker 中的 SyncSend trait。

Send 标记 trait 表明实现了 Send 的类型值的所有权可以在线程间传送。几乎所有的 Rust 类型都是Send 的,不过有一些例外,包括 Rc<T>:这是不能 Send 的,因为如果克隆了 Rc<T> 的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数。

Sync 标记 trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用。换一种方式来说,对于任意类型 T,如果 &TT 的不可变引用)是 Send 的话 T 就是 Sync 的,这意味着其引用就可以安全的发送到另一个线程。智能指针 Rc<T> 也不是 Sync 的,出于其不是 Send 相同的原因。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/909644
推荐阅读
相关标签
  

闽ICP备14008679号