Rust 并发编程:使用消息传递进行线程间数据共享

news/2025/2/27 9:29:56

一、通道(Channel)的基本概念

一个通道可以想象成一条单向水道或河流:有一个 发送端(transmitter) 和一个 接收端(receiver)。发送端好比河流上游,负责把“橡皮鸭”丢进水里;接收端在河流下游,收到这只“橡皮鸭”。在编程中,线程之间的通信即是这样——把数据发到通道的一端,另外一个(或多个)线程在通道的另一端接收。

Rust 通过 std::sync::mpsc(Multiple Producer, Single Consumer)来提供通道功能:

  • Multiple Producer:可以有多个发送端同时发送数据;
  • Single Consumer:但只能有一个接收端来接收数据。

通过克隆发送端可以允许多个线程一起发送数据给同一个接收端。

二、创建并使用通道

1. 基础用法:创建一个 mpsc::channel

rust,ignore">use std::sync::mpsc;
use std::thread;

fn main() {
    // 创建一个通道
    let (tx, rx) = mpsc::channel();

    // 这里的 tx 是 transmitter(发送端),rx 是 receiver(接收端)。
    // 我们先不发送任何数据,因此代码暂时无法编译,
    // 因为编译器不知道通道要发送什么类型的数据。
}

mpsc::channel() 函数会返回一个元组 (tx, rx),分别代表发送端和接收端。在之后,我们会看到如何把 tx 移动到不同线程去发送消息,rx 则留在当前线程用于接收消息。

2. 在子线程中发送消息

下面的例子中,我们在主线程创建了通道,然后把发送端 tx 移动(move)到子线程中,子线程通过 tx.send() 发送一条字符串“hi”给主线程。

rust">use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap(); 
        // 如果接收端已关闭,则 send 会返回错误,这里直接 unwrap 处理
    });

    // 在主线程接收消息
    let received = rx.recv().unwrap(); 
    // recv 会阻塞主线程,直到收到一条消息或者发送端被关闭
    println!("Got: {}", received);
}

运行后,主线程会打印:

Got: hi

这表示主线程成功地收到了子线程通过通道发送的字符串。

3. 通道与所有权

当我们调用 tx.send(val) 时,send 方法会获取 val 的所有权。这样做能够避免在另一个线程修改数据后,我们在原线程又使用这段数据的潜在风险。例如,下面这段示例代码(示意)试图在发送之后继续使用 val,就会导致编译错误:

rust,ignore">use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let val = String::from("hello");

    thread::spawn(move || {
        tx.send(val).unwrap();
        // 发送后 val 的所有权已转移到通道
        // println!("val is: {}", val); // 编译错误: val 所有权已经被移动
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

由于所有权已经转移,主线程可以安全地接收并处理这条消息,而子线程也不会再访问已经移出的数据。这种严格的所有权规则能有效避免数据竞争和其他并发错误。

4. 发送多个消息

我们可以让子线程发送不止一条消息。下面的例子让子线程依次发送多条字符串,并在发送之间加上 sleep 用来模拟耗时操作:

rust">use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // 主线程中,通过 iter 的方式持续接收消息
    for received in rx {
        println!("Got: {}", received);
    }
}

由于接收端可以被当做迭代器来使用,当所有发送端都关闭时,for 循环会自动结束。这段程序会像下面这样依次打印每条消息:

Got: hi
Got: from
Got: the
Got: thread

5. 多个发送端(Multiple Producer)

mpsc 的含义之一就是 Multiple Producer。如果我们希望有多个不同的子线程来发送消息给同一个接收端,只需要克隆发送端即可。如下:

rust">use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("first thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap(); // 这里使用原先的 tx
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}
  • 第一个子线程使用 tx1
  • 第二个子线程使用原本的 tx

所有发送过来的数据都将流向同一个 rx(接收端)。运行结果每次可能都不一样,因为线程的调度顺序不可预测,这也正是并发编程“有趣”且需要谨慎之处。

