Stopping a Rust Worker
This is a small post about a specific pattern for cancellation in the Rust programming language. The pattern is simple and elegant, but it's rather difficult to come up with it by yourself.
Introducing a worker
To be able to stop a worker, we need to have one in the first place! So, let's implement a model program.
The task is to read the output line-by-line, sending these lines to another thread for processing (echoing the line back, with ). My solution looks like this:
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
worker.send(Msg::Echo(line))
.unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender {
let (tx, rx) = channel();
thread::spawn(move || {
loop {
let msg = rx.recv().unwrap();
match msg {
Msg::Echo(msg) => println!("{} ", msg),
}
}
});
tx
}
The program seems to work:
$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello
world
world
Bye!
Stopping the worker, the obvious way
Now that we have a worker, let's add a new requirement.
When the user types stop, the worker (but not the
program itself) should be halted.
How can we do this? The most obvious way is to add a new variant,
Stop, to the Msg
enum, and break out of the worker's loop:
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
let msg = if line == "stop" {
Msg::Stop
} else {
Msg::Echo(line)
};
worker.send(msg)
.unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
Stop,
}
fn spawn_worker() -> Sender {
let (tx, rx) = channel();
thread::spawn(move || {
loop {
let msg = rx.recv().unwrap();
match msg {
Msg::Echo(msg) => println!("{} ", msg),
Msg::Stop => break,
}
}
println!("The worker has stopped!");
});
tx
}
This works, but only partially:
$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.
We can add more code to fix the panic, but let's stop for a moment and try to invent a more elegant way to stop the worker. The answer will be below this beautiful Ukiyo-e print :-)
Dropping the microphone
The answer is: the cleanest way to cancel something in Rust is to
drop it. For our task, we can stop the worker by dropping the Sender:
use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;
fn main() {
let mut worker = Some(spawn_worker());
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
if line == "stop" {
drop(worker.take());
continue
};
if let Some(ref worker) = worker {
worker.send(Msg::Echo(line)).unwrap();
} else {
println!("The worker has been stopped!");
};
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender {
let (tx, rx) = channel();
thread::spawn(move || {
while let Ok(msg) = rx.recv() {
match msg {
Msg::Echo(msg) => println!("{} ", msg),
}
}
println!("The worker has stopped!");
});
tx
}
Note the interesting parts of the solution:
- no need to invent an additional message type,
-
the
Senderis stored inside anOption, so that we can drop it with the.takemethod, -
the
Optionforces us to check if the worker is alive before sending a message.
More generally, previously the worker had two paths for termination:
a normal termination via the Stop message and an
abnormal termination after a panic in recv (which might
happen if the parent thread panics and drops the Sender). Now there is a single code path for both cases.
That means we can be surer that if something somewhere dies with a
panic then the shutdown will proceed in an orderly fashion, it is
not a special case anymore.
The only thing left to make this ultimately neat is to replace a
hand-written while let
with a for loop:
for msg in rx {
match msg {
Msg::Echo(msg) => println!("{} ", msg),
}
}
Am I awaited?
It's interesting to see that the same pattern applies to the async version of the solution as well.
Async baseline:
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
worker = worker.send(Msg::Echo(line)).wait().unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender {
let (tx, rx) = channel(1);
thread::spawn(move || {
rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => println!("{} ", msg),
}
Ok(())
}).wait().unwrap()
});
tx
}
Async with a termination message:
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = spawn_worker();
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
let msg = if line == "stop" {
Msg::Stop
} else {
Msg::Echo(line)
};
worker = worker.send(msg).wait().unwrap();
}
println!("Bye!");
}
enum Msg {
Echo(String),
Stop,
}
fn spawn_worker() -> Sender {
let (tx, rx) = channel(1);
thread::spawn(move || {
let _ = rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => {
println!("{} ", msg);
Ok(())
},
Msg::Stop => Err(()),
}
}).then(|result| {
println!("The worker has stopped!");
result
}).wait();
});
tx
}
$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.
Async with drop:
extern crate futures; // [dependencies] futures = "0.1"
use std::io::BufRead;
use std::thread;
use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};
fn main() {
let mut worker = Some(spawn_worker());
let stdin = ::std::io::stdin();
for line in stdin.lock().lines() {
let line = line.unwrap();
if line == "stop" {
drop(worker.take());
continue;
};
if let Some(w) = worker {
worker = Some(w.send(Msg::Echo(line)).wait().unwrap())
} else {
println!("The worker has been stopped!");
}
}
println!("Bye!");
}
enum Msg {
Echo(String),
}
fn spawn_worker() -> Sender {
let (tx, rx) = channel(1);
thread::spawn(move || {
rx.for_each(|msg| {
match msg {
Msg::Echo(msg) => println!("{} ", msg),
}
Ok(())
}).map(|()| {
println!("The worker has stopped!");
}).wait().unwrap();
});
tx
}
$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello
stop
The worker has stopped!
world
The worker has been stopped!
Bye!
Conclusion
So, yeah, this all was written just to say "in Rust, cancellation is
drop" :-)
Discussion on /r/rust.