"Cooperative" cancellation #2671

Answered by djspiewak
durban asked this question in Q&A
Dec 19, 2021 * 2 comments * 7 replies

durban
Dec 19, 2021
Collaborator

I'm trying to come up with a way to cancel/interrupt a (potentially) long-running CPU-bound computation. By default, I would run the computation with delay, like this: F.delay { longComputation() }. However, the computation could "occasionally" check for a flag (e.g., Thread.interrupted() or an AtomicBoolean), and stop whatever it's doing.

  • If I understand correctly, there is no built-in way to cancel an F.delay { ... } block.
  • Using F.interruptible { ... } and checking Thread.interrupted() seems to work fine, but (if I understand correctly) submits the computation to a separate threadpool.
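The F.interruptible approach from the second bullet might look roughly like the following sketch. It assumes cats-effect 3.5.x (fetched here with a scala-cli using-directive) and uses IO in place of a generic F. InterruptibleDemo and spinUntilInterrupted are hypothetical names; the latter stands in for longComputation().

```scala
//> using dep org.typelevel::cats-effect:3.5.4

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

object InterruptibleDemo {
  // Hypothetical stand-in for longComputation(): spins until the thread is interrupted.
  def spinUntilInterrupted(): Long = {
    var n = 0L
    while (!Thread.interrupted()) n += 1
    n
  }

  // IO.interruptible runs the thunk on the blocking pool; if the fiber is canceled
  // while the thunk is running, the runtime interrupts the thread executing it.
  val spin: IO[Long] = IO.interruptible(spinUntilInterrupted())

  // Cancel the computation after 100 ms and fall back to -1.
  val bounded: IO[Long] = spin.timeoutTo(100.millis, IO.pure(-1L))
}
```

When the timeout fires, the fiber is canceled, the runtime interrupts the thread, the loop observes Thread.interrupted() and returns, and bounded produces the fallback value -1.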

This is what I came up with: https://github.com/durban/choam/blob/97e3b751e77daf0641869efb08bbcbb8a021270f/core/shared/src/test/scala/dev/tauri/choam/IOSpec.scala#L33-L50

Here is the important part:

import java.util.concurrent.atomic.AtomicBoolean
import cats.effect.kernel.{Spawn, Sync}

def stoppable[F[_], A](task: AtomicBoolean => F[A])(implicit F: Spawn[F], S: Sync[F]): F[A] = {
  F.flatMap(S.delay { new AtomicBoolean(false) }) { stopSignal =>
    val tsk: F[A] = task(stopSignal)
    F.uncancelable { poll =>
      F.flatMap(F.start(tsk)) { fiber =>
        F.onCancel(
          fa = poll(fiber.joinWithNever),
          fin = F.productR(S.delay { stopSignal.set(true) })(
            // so that cancel backpressures until
            // the task actually observes the signal
            // and stops whatever it is doing:
            F.void(fiber.joinWithNever)
          )
        )
      }
    }
  }
}

It can be used like this:

stoppable { stop =>
  @tailrec
  def go(n: Long): Long = {
    if (stop.get()) {
      println(s"Stopping at ${n}")
      n
    } else {
      go(n + 1)
    }
  }
  F.delay { go(0L) }
}
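To watch the cancelation path end to end, here is a small driver for stoppable, called with IO. It is a sketch assuming cats-effect 3.5.x; StoppableDemo, spin, stoppedAt, and the Deferred used as a "task has started" signal are hypothetical additions that only make the behavior observable. It also assumes the compute pool has more than one thread: the loop never yields, so the fiber that cancels it has to run on another thread.

```scala
//> using dep org.typelevel::cats-effect:3.5.4

import cats.effect.IO
import cats.effect.kernel.{Spawn, Sync}
import cats.effect.unsafe.implicits.global
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.annotation.tailrec

object StoppableDemo {
  // Same logic as stoppable above: run the task on its own fiber; on cancelation,
  // raise the flag and then wait until the task fiber has actually finished.
  def stoppable[F[_], A](task: AtomicBoolean => F[A])(implicit F: Spawn[F], S: Sync[F]): F[A] =
    F.flatMap(S.delay(new AtomicBoolean(false))) { stopSignal =>
      F.uncancelable { poll =>
        F.flatMap(F.start(task(stopSignal))) { fiber =>
          F.onCancel(
            poll(fiber.joinWithNever),
            F.productR(S.delay(stopSignal.set(true)))(F.void(fiber.joinWithNever))
          )
        }
      }
    }

  // Hypothetical: records the counter value at which the loop noticed the flag.
  val stoppedAt = new AtomicLong(-1L)

  // A loop that never ends on its own, standing in for the "buggy" case.
  @tailrec
  def spin(stop: AtomicBoolean, n: Long): Long =
    if (stop.get()) { stoppedAt.set(n); n } else spin(stop, n + 1)

  // Start the loop, wait until it is running, then cancel it from the outside.
  val demo: IO[(Boolean, Long)] =
    for {
      started <- IO.deferred[Unit]
      fiber <- stoppable[IO, Long] { stop =>
                 started.complete(()) *> IO.delay(spin(stop, 0L))
               }.start
      _ <- started.get
      _ <- fiber.cancel // returns only after the loop has observed the flag
      outcome <- fiber.join
    } yield (outcome.isCanceled, stoppedAt.get())
}
```

Because the finalizer joins the task fiber, fiber.cancel only returns once the loop has seen the flag, so stoppedAt is already set when the result is read.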

This seems to work fine, but I have the following questions:

  • Is this correct/safe?
    • Obviously the computation has to be "trusted" to actually obey the cancellation request.
  • Is there a simpler way to do this?
  • Is this actually "better" than doing F.interruptible { ... }? Or does this have a downside?
    • (I guess this depends on the computation... maybe I should do some benchmarks...)


armanbilge
Dec 19, 2021
Maintainer Sponsor

> If I understand correctly, there is no built-in way to cancel an F.delay { ... } block.

Well, the idiomatic thing to do is to split it into multiple blocks, so that there is a possibility for cancellation between them.

F.delay { step1() } *> F.delay { step2() } *> F.delay { step3() } ...
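For a loop like the one in the question, the chunking idea could look like this sketch (cats-effect 3.5.x, IO; ChunkedDemo and sumTo are hypothetical). Each chunk runs as one IO.delay and the recursion goes through flatMap, so the IO run loop gets regular opportunities to notice cancelation between chunks. It checks between steps (periodically, not necessarily after every one) and never in the middle of a single delay.

```scala
//> using dep org.typelevel::cats-effect:3.5.4

import cats.effect.IO
import cats.effect.unsafe.implicits.global

object ChunkedDemo {
  // Hypothetical CPU-bound job: sum the integers in [0, limit), running at most
  // chunkSize iterations per IO.delay. Cancelation can only be observed between
  // chunks, never inside one of them.
  def sumTo(limit: Long, chunkSize: Long): IO[Long] = {
    require(chunkSize > 0L, "chunkSize must be positive")

    def loop(from: Long, acc: Long): IO[Long] =
      if (from >= limit) IO.pure(acc)
      else
        IO.delay {
          val until = math.min(from + chunkSize, limit)
          var i = from
          var sum = acc
          while (i < until) { sum += i; i += 1 }
          (until, sum)
        }.flatMap { case (next, sum) => loop(next, sum) } // stack-safe recursion in IO

    loop(0L, 0L)
  }
}
```

Smaller chunks make cancelation more responsive at the cost of more IO overhead per unit of work.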

But, there are bigger concerns here than cancellation.

> Is this actually "better" than doing F.interruptible { ... }? Or does this have a downside?

Yes, there is a downside: your longComputation() is essentially a blocking operation, so running it in F.delay will take over one of the WorkStealingThreadPool's limited threads. That is no different from doing blocking I/O, and it has all the same caveats:

> running blocking code on our compute pool is very bad. If we're running on a node with 2 CPUs and we evaluate a blocking call like IO(Source.fromFile(path).getLines()) then for the duration of that operation our capacity to evaluate IO fibers is halved. Run two such operations at the same time and your application effectively stops until one of those blocking calls completes.

From https://typelevel.org/cats-effect/docs/thread-model#thread-blocking

This is why F.blocking and F.interruptible shift the work to a separate threadpool, specifically to handle blocking operations without starving the primary compute pool.
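As an illustration, the blocking read from the quoted docs could be moved off the compute pool like this. It is a sketch assuming cats-effect 3.5.x; BlockingDemo and readLines are hypothetical names, and Resource is used so that the Source gets closed.

```scala
//> using dep org.typelevel::cats-effect:3.5.4

import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import scala.io.Source

object BlockingDemo {
  // Opening and reading the file both block, so both run via IO.blocking,
  // which shifts them to the blocking pool instead of a compute thread.
  // Resource closes the Source even if reading fails or is canceled.
  def readLines(path: String): IO[List[String]] =
    Resource
      .fromAutoCloseable(IO.blocking(Source.fromFile(path)))
      .use(source => IO.blocking(source.getLines().toList))
}
```

Note the .toList inside use: the lines are materialized while the file is still open, rather than returning a lazy iterator over a closed Source.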


durban Dec 19, 2021
Collaborator Author

Yeah, sorry, I should've been more clear: the computation is usually not very long. (That is why I don't want to submit it to the blocking pool.) If everything goes right, the computation is quite fast. But, if something goes wrong (basically, if there is a bug) then it's something like an infinite loop. And in this case, I'd like to be able to stop it somehow from the outside. That is why I came up with this "checking a flag occasionally" thing.


armanbilge Dec 19, 2021
Maintainer Sponsor

Is writing it in terms of smaller F.delay steps not an option?


durban Dec 20, 2021
Collaborator Author

I'll think about that some more, but I don't think so. (The computation is somewhat complex, and not "in" F[_] normally.)


armanbilge Dec 24, 2021
Maintainer Sponsor

@durban Vasil just dropped #2687 that will make it much less expensive to run a blocking operation by running it directly on the compute pool and avoiding the round-trip to the blocking pool.


durban Dec 24, 2021
Collaborator Author

Thanks, I'll take a look. (Although generally this task is not blocking... I realize it is somewhat strange... it is not blocking, unless there is a bug... in which case I want to be able to kill it.)


djspiewak
Dec 20, 2021
Maintainer

This might be simpler :-)

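import java.util.concurrent.atomic.AtomicBoolean
import cats.effect.kernel.{Async, Sync}
import cats.effect.syntax.all._
import cats.syntax.all._

// Async[F] is also a Sync[F], so no separate Sync parameter is needed
def stoppable[F[_]: Async, A](task: AtomicBoolean => F[A]): F[A] =
  Async[F] async { cb =>
    Sync[F] defer {
      val flag = new AtomicBoolean(false)
      // task(flag) is already an F[A]; wrapping it in delay would give F[F[A]]
      val runner = task(flag).attempt.flatMap(e => Sync[F].delay(cb(e)))
      runner.start.as(Some(Sync[F].delay(flag.set(true))))
    }
  }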

durban Dec 20, 2021
Collaborator Author

Thanks, that seems simpler. But isn't an uncancelable necessary somewhere? Could this get cancelled right after starting? Could it leave the task running then?

(Also, this doesn't seem to wait for the cancellation to actually happen. I might need that, I'll have to think about it.)


durban Dec 28, 2021
Collaborator Author

Okay, so uncancelable probably isn't necessary here, since async already contains one: https://github.com/typelevel/cats-effect/blob/047b1bd/kernel/shared/src/main/scala/cats/effect/kernel/Async.scala#L76.

These 2 laws seem to say something similar, although I'm not sure: https://github.com/typelevel/cats-effect/blob/047b1bd/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala#L29-L37

So, cancellation-safety seems good. However, I need to wait for the task to actually stop, so this is how I modified your version:

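import java.util.concurrent.atomic.AtomicBoolean
import cats.effect.kernel.Async
import cats.effect.syntax.all._
import cats.syntax.all._

def stoppableSimpler[F[_], A](task: AtomicBoolean => F[A])(implicit F: Async[F]): F[A] = {
  F.async { cb =>
    F.defer {
      val flag = new AtomicBoolean(false)
      val runner = task(flag).attempt.flatMap { e =>
        F.delay { cb(e) }
      }
      runner.start.map { fiber =>
        // on cancel: raise the flag, then wait until the task has actually stopped
        Some(F.delay { flag.set(true) } *> fiber.joinWithNever.void)
      }
    }
  }
}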
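A small driver can check that canceling stoppableSimpler also waits for the task to stop. This is a sketch called with IO and assuming cats-effect 3.5.x; StoppableSimplerDemo, spin, stoppedAt, and the Deferred start signal are hypothetical. It assumes more than one compute thread, since the loop never yields.

```scala
//> using dep org.typelevel::cats-effect:3.5.4

import cats.effect.IO
import cats.effect.kernel.Async
import cats.effect.syntax.all._
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.annotation.tailrec

object StoppableSimplerDemo {
  // Same logic as stoppableSimpler above.
  def stoppableSimpler[F[_], A](task: AtomicBoolean => F[A])(implicit F: Async[F]): F[A] =
    F.async { cb =>
      F.defer {
        val flag = new AtomicBoolean(false)
        val runner = task(flag).attempt.flatMap(e => F.delay(cb(e)))
        runner.start.map { fiber =>
          // Finalizer run on cancelation: raise the flag, then wait for the task to return.
          Option(F.delay(flag.set(true)) *> fiber.joinWithNever.void)
        }
      }
    }

  // Hypothetical: records the counter value at which the loop noticed the flag.
  val stoppedAt = new AtomicLong(-1L)

  @tailrec
  def spin(stop: AtomicBoolean, n: Long): Long =
    if (stop.get()) { stoppedAt.set(n); n } else spin(stop, n + 1)

  // Start the loop, wait until it is running, cancel it, and report what happened.
  val demo: IO[(Boolean, Long)] =
    for {
      started <- IO.deferred[Unit]
      fiber <- stoppableSimpler[IO, Long] { stop =>
                 started.complete(()) *> IO.delay(spin(stop, 0L))
               }.start
      _ <- started.get
      _ <- fiber.cancel // waits for the finalizer, i.e. for the loop to return
      outcome <- fiber.join
    } yield (outcome.isCanceled, stoppedAt.get())
}
```

Without the joinWithNever in the finalizer (as in djspiewak's original version), fiber.cancel could return while the loop was still spinning, and stoppedAt might still read -1.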
Answer selected by durban