赞
踩
构建 web server 的计划:
web server 中涉及到的两个主要协议是 超文本传输协议(Hypertext Transfer Protocol,HTTP)和 传输控制协议(Transmission Control Protocol,TCP)。这两者都是 请求 - 响应(request-response)协议,也就是说,有 客户端(client)来初始化请求,并有 服务端(server)监听请求并向客户端提供响应。请求与响应的内容由协议本身定义。
单线程服务器
use std::{ fmt::format, fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { // 在地址 127.0.0.1:7878 上监听传入的 TCP 流 // bind 函数返回 Result<T, E> ,这表明绑定可能会失败,非管理员用户只能监听大于 1023 的端口 let listenser = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listenser.incoming() { let stream = stream.unwrap(); println!("Connection established!"); handle_connection(stream); // 移交了stream的所有权给handle_connection函数 } } // stream绑定到此函数内时,被声明为了mut,这是合法的 fn handle_connection(mut stream: TcpStream) { // BufReader 实现了 std::io::BufRead trait // 创建缓冲读取器,将 stream 进行包装,允许我们从流中进行缓冲读取 let buf_reader = BufReader::new(&mut stream); // 调用next函数得到第一行的数据 // 第一个unwrap解包next返回的Option得到Result,第二个解包Result得到String let request_line = buf_reader.lines().next().unwrap().unwrap(); // 读取HTTP请求并打印出来 // 1.lines方法遇到换行符(newline)字节就切分数据流,即逐行读取流中的数据,返回一个 Result<String,std::io::Error> 的迭代器 // 2.通过 map 并 unwrap 每一个 Result,即可得到每一个 String,如果发生错误,程序会发生 panic // 3.浏览器通过连续发送两个换行符来代表一个 HTTP 请求的结束,所以为了从流中获取一个请求,我们获取行直到它们不为空。 // 使用 take_while() 方法来获取迭代器中的项,直到遇到空行为止 // 4.最后使用collect(),将这些请求行收集到一个 Vec 中 // let http_request: Vec<_> = buf_reader // .lines() // .map(|result| result.unwrap()) // .take_while(|line| !line.is_empty()) // .collect(); // println!("Request: {:#?}", http_request); // 验证请求并有选择的进行响应 整个字符串切片的写法 let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { // 请求/sleep资源时,程序沉睡5秒 thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); // write_all的参数是 &[u8] 的字节切片类型,使用 as_bytes 转换 stream.write_all(response.as_bytes()).unwrap(); }
单线程服务器在遇到慢请求时,其他请求都排在慢请求之后,这时需要使用多线程技术进行改进。
使用线程池可以改善吞吐量,并发处理连接。线程池中有固定数量的等待线程,当新进请求时,将请求发送到线程池中做处理。
线程池会维护一个接收请求的队列。每一个线程会从队列中取出一个请求,处理请求,接着向队列索取另一个请求。通过这种设计,则可以并发处理 N 个请求,其中 N 为线程数。如果每一个线程都在响应慢请求,之后的请求仍然会阻塞队列,不过相比之前增加了能处理的慢请求的数量。
在库文件里手写线程池,实现封装
文件名:src/lib.rs
use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, // 这里的sender 设计为 Option 也是为了方便移交所有权 } // 给类型取一个新的名称Job,存放用于向信道中发送的闭包, // 1.dyn FnOnce(): 这是一个 trait 对象,表示实现了 FnOnce() trait的闭包,它表示一个可以被调用一次的闭包。 //使用 dyn 关键字,我们将闭包包装在一个动态分发的 trait 对象中,这允许我们在运行时处理闭包,而不需要在编译时知道其具体类型。 // 2.实现 Send 特性:闭包可以安全地在多个线程间传递,它们不会产生数据竞争或其他线程安全问题。 // 3.'static': 这表示闭包的生命周期是 'static',意味着它可以在整个程序的执行期间存活 type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// 创建线程池。 /// /// 线程池中线程的数量。 /// /// # Panics /// /// `new` 函数在 size 为 0 时会 panic。 pub fn new(size: usize) ->ThreadPool { assert!(size > 0); // 创建一个信道并充当发送者 let (sender, receiver) = mpsc::channel(); // 不能直接把一个receiver传递给多个Worker,而为了实现在多个线程间共享所有权并允许线程修改其值 // 可以使用 Arc<Mutex<T>>,Arc 使得多个 worker 拥有接收端,而 Mutex 则确保一次只有一个 worker 能从接收端得到任务 let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); // 每个 Worker 将会充当接收者 } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) //在此方法中获得期望执行的闭包 where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } // 为 ThreadPool 实现 Drop Trait。 // 当线程池被丢弃时,应该 join 所有线程以确保它们执行完任务正常结束运行 impl Drop for ThreadPool { fn drop(&mut self) { // 调用 join 并不会关闭线程,因为它们一直 loop 来寻找任务,主线程会永远阻塞在等待第一个线程结束上。 // 丢弃 sender 会关闭信道,这表明不会有更多的消息被发送。 // 这时 worker 中的无限循环中的所有 recv 调用都会返回错误,我们就可以 drop(self.sender.take()); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); // 调用 join() 方法时,需要拥有线程的所有权,而不是只有对线程的引用,因此需要把所有权移动到里边 // 在 Option 上调用 take 方法: 将值从 Some 成员中移动出来而对 None 成员不做处理 if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { // 存储线程实例 id: usize, // JoinHandle有方法join // 线程设计为 Option<thread::JoinHandle<()>, 以便调用 take 方法将值从 Some 成员中移动出来 thread: Option<thread::JoinHandle<()>>, // 处理线程不需要返回值,因此JoinHandle使用单元类型 () } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // 传入线程id let thread = thread::spawn(move || loop { // 线程拿到锁后调用 recv 从信道中接收 Job // 调用 recv 会阻塞当前线程,所以如果还没有任务,其会等待直到有可用的任务 let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { // 当 recv 返回错误时显式退出循环 println!("Worker {id} disconnected; shutting down."); break; } } }); Worker { id, thread: Some(thread), } } }
文件名:src\main.rs
use std::{ fmt::format, fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; use hello::ThreadPool; fn main() { let listenser = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listenser.incoming().take(2) { let stream = stream.unwrap(); println!("Connection established!"); pool.execute(|| { handle_connection(stream); // 移交了stream的所有权给handle_connection函数 }); } println!("Shutting down."); } // stream绑定到此函数内时,被声明为了mut,这是合法的 fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { // 请求/sleep资源时,程序沉睡5秒 thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length:{length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。