三、总结

  1. 创建通道:使用 mpsc::channel() 创建通道,获得 (tx, rx)(发送端和接收端)。
  2. 发送数据tx.send(data) 会转移 data 的所有权,若接收端已关闭,send 会返回错误。
  3. 接收数据rx.recv() 会阻塞等待数据;rx.try_recv() 则不会阻塞,可用于非阻塞检查。也可将 rx 当做迭代器使用,以便持续接收数据,直到通道被关闭。
  4. 所有权规则保障安全:发送端在 send 时会移动数据的所有权,避免了多线程中对同一数据的潜在不安全访问。
  5. 多发送端:通过克隆发送端(tx.clone()),多个线程可以各自发送数据到同一个接收端,从而实现复杂的多生产者单消费者架构。

Rust 的通道借助所有权系统,帮助我们轻松规避了许多并发陷阱。通过消息传递的思路,不再需要小心翼翼地管理锁和共享数据,编程思路也往往更加清晰简洁。在实际项目中,若需要多个线程之间相互通信,不妨考虑一下通道(channel)方案,也许能带来更加优雅和可靠的并发架构。


http://www.niftyadmin.cn/n/5869882.html

相关文章

PhotoLine绿色版 v25.00:全能型图像处理软件的深度解析

在图像处理领域,PhotoLine以其强大的功能和紧凑的体积,赢得了国内外众多用户的喜爱。本文将为大家全面解析PhotoLine绿色版 v25.00的各项功能,帮助大家更好地了解这款全能型的图像处理软件。 一、迷你体积,强大功能 PhotoLine被誉为迷你版的Photoshop,其体积虽小,但功能却…

WiFi IEEE 802.11协议精读:IEEE 802.11-2007,6,MAC service definition MAC服务定义

继续精读IEEE 802.11-2007 6,MAC service definition MAC服务定义 6.1 MAC服务概述 6.1.1 数据服务 此服务为对等逻辑链路控制(LLC)实体提供交换MAC服务数据单元(MSDU)的能力。为支持此服务,本地媒体访…

全国传统村落空间分布SHP数据深度解析与保护价值探讨

一、引言 传统村落,又称为古村落,是民国以前所建的村落,它们宛如一颗颗散落于中华大地上的璀璨明珠,蕴藏着丰富的历史信息和文化景观。 这些村落不仅是中国农耕文明留下的重要遗产,更是中华民族传统文化和精神的重要…

django model.object.filter 不等于多个值

关于Django中QuerySet.filter()的使用问题。首先,我会分别针对“不等于多个值”的代码开发问题和可能遇到的报错问题给出解答。 代码开发问题:QuerySet.filter()不等于多个值 在Django中,如果你想在查询中排除多个值,可以使用__i…

科技赋能!深圳市悠声科技有限公司荣获“GAS消费电子科创奖-技术进步奖”!

在2025年“GAS消费电子科创奖”评选中,深圳市悠声科技有限公司提交的“MEMS扬声器技术”,在技术创新性、设计创新性、工艺创新性、智能化创新性及原创性五大维度均获得评委的高度认可,荣获“技术进步奖”。 这一奖项不仅是对悠声科技在消费电…

Docker 与 Serverless(无服务器架构)

Serverless(无服务器架构) 是一种新的云计算架构,它通过让开发者专注于业务逻辑而无需管理服务器基础设施,来简化应用的开发和部署。Serverless 模型通常由云服务提供商管理基础设施的所有方面,而开发者只需提供代码和…

企业数字化过程中数据仓库与商业智能 BI的目标

当前环境下,各领域企业通过数字化相关的一切技术,以数据为基础、以用户为核心,创建一种新的,或对现有商业模式进行重塑就是数字化转型。这种数字化转型给企业带来的效果就像是一次重构,会对企业的业务流程、思维文化、…

脚本无法获取响应主体(原因:CORS Missing Allow Credentials)

背景: 前端的端口号8080,后端8000。需在前端向后端传一个参数,让后端访问数据库去检测此参数是否出现过。涉及跨域请求,一直有这个bug是404文件找不到。 在修改过程当中不小心删除了一段代码,出现了这个bug&#xff…