Rust实践:Creating a Chat Server with async Rust and Tokio
1. 创建支持一次通信的Echo Server - Handle One Client
1.1. 项目结构
1 | chat-tokio-stream |
1.2. 源文件内容
main.rs
内容:
1 | use tokio::{net::TcpListener, io::{AsyncReadExt, AsyncWriteExt}}; |
Cargo.toml
内容:
1 | [package] |
1.3. 测试
安装
telnet
对于macOS,使用brew
安装telnet
:1
brew install telnet
运行该项目
1
cargo run
将在本地(localhost)的
8080
端口接收TCP连接。使用
telnet
进行测试1
2
3
4
5# 连接本地的 localhost:8080
telnet localhost 8080
# 建立连接后,发送消息,如`hello!`,会看到本地server返回一个`hello!`
hello!测试结果:
可以看到,一次通信结束后,server shut down了。
2. 创建支持多次通信的Echo Server - Handle One Client
2.1. 源文件内容
main.rs
内容:
1 | use tokio::{net::TcpListener, io::{AsyncWriteExt, BufReader, AsyncBufReadExt}}; |
Cargo.toml
文件不变。
2.2. 测试
运行该项目
1
cargo run
将在本地(localhost)的
8080
端口接收TCP连接。使用
telnet
进行测试1
2
3
4
5
6
7
8# 连接本地的 localhost:8080
telnet localhost 8080
# 建立连接后,发送消息,如`hello!`,会看到本地server返回一个`hello!`
hello!
# 可以继续发送消息,本地server会返回相同内容,直到client断开连接
world
测试结果:
3. 创建支持多次通信的Echo Server - Handle Multiple Clients
3.1. 源文件内容
main.rs
内容:
1 | use tokio::{ |
Cargo.toml
文件不变。
3.2. 测试
运行该项目
1
cargo run
将在本地(localhost)的
8080
端口接收TCP连接。使用
telnet
进行测试1
2
3
4
5
6
7
8# 打开1个终端,连接本地的 localhost:8080
telnet localhost 8080
# 建立连接后,发送消息,如`hello!`,会看到本地server返回一个`hello!`
hello!
# 打开第2个终端,如上连接本地的 localhost:8080,发送消息
world可以看到两个clients都能收到server返回的消息。
测试结果:
4. 将Echo Server变成Chat Server
逻辑问题预警!
4.1. 源文件内容
main.rs
内容:
1 | use tokio::{ |
Cargo.toml
文件不变。
4.2. 测试
运行该项目
1
cargo run
将在本地(localhost)的
8080
端口接收TCP连接。使用
telnet
进行测试1
2
3
4
5
6
7
8
9
10
11# 打开2个终端,均连接本地的 localhost:8080
telnet localhost 8080
# 建立连接后,使用其中一个终端发送消息,如`how are you?`
how are you?
# 结果只有第1个终端出现`how are you?`,第2个终端没有出现`how are you?`的消息。
# 如果使用第2个终端发送`I'm great!`,可以看到第2个终端先显示`I'm great!`,
# 然后才出现`how are you?`的消息。
I'm great!
how are you?测试结果:
4.3. 分析
main.rs
:
1 | use tokio::{ |
5. 异步处理消息的Chat Server
5.1. 源文件内容
main.rs
内容:
1 | use tokio::{ |
5.2. 测试
运行该项目
1
cargo run
将在本地(localhost)的
8080
端口接收TCP连接。使用
telnet
进行测试1
2
3
4
5
6
7
8
9# 打开2个终端,均连接本地的 localhost:8080
telnet localhost 8080
# 建立连接后,使用其中一个终端发送消息,如`how are you?`
how are you?
# 两个终端都会接收到`how are you?`这个消息。
# 使用第2个终端发送`I'm great!`,可以看到两个终端都会显示`I'm great!`
I'm great!测试结果:
6. 添加feature:不向发送消息的Client发送他自己的消息
可以看到,上面的Chat Server在收到一个client发送的消息后,会向所有clients广播这个消息,包括消息的发送者。
这里,我们想要改进这一点:只向消息发送者之外的人广播这条消息,而消息发送者不会再次收到自己刚刚发的消息。
6.1. 源文件代码
main.rs
内容:
1 | use tokio::{ |
6.2. 测试
运行该项目
1
cargo run
将在本地(localhost)的
8080
端口接收TCP连接。使用
telnet
进行测试1
2
3
4
5
6
7
8
9# 打开2个终端,均连接本地的 localhost:8080
telnet localhost 8080
# 使用其中一个终端发送消息,如`how are you?`
how are you?
# 只有另一终端会接收到`how are you?`这个消息。
# 使用第2个终端发送`I'm great!`,可以看到只有第1个终端都会收到`I'm great!`
I'm great!测试结果:
7. tokio::spawn
vs. tokio::select!
tokio::spawn
and tokio::select!
are two different ways to do concurrency. How do we know when to use which?
tokio::select!
is very useful when you have things that need to operate on the same shared state, and you have a finite number of things.
For example, in this chat server, we have exactly two async tasks that need to run concurrently: reading a line from a network and receiving on this internal broadcast channel in memory.
These are the only two things that are ever going to need to run concurrently within the scope of this one task.In addition to finite number of things, we also have some shared state. We did a split on the socket (
socket.split()
). If we look at the type that we get back fromsplit
:1
pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)
we have a
ReadHalf<'a>
and aWriteHalf<'a>
. You may notice that both of them have lifetime parameters'a
that’s matched up with a mutable borrow of theTcpStream
, which is&'a mut self
in the above signature. If we go to their definitions, we’ll find thatReadHalf
andWriteHalf
are implemented just as references to theTcpStream
:1
2pub struct ReadHalf<'a>(&'a TcpStream);
pub struct WriteHalf<'a>(&'a TcpStream);So if we try to use
spawn
on these two things as indenpendent tokio tasks, we would run into a problem, because we’ll be trying to split a reference across multiple tasks, which is not allowed. Everytime you calltokio::spawn
, everything you pass to it has to bestatic
:1
2
3
4pub fn spawn<T>(task: T) -> JoinHandler<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,If we change the code to use
tokio::spawn
instead oftokio::select!
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43loop {
let (mut socket, addr) = listener.accept().await.unwrap();
let tx = tx.clone();
let mut rx = tx.subscribe();
tokio::spawn(async move {
let (reader, mut writer) = socket.split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
// `tokio::spawn` is used here!
tokio::spawn(async move {
reader.read_line(&mut line).await.unwrap();
});
tx.send(("".to_string(), addr)).unwrap();
loop {
tokio::select! {
// result = reader.read_line(&mut line) => {
// if result.unwrap() == 0 {
// break;
// }
// tx.send((line.clone(), addr)).unwrap();
// line.clear();
// }
result = rx.recv() => {
let (msg, other_addr) = result.unwrap();
if addr != other_addr {
writer.write_all(msg.as_bytes()).await.unwrap();
}
}
}
}
});
}We’ll now get the error of “socket does not live long enough”. That’s because we’re calling the function
reader.read_line
, which needs the reference to the socket to be a static reference. But now it’s not a static reference. So because of the waysplit
works, we do have some shared state between the two branches (reader
andwriter
both point to the same underlying piece of memory).It’s very common in writing programs with tokio that a
select
statement has five or six branches on it, and they are all just gathering messages from different channels, and then altering some shared state like error counters or something like the above.Besides, in
select
, branches can operate at the same time as each other (like theread_line
andrecv
shown in previous sections). However, as soon as it jumps into one of the blocks, that’ll be the only thing that’s running (it will suspend other branches).Another benefit you can get from
select
is that because it all happens on the same task, you can easily share references among all the different branches ofselect
. It just makes memory management a lot easier. You don’t need to worry about making everything owned, or making everything static, or sync and all these stuff.