Stopping a Rust Worker

This is a small post about a specific pattern for cancellation in the Rust programming language. The pattern is simple and elegant, but it's rather difficult to come up with on your own.

Introducing a worker

To be able to stop a worker, we need to have one in the first place! So, let's implement a model program.

The task is to read standard input line by line, sending each line to another thread for processing (echoing the line back). My solution looks like this:


use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        worker.send(Msg::Echo(line))
            .unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        loop {
            let msg = rx.recv().unwrap();
            match msg {
                Msg::Echo(msg) => println!("{} ", msg),
            }
        }
    });
    tx
}

The program seems to work:


$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello
world
world
Bye!

Stopping the worker, the obvious way

Now that we have a worker, let's add a new requirement.

When the user types stop, the worker (but not the program itself) should be halted.

How can we do this? The most obvious way is to add a new variant, Stop, to the Msg enum, and break out of the worker's loop:


use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        let msg = if line == "stop" {
            Msg::Stop
        } else {
            Msg::Echo(line)
        };

        // Panics if the worker has already stopped and dropped its Receiver.
        worker.send(msg)
            .unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
    Stop,
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        loop {
            let msg = rx.recv().unwrap();
            match msg {
                Msg::Echo(msg) => println!("{} ", msg),
                Msg::Stop => break,
            }
        }
        println!("The worker has stopped!");
    });
    tx
}

This works, but only partially:


$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: "SendError(..)"', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.
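The panic above comes from unwrapping the SendError that send returns once the worker has exited and dropped its Receiver. As a minimal sketch of the "more code" route, assuming a hypothetical try_send helper and a spawn_worker variant that also returns a JoinHandle (neither is part of the program above), the error can be checked instead of unwrapped:

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

enum Msg {
    Echo(String),
    Stop,
}

// Same worker as above, but it also hands back a JoinHandle (an assumption
// made here so that the caller can wait for the worker to exit).
fn spawn_worker() -> (Sender<Msg>, JoinHandle<()>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        loop {
            match rx.recv().unwrap() {
                Msg::Echo(msg) => println!("{}", msg),
                Msg::Stop => break,
            }
        }
    });
    (tx, handle)
}

// Hypothetical helper: returns false instead of panicking when the worker is gone.
fn try_send(worker: &Sender<Msg>, msg: Msg) -> bool {
    worker.send(msg).is_ok()
}

fn main() {
    let (worker, handle) = spawn_worker();
    try_send(&worker, Msg::Echo("hello".to_string()));
    try_send(&worker, Msg::Stop);
    handle.join().unwrap(); // the worker has exited and dropped its Receiver

    if !try_send(&worker, Msg::Echo("world".to_string())) {
        println!("The worker has been stopped!");
    }
}
```

This removes the panic, but the worker still has two ways to terminate and the caller still has to remember to check every send.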

We can add more code to fix the panic, but let's stop for a moment and try to invent a more elegant way to stop the worker. The answer will be below this beautiful Ukiyo-e print :-)

Dropping the microphone

The answer is: the cleanest way to cancel something in Rust is to drop it. For our task, we can stop the worker by dropping the Sender:


use std::io::BufRead;
use std::sync::mpsc::{Sender, channel};
use std::thread;

fn main() {
    let mut worker = Some(spawn_worker());

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        if line == "stop" {
            // Dropping the Sender disconnects the channel and stops the worker.
            drop(worker.take());
            continue
        };

        if let Some(ref worker) = worker {
            worker.send(Msg::Echo(line)).unwrap();
        } else {
            println!("The worker has been stopped!");
        };
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        // recv returns Err once the Sender is dropped, which ends the loop.
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Echo(msg) => println!("{} ", msg),
            }
        }
        println!("The worker has stopped!");
    });
    tx
}

Note the interesting parts of the solution:

  • no need to invent an additional message type,
  • the Sender is stored inside an Option, so that we can drop it with the .take method,
  • the Option forces us to check if the worker is alive before sending a message.
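To see the mechanism in isolation, here is a small sketch in which the worker thread returns the number of messages it handled. The spawn_counting_worker name and the returned JoinHandle are assumptions for illustration only. Messages sent before the drop are still delivered, because recv only reports an error once every Sender is gone and the queue is empty.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

enum Msg {
    Echo(String),
}

// Hypothetical variation on spawn_worker: the thread returns how many
// messages it handled, so the caller can observe the orderly shutdown.
fn spawn_counting_worker() -> (Sender<Msg>, JoinHandle<usize>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        // The loop ends when all Senders are dropped and the queue is drained.
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Echo(msg) => println!("{}", msg),
            }
            handled += 1;
        }
        handled
    });
    (tx, handle)
}

fn main() {
    let (tx, handle) = spawn_counting_worker();
    let mut worker = Some(tx);

    if let Some(ref w) = worker {
        w.send(Msg::Echo("hello".to_string())).unwrap();
    }
    drop(worker.take()); // stop the worker by dropping its Sender

    let handled = handle.join().unwrap();
    println!("worker handled {} message(s)", handled);
}
```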

More generally, the worker previously had two paths for termination: a normal termination via the Stop message, and an abnormal termination when recv returns an error and unwrap panics (which might happen if the parent thread panics and drops the Sender). Now there is a single code path for both cases. That means we can be more confident that if something somewhere dies with a panic, the shutdown will still proceed in an orderly fashion; it is not a special case anymore.
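The single code path can be demonstrated with a sketch in which the thread that owns the Sender panics. This assumes the default unwinding panic strategy, and it uses plain String messages and the hypothetical names run_worker and producer_panics to keep the example short. Unwinding drops the Sender, so the worker leaves its loop exactly as it would after a deliberate stop.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

// Worker loop: runs until the channel is disconnected, then reports how
// many lines it saw.
fn run_worker(rx: Receiver<String>) -> usize {
    let mut seen = 0;
    for line in rx {
        println!("{}", line);
        seen += 1;
    }
    println!("The worker has stopped!");
    seen
}

// Hypothetical scenario: the producer panics while still owning the Sender.
// Returns whether the producer panicked and how many lines the worker saw.
fn producer_panics() -> (bool, usize) {
    let (tx, rx) = channel();
    let worker = thread::spawn(move || run_worker(rx));
    let producer = thread::spawn(move || {
        tx.send("hello".to_string()).unwrap();
        // Unwinding from this panic drops tx and disconnects the channel.
        panic!("producer failed");
    });
    let producer_panicked = producer.join().is_err();
    let seen = worker.join().unwrap();
    (producer_panicked, seen)
}

fn main() {
    let (producer_panicked, seen) = producer_panics();
    println!("producer panicked: {}, lines seen: {}", producer_panicked, seen);
}
```

The worker thread itself never panics here; it simply runs out of messages.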

The only thing left to make this ultimately neat is to replace the hand-written while let with a for loop:


for msg in rx {
    match msg {
        Msg::Echo(msg) => println!("{} ", msg),
    }
}

Am I awaited?

It's interesting to see that the same pattern applies to the async version of the solution as well.

Async baseline:


extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        // Sink::send consumes the Sender and yields it back on completion.
        worker = worker.send(Msg::Echo(line)).wait().unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => println!("{} ", msg),
            }
            Ok(())
        }).wait().unwrap()
    });
    tx
}

Async with a termination message:


extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = spawn_worker();

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        let msg = if line == "stop" {
            Msg::Stop
        } else {
            Msg::Echo(line)
        };
        worker = worker.send(msg).wait().unwrap();
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
    Stop,
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        let _ = rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => {
                    println!("{} ", msg);
                    Ok(())
                },
                // Returning an error is how the stream is terminated early.
                Msg::Stop => Err(()),
            }
        }).then(|result| {
            println!("The worker has stopped!");
            result
        }).wait();
    });
    tx
}

$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello
stop
The worker has stopped!
world
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("...")', /checkout/src/libcore/result.rs:916:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.

Async with drop:


extern crate futures; // [dependencies] futures = "0.1"

use std::io::BufRead;
use std::thread;

use futures::sync::mpsc::{Sender, channel};
use futures::{Future, Stream, Sink};

fn main() {
    let mut worker = Some(spawn_worker());

    let stdin = ::std::io::stdin();
    for line in stdin.lock().lines() {
        let line = line.unwrap();
        if line == "stop" {
            // Dropping the Sender ends the stream on the worker's side.
            drop(worker.take());
            continue;
        };

        if let Some(w) = worker {
            worker = Some(w.send(Msg::Echo(line)).wait().unwrap())
        } else {
            println!("The worker has been stopped!");
        }
    }

    println!("Bye!");
}

enum Msg {
    Echo(String),
}

fn spawn_worker() -> Sender<Msg> {
    let (tx, rx) = channel(1);
    thread::spawn(move || {
        rx.for_each(|msg| {
            match msg {
                Msg::Echo(msg) => println!("{} ", msg),
            }
            Ok(())
        }).map(|()| {
            println!("The worker has stopped!");
        }).wait().unwrap();
    });
    tx
}

$ cargo r
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/worker`
hello
hello
stop
The worker has stopped!
world
The worker has been stopped!
Bye!

Conclusion

So, yeah, this all was written just to say "in Rust, cancellation is drop" :-)
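One way to lean on this further, sketched here under assumptions rather than taken from the programs above, is to wrap the Sender and the thread handle in a type whose Drop implementation stops the worker. The Worker struct, its echo and stop methods, and the use of plain String messages are all hypothetical names chosen for illustration.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical wrapper: owns the Sender and the thread, and stops the
// worker when it goes out of scope.
struct Worker {
    sender: Option<Sender<String>>,
    thread: Option<JoinHandle<usize>>,
}

impl Worker {
    fn spawn() -> Worker {
        let (tx, rx) = channel::<String>();
        let thread = thread::spawn(move || {
            let mut echoed = 0;
            for line in rx {
                println!("{}", line);
                echoed += 1;
            }
            echoed
        });
        Worker { sender: Some(tx), thread: Some(thread) }
    }

    // Sends a line to the worker; does nothing if it was already stopped.
    fn echo(&self, line: &str) {
        if let Some(ref sender) = self.sender {
            sender.send(line.to_string()).unwrap();
        }
    }

    // Drops the Sender, waits for the thread, and returns the number of
    // echoed lines. Returns None if the worker was already stopped.
    fn stop(&mut self) -> Option<usize> {
        drop(self.sender.take());
        self.thread.take().map(|t| t.join().unwrap())
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        let _ = self.stop();
    }
}

fn main() {
    let mut worker = Worker::spawn();
    worker.echo("hello");
    worker.echo("world");
    println!("echoed: {:?}", worker.stop());
}
```

Joining inside drop is a design choice, not a requirement: it makes the shutdown synchronous, at the cost of blocking the dropping thread until the worker has drained its queue.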

Discussion on /r/rust.