-
-
Notifications
You must be signed in to change notification settings - Fork 571
-
|
I'm trying to come up with a way to cancel/interrupt a (potentially) long-running CPU-bound computation. By default, I would run the computation with
This is what I came up with: https://github.com/durban/choam/blob/97e3b751e77daf0641869efb08bbcbb8a021270f/core/shared/src/test/scala/dev/tauri/choam/IOSpec.scala#L33-L50 Here is the important part: def stoppable[F[_], A](task: AtomicBoolean => F[A])(implicit F: Spawn[F], S: Sync[F]): F[A] = {
F.flatMap(S.delay { new AtomicBoolean(false) }) { stopSignal => val tsk: F[A] = task(stopSignal) F.uncancelable { poll => F.flatMap(F.start(tsk)) { fiber => F.onCancel( fa = poll(fiber.joinWithNever), fin = F.productR(S.delay { stopSignal.set(true) })( // so that cancel backpressures until // the task actually observes the signal // and stops whatever it is doing: F.void(fiber.joinWithNever) ) ) } } } } It can be used like this: stoppable { stop =>
@tailrec def go(n: Long): Long = { if (stop.get()) { println(s"Stopping at ${n}") n } else { go(n + 1) } } F.delay { go(0L) } } This seems to work fine, but I have the following questions:
|
Beta Was this translation helpful? Give feedback.
All reactions
This might be simpler :-)
Async[F] async { cb =>
Sync[F] defer {
val flag = new AtomicBoolean(false)
val runner = Sync[F].delay(task(flag)).attempt.flatMap(e => Sync[F].delay(cb(e)))
runner.start.as(Some(Sync[F].delay(flag.set(true))))
}
}
Replies: 2 comments 7 replies
-
Well, the idiomatic thing to do is to split it into multiple blocks, so that there is possibility for cancellation between them. F.delay { step1() } *> F.delay { step2() } *> F.delay { step3() } ...
But, there are bigger concerns here than cancellation.
Yes, there is a downside: your
From https://typelevel.org/cats-effect/docs/thread-model#thread-blocking This is why |
Beta Was this translation helpful? Give feedback.
All reactions
-
|
Yeah, sorry, I should've been more clear: the computation is usually not very long. (That is why I don't want to submit it to the blocking pool.) If everything goes right, the computation is quite fast. But, if something goes wrong (basically, if there is a bug) then it's something like an infinite loop. And in this case, I'd like to be able to stop it somehow from the outside. That is why I came up with this "checking a flag occasionally" thing. |
Beta Was this translation helpful? Give feedback.
All reactions
-
|
Is writing it in terms of smaller |
Beta Was this translation helpful? Give feedback.
All reactions
-
|
I'll think about that some more, but I don't think so. (The computation is somewhat complex, and not "in" |
Beta Was this translation helpful? Give feedback.
All reactions
-
|
Thanks, I'll take a look. (Although generally this task is not blocking... I realize it is somewhat strange... it is not blocking, unless there is a bug... in which case I want to be able to kill it.) |
Beta Was this translation helpful? Give feedback.
All reactions
-
|
This might be simpler :-) def stoppable[F[_]: Async, A](task: AtomicBoolean => F[A])(implicit S: Sync[F]): F[A] =
Async[F] async { cb => Sync[F] defer { val flag = new AtomicBoolean(false) val runner = Sync[F].delay(task(flag)).attempt.flatMap(e => Sync[F].delay(cb(e))) runner.start.as(Some(Sync[F].delay(flag.set(true)))) } } |
Beta Was this translation helpful? Give feedback.
All reactions
-
|
Thanks, that seems simpler. But isn't an (Also, this doesn't seem to wait for the cancellation to actually happen. I might need that, I'll have to think about it.) |
Beta Was this translation helpful? Give feedback.
All reactions
-
|
Okay, so These 2 laws seem to say something similar, although I'm not sure: https://github.com/typelevel/cats-effect/blob/047b1bd/laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala#L29-L37 So, cancellation-safety seems good. However, I need to wait for the task to actually stop, so this is how I modified your version: def stoppableSimpler[F[_], A](task: AtomicBoolean => F[A])(implicit F: Async[F]): F[A] = {
F.async { cb => F.defer { val flag = new AtomicBoolean(false) val runner = task(flag).attempt.flatMap { e => F.delay { cb(e) } } runner.start.map { fiber => Some(F.delay { flag.set(true) } *> fiber.joinWithNever.void) } } } } |
Beta Was this translation helpful? Give feedback.