赞
踩
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
这里解释一下:单线程并发处理请求时,每一个请求其实都是一个 future;如果某个 future 卡住了(处于等待状态),就会转去处理别的 future。async 的原理可以参考我关于 async 的博客。
use async_std::net::TcpListener; use async_std::net::TcpStream; use futures::stream::StreamExt; use std::fs; use async_std::prelude::*; use std::time::Duration; use async_std::task; #[async_std::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap(); // 创建 TCP 监听器,绑定到本地 7878 端口 listener .incoming() // 监听传入的连接 .for_each_concurrent(/* limit */ None, |tcpstream| async move {//stream 流异步并发处理 let tcpstream = tcpstream.unwrap(); // 获取传入连接 handle_connection(tcpstream).await; // 处理连接的异步任务 }) .await; // 等待所有连接处理完毕 } async fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 1024]; stream.read(&mut buffer).await.unwrap(); // 读取 TCP 流数据到缓冲区 let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; // 判断请求内容,确定响应状态行和文件名 let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") // 如果请求是 GET /,返回 200 OK 状态和 hello.html 文件 } else if buffer.starts_with(sleep) { task::sleep(Duration::from_secs(5)).await; // 如果请求是 GET /sleep,等待 5 秒钟 ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") // 然后返回 200 OK 状态和 hello.html 文件 } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") // 其他情况返回 404 NOT FOUND 状态和 404.html 文件 }; // 读取文件内容到字符串 let contents = fs::read_to_string(filename).unwrap(); let response = format!("{status_line}{contents}"); // 构建响应字符串 // 将响应字符串写入 TCP 流,并刷新 stream.write(response.as_bytes()).await.unwrap(); stream.flush().await.unwrap(); }
lib.rs
// src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

/// A fixed-size pool of worker threads that execute submitted closures.
///
/// Jobs are sent over a channel whose receiving end is shared by all workers.
/// Dropping the pool closes the channel and then waits for every worker to
/// finish, so jobs that were already queued still run before the drop returns.
pub struct ThreadPool {
    workers: Vec<Worker>,
    /// `None` only once `Drop` has started; taking it closes the channel.
    sender: Option<mpsc::Sender<Job>>,
}

/// A unit of work: a boxed closure that runs once on a worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        // Every worker pulls from the same receiver, so it is shared behind a mutex.
        let receiver = Arc::new(Mutex::new(receiver));

        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    /// Queue `f` to be run by one of the worker threads.
    ///
    /// # Panics
    ///
    /// Panics if every worker has already terminated, because the job can no
    /// longer be delivered.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job: Job = Box::new(f);

        self.sender
            .as_ref()
            .expect("the sender is only removed while the pool is being dropped")
            .send(job)
            .expect("no worker is left to receive the job");
    }
}

impl Drop for ThreadPool {
    /// Closes the job channel and joins every worker thread.
    fn drop(&mut self) {
        // Dropping the sender makes `recv` fail once the queue is empty, which
        // is the workers' signal to leave their loop.
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                // A worker whose job panicked yields `Err` here. Panicking
                // inside `drop` could abort the process during unwinding, so
                // the failure is reported and the remaining workers are still
                // joined.
                if thread.join().is_err() {
                    eprintln!("Worker {} panicked while running a job.", worker.id);
                }
            }
        }
    }
}

/// One pool thread together with the id used in its log messages.
struct Worker {
    id: usize,
    /// `None` once the thread has been joined by `ThreadPool::drop`.
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    /// Spawn a thread that runs jobs from `receiver` until the channel closes.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The lock guard is a temporary of this statement, so the mutex is
            // released before the job runs and other workers can keep
            // receiving in the meantime.
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}
main.rs
// src/main.rs use hello_package::ThreadPool; use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::thread; use std::time::Duration; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(4) {//take限制请求次数 let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 1024]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!( "{}\r\nContent-Length: {}\r\n\r\n{}", status_line, contents.len(), contents ); stream.write_all(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。