From 9ca5eec0ca7f3bf11afbd97a8a14c8c7b315067c Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Thu, 10 Jul 2025 14:43:36 -0400 Subject: [PATCH 01/17] Added benchmarks to reproduce issue --- .../effect/benchmarks/ParallelBenchmark.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/benchmarks/src/main/scala/cats/effect/benchmarks/ParallelBenchmark.scala b/benchmarks/src/main/scala/cats/effect/benchmarks/ParallelBenchmark.scala index 0e46ee7fdf..5d5beefe37 100644 --- a/benchmarks/src/main/scala/cats/effect/benchmarks/ParallelBenchmark.scala +++ b/benchmarks/src/main/scala/cats/effect/benchmarks/ParallelBenchmark.scala @@ -19,10 +19,13 @@ package cats.effect.benchmarks import cats.effect.IO import cats.effect.unsafe.implicits.global import cats.implicits.{catsSyntaxParallelTraverse1, toTraverseOps} +import cats.effect.syntax.all._ import org.openjdk.jmh.annotations._ import org.openjdk.jmh.infra.Blackhole +import scala.concurrent.duration._ + import java.util.concurrent.TimeUnit /** @@ -55,6 +58,24 @@ class ParallelBenchmark { def parTraverse(): Unit = 1.to(size).toList.parTraverse(_ => IO(Blackhole.consumeCPU(cpuTokens))).void.unsafeRunSync() + @Benchmark + def parTraverseN(): Unit = + 1.to(size) + .toList + .parTraverseN(size / 100)(_ => IO(Blackhole.consumeCPU(cpuTokens))) + .void + .unsafeRunSync() + + @Benchmark + def parTraverseNCancel(): Unit = { + val e = new RuntimeException + val test = 1.to(size * 100).toList.parTraverseN(size / 100) { _ => + IO.sleep(100.millis) *> IO.raiseError(e) + } + + test.attempt.void.unsafeRunSync() + } + @Benchmark def traverse(): Unit = 1.to(size).toList.traverse(_ => IO(Blackhole.consumeCPU(cpuTokens))).void.unsafeRunSync() From d609d3eddd609f3da6f865fc91d9cff0441f498b Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Thu, 10 Jul 2025 16:40:16 -0400 Subject: [PATCH 02/17] Reimplemented parTraverseN to be much more efficient and less pathological --- .../cats/effect/kernel/GenConcurrent.scala | 41 
++++++++++++++++++- .../cats/effect/kernel/MiniSemaphore.scala | 5 +-- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala index 024f640197..563a34f6a4 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala @@ -139,7 +139,27 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { implicit val F: GenConcurrent[F, E] = this - MiniSemaphore[F](n).flatMap { sem => ta.parTraverse { a => sem.withPermit(f(a)) } } + // TODO we need to write a test for error cancelation + F.ref[Set[Fiber[F, ?, ?]]](Set()) flatMap { supervision => + MiniSemaphore[F](n) flatMap { sem => + val results = ta traverse { a => + F.uncancelable { _ => + sem.acquire >> f(a).guarantee(sem.release).start map { fiber => + supervision.update(_ + fiber) *> + fiber.joinWithNever + .onCancel(fiber.cancel) + .guarantee(supervision.update(_ - fiber)) + } + } + } + + results.flatMap(_.sequence) guaranteeCase { + case Outcome.Succeeded(_) => F.unit + // has to be done in parallel to avoid head of line issues + case _ => supervision.get.flatMap(_.toList.parTraverse_(_.cancel)) + } + } + } } /** @@ -152,7 +172,24 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { implicit val F: GenConcurrent[F, E] = this - MiniSemaphore[F](n).flatMap { sem => ta.parTraverse_ { a => sem.withPermit(f(a)) } } + // TODO we need to write a test for error cancelation + F.ref[List[Fiber[F, ?, ?]]](Nil) flatMap { supervision => + MiniSemaphore[F](n) flatMap { sem => + // TODO this seems promising. 
we just need to sequence the errors/self-cancelation later + val startAll = ta traverse_ { a => + F.uncancelable { _ => + sem.acquire >> f(a).guarantee(sem.release).start flatMap { fiber => + // supervision is handled very differently here: we never remove from the set + supervision.update(fiber :: _) + } + } + } + + // we block until it's all done by acquiring all the permits + startAll.onCancel(supervision.get.flatMap(_.parTraverse_(_.cancel))) *> + sem.acquire.replicateA_(n) + } + } } override def racePair[A, B](fa: F[A], fb: F[B]) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/MiniSemaphore.scala b/kernel/shared/src/main/scala/cats/effect/kernel/MiniSemaphore.scala index 41368726ab..2eaa00efed 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/MiniSemaphore.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/MiniSemaphore.scala @@ -27,10 +27,9 @@ import scala.collection.immutable.{Queue => ScalaQueue} * A cut-down version of semaphore used to implement parTraverseN */ private[kernel] abstract class MiniSemaphore[F[_]] extends Serializable { + def acquire: F[Unit] + def release: F[Unit] - /** - * Sequence an action while holding a permit - */ def withPermit[A](fa: F[A]): F[A] } From 9f0b3286a1b66f90d83ec300d6e4b41080f9746f Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Thu, 10 Jul 2025 17:21:34 -0400 Subject: [PATCH 03/17] Fixed surfacing of inner errors/cancelation in parTraverseN_ --- .../cats/effect/kernel/GenConcurrent.scala | 53 ++++++++++++++----- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala index 563a34f6a4..7874574869 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala @@ -146,7 +146,8 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { F.uncancelable { _ => 
sem.acquire >> f(a).guarantee(sem.release).start map { fiber => supervision.update(_ + fiber) *> - fiber.joinWithNever + fiber + .joinWithNever .onCancel(fiber.cancel) .guarantee(supervision.update(_ - fiber)) } @@ -173,21 +174,47 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { implicit val F: GenConcurrent[F, E] = this // TODO we need to write a test for error cancelation - F.ref[List[Fiber[F, ?, ?]]](Nil) flatMap { supervision => - MiniSemaphore[F](n) flatMap { sem => - // TODO this seems promising. we just need to sequence the errors/self-cancelation later - val startAll = ta traverse_ { a => - F.uncancelable { _ => - sem.acquire >> f(a).guarantee(sem.release).start flatMap { fiber => - // supervision is handled very differently here: we never remove from the set - supervision.update(fiber :: _) + F.deferred[Option[E]] flatMap { preempt => + F.ref[List[Fiber[F, ?, ?]]](Nil) flatMap { supervision => + MiniSemaphore[F](n) flatMap { sem => + val startAll = ta traverse_ { a => + // first check to see if any of the effects have errored out + // don't bother starting new things if that happens + preempt.tryGet flatMap { + case Some(_) => + F.unit // allow the error to be resurfaced later + + case None => + F.uncancelable { _ => + // if the effect produces an error, race to kill all the rest + val wrapped = f(a) guaranteeCase { oc => + sem.release *> oc.fold( + preempt.complete(None).void, + e => preempt.complete(Some(e)).void, + _ => F.unit) + } + + sem.acquire >> wrapped.start flatMap { fiber => + // supervision is handled very differently here: we never remove from the set + supervision.update(fiber :: _) + } + } } } - } - // we block until it's all done by acquiring all the permits - startAll.onCancel(supervision.get.flatMap(_.parTraverse_(_.cancel))) *> - sem.acquire.replicateA_(n) + val cancelAll = supervision.get.flatMap(_.parTraverse_(_.cancel)) + + startAll.onCancel(cancelAll) *> + // we block until it's all done by acquiring all the permits + 
F.race(preempt.get *> cancelAll, sem.acquire.replicateA_(n)) *> + // if we hit an error or self-cancelation in any effect, resurface it here + // note that we can't lose errors here because of the permits: we know the fibers are done + preempt.tryGet flatMap { + case Some(Some(e)) => F.raiseError(e) + case Some(None) => F.canceled + case None => F.unit + } + } } } } From 607ab9af8ea05cd6f63f7c2c060f04a96880b6fd Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Fri, 11 Jul 2025 17:35:39 -0400 Subject: [PATCH 04/17] Swap to deferred to avoid orphaned errors --- .../cats/effect/kernel/GenConcurrent.scala | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala index 7874574869..040655d1ab 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala @@ -144,12 +144,19 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { MiniSemaphore[F](n) flatMap { sem => val results = ta traverse { a => F.uncancelable { _ => - sem.acquire >> f(a).guarantee(sem.release).start map { fiber => - supervision.update(_ + fiber) *> - fiber - .joinWithNever - .onCancel(fiber.cancel) - .guarantee(supervision.update(_ - fiber)) + F.deferred[Outcome[F, E, B]] flatMap { result => + sem.acquire >> f(a) + .guaranteeCase(oc => result.complete(oc) *> sem.release) + .void + .voidError + .start map { fiber => + supervision.update(_ + fiber) *> + result + .get + .flatMap(_.embedNever) + .onCancel(fiber.cancel) + .guarantee(supervision.update(_ - fiber)) + } } } } From 569ce543b1ae4186cbcf4a64c8991dcd806c58ec Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sat, 12 Jul 2025 11:16:05 -0400 Subject: [PATCH 05/17] Ported over @durban's tests and fixed --- .../cats/effect/kernel/GenConcurrent.scala | 20 ++- 
.../src/test/scala/cats/effect/IOSpec.scala | 147 ++++++++++++++++++ 2 files changed, 159 insertions(+), 8 deletions(-) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala index 040655d1ab..a65eed3f1b 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala @@ -139,23 +139,25 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { implicit val F: GenConcurrent[F, E] = this - // TODO we need to write a test for error cancelation F.ref[Set[Fiber[F, ?, ?]]](Set()) flatMap { supervision => MiniSemaphore[F](n) flatMap { sem => val results = ta traverse { a => - F.uncancelable { _ => + F.uncancelable { poll => F.deferred[Outcome[F, E, B]] flatMap { result => - sem.acquire >> f(a) + val action = poll(sem.acquire) >> f(a) .guaranteeCase(oc => result.complete(oc) *> sem.release) .void .voidError - .start map { fiber => - supervision.update(_ + fiber) *> + .start + + action flatMap { fiber => + supervision.update(_ + fiber) map { _ => result .get - .flatMap(_.embedNever) + .flatMap(_.embed(F.canceled *> F.never)) .onCancel(fiber.cancel) .guarantee(supervision.update(_ - fiber)) + } } } } @@ -192,7 +194,7 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { F.unit // allow the error to be resurfaced later case None => - F.uncancelable { _ => + F.uncancelable { poll => // if the effect produces an error, race to kill all the rest val wrapped = f(a) guaranteeCase { oc => sem.release *> oc.fold( @@ -201,7 +203,9 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { _ => F.unit) } - sem.acquire >> wrapped.start flatMap { fiber => + val suppressed = wrapped.void.voidError + + poll(sem.acquire) >> suppressed.start flatMap { fiber => // supervision is handled very differently here: we never remove from the set supervision.update(fiber :: _) } diff --git 
a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index b731aa4575..de0645a08d 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1614,6 +1614,153 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { p must completeAs(true) } + "run finalizers when canceled" in ticked { implicit ticker => + val p = for { + r <- IO.ref(0) + + /* + * The exact series of steps here is: + * + * List(IO.never.onCancel, IO.unit, IO.never.onCancel) + * + * This is significant because we're limiting the parallelism to + * 2, meaning that we will hit a wall after IO.unit. HOWEVER, + * IO.unit completes immediately, so this test not only checks + * cancelation, it also tests that we move onto the third item + * after the second one completes even while the first is blocked. + * In other words, it's testing both cancelation and head of line + * behavior. 
+ */ + f <- List(1, 2, 3) + .parTraverseN(2) { i => + if (i == 2) IO.unit + else IO.never.onCancel(r.update(_ + 1)) + } + .start + + _ <- IO.sleep(100.millis) + _ <- f.cancel + c <- r.get + _ <- IO { c mustEqual 2 } + } yield true + + p must completeAs(true) + } + + "propagate self-cancellation" in ticked { implicit ticker => + List(1, 2, 3, 4) + .parTraverseN(2) { (n: Int) => + if (n == 3) IO.canceled *> IO.never + else IO.pure(n) + } + .void must selfCancel + } + + "run finalizers when a task self-cancels" in ticked { implicit ticker => + val p = for { + r <- IO.ref(0) + fib <- List(1, 2, 3, 4) + .parTraverseN(2) { (n: Int) => + if (n == 3) IO.canceled *> IO.never + else IO.pure(n) + } + .onCancel(r.update(_ + 1)) + .void + .start + _ <- IO.sleep(100.millis) + c <- r.get + _ <- IO { c mustEqual 1 } + oc <- fib.join + } yield oc.isCanceled + + p must completeAs(true) + } + + "not run more than `n` tasks at a time" in real { + def task(counter: Ref[IO, Int], maximum: Ref[IO, Int]): IO[Unit] = { + val acq = counter.updateAndGet(_ + 1).flatMap { count => + maximum.update { max => if (count > max) count else max } + } + IO.asyncForIO.bracket(acq) { _ => IO.sleep(100.millis) }(_ => counter.update(_ - 1)) + } + + for { + maximum <- Ref.of[IO, Int](0) + counter <- Ref.of[IO, Int](0) + nCpu <- IO { Runtime.getRuntime().availableProcessors() } + n = java.lang.Math.max(nCpu, 2) + size = 4 * n + res <- (1 to size).toList.parTraverseN(n) { _ => task(counter, maximum) } + _ <- IO { res.size mustEqual size } + count <- counter.get + _ <- IO { count mustEqual 0 } + max <- maximum.get + _ <- IO { max must beLessThanOrEqualTo(n) } + } yield ok + } + + "run actually in parallel" in real { + val n = 4 + (1 to 2 * n) + .toList + .map { i => IO.sleep(1.second).as(i) } + .parSequenceN(n) + .timeout(3.seconds) + .flatMap { res => IO { res mustEqual (1 to 2 * n).toList } } + } + + "work for empty traverse" in ticked { implicit ticker => + List.empty[Int].parTraverseN(4) { _ => 
IO.never[String] } must completeAs( + List.empty[String]) + } + + "work for non-empty traverse (ticked)" in ticked { implicit ticker => + List(1).parTraverseN(4) { i => IO.pure(i.toString) } must completeAs(List("1")) + List(1, 2).parTraverseN(3) { i => IO.pure(i.toString) } must completeAs(List("1", "2")) + List(1, 2, 3).parTraverseN(2) { i => IO.pure(i.toString) } must completeAs( + List("1", "2", "3")) + List(1, 2, 3, 4).parTraverseN(1) { i => IO.pure(i.toString) } must completeAs( + List("1", "2", "3", "4")) + } + + "work for non-empty traverse (real)" in real { + for { + _ <- List(1).parTraverseN(4)(i => IO.pure(i.toString)).flatMap { r => + IO(r mustEqual List("1")) + } + _ <- List(1, 2).parTraverseN(3)(i => IO.pure(i.toString)).flatMap { r => + IO(r mustEqual List("1", "2")) + } + _ <- List(1, 2, 3).parTraverseN(2)(i => IO.pure(i.toString)).flatMap { r => + IO(r mustEqual List("1", "2", "3")) + } + _ <- List(1, 2, 3, 4).parTraverseN(1)(i => IO.pure(i.toString)).flatMap { r => + IO(r mustEqual List("1", "2", "3", "4")) + } + _ <- (1 to 10000).toList.parTraverseN(2)(i => IO.pure(i.toString)).flatMap { r => + IO(r mustEqual (1 to 10000).map(_.toString).toList) + } + } yield ok + } + + "be null-safe" in real { + for { + r1 <- List[String]("a", "b", null, "d", null).parTraverseN(2) { + case "a" => IO.pure(null) + case "b" => IO.pure("x") + case "d" => IO.pure(null) + case null => IO.pure("z") + } + _ <- IO { r1 mustEqual List(null, "x", "z", null, "z") } + r2 <- List(1, 2, 3) + .parTraverseN(2) { i => + if (i == 2) null + else IO.pure(i) + } + .attempt + _ <- IO { r2 must beLike { case Left(e) => e must haveClass[NullPointerException] } } + } yield ok + } } "parTraverseN_" should { From 0109cc3193ccf94b63e58bf5c03e89eab0a6e3a8 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Tue, 22 Jul 2025 15:57:09 -0500 Subject: [PATCH 06/17] Added early abort when stop case is encountered --- .../cats/effect/kernel/GenConcurrent.scala | 63 ++++++++++++------- 1 file 
changed, 39 insertions(+), 24 deletions(-) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala index a65eed3f1b..7376bf6eda 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala @@ -139,34 +139,49 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { implicit val F: GenConcurrent[F, E] = this - F.ref[Set[Fiber[F, ?, ?]]](Set()) flatMap { supervision => - MiniSemaphore[F](n) flatMap { sem => - val results = ta traverse { a => - F.uncancelable { poll => - F.deferred[Outcome[F, E, B]] flatMap { result => - val action = poll(sem.acquire) >> f(a) - .guaranteeCase(oc => result.complete(oc) *> sem.release) - .void - .voidError - .start - - action flatMap { fiber => - supervision.update(_ + fiber) map { _ => - result - .get - .flatMap(_.embed(F.canceled *> F.never)) - .onCancel(fiber.cancel) - .guarantee(supervision.update(_ - fiber)) + F.deferred[Option[E]] flatMap { preempt => + F.ref[Set[Fiber[F, ?, ?]]](Set()) flatMap { supervision => + MiniSemaphore[F](n) flatMap { sem => + val results = ta traverse { a => + preempt.tryGet flatMap { + case Some(_) => + // it's okay to produce never here because the early abort preceeds us + // this effect won't get sequenced, so it can be anything really + F.pure(F.never[B]) + + case None => + F.uncancelable { poll => + F.deferred[Outcome[F, E, B]] flatMap { result => + val action = poll(sem.acquire) >> f(a) + .guaranteeCase { oc => + result.complete(oc) *> oc.fold( + preempt.complete(None).void, + e => preempt.complete(Some(e)).void, + _ => F.unit) *> sem.release + } + .void + .voidError + .start + + action flatMap { fiber => + supervision.update(_ + fiber) map { _ => + result + .get + .flatMap(_.embed(F.canceled *> F.never)) + .onCancel(fiber.cancel) + .guarantee(supervision.update(_ - fiber)) + } + } + } } - } } } - } - 
results.flatMap(_.sequence) guaranteeCase { - case Outcome.Succeeded(_) => F.unit - // has to be done in parallel to avoid head of line issues - case _ => supervision.get.flatMap(_.toList.parTraverse_(_.cancel)) + results.flatMap(_.sequence) guaranteeCase { + case Outcome.Succeeded(_) => F.unit + // has to be done in parallel to avoid head of line issues + case _ => supervision.get.flatMap(_.toList.parTraverse_(_.cancel)) + } } } } From a3f8fe8d43e239314fdad067232b8cf4d8b5d52f Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Tue, 22 Jul 2025 19:01:02 -0500 Subject: [PATCH 07/17] Organized imports --- .../main/scala/cats/effect/benchmarks/ParallelBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/main/scala/cats/effect/benchmarks/ParallelBenchmark.scala b/benchmarks/src/main/scala/cats/effect/benchmarks/ParallelBenchmark.scala index 5d5beefe37..33f093e028 100644 --- a/benchmarks/src/main/scala/cats/effect/benchmarks/ParallelBenchmark.scala +++ b/benchmarks/src/main/scala/cats/effect/benchmarks/ParallelBenchmark.scala @@ -17,9 +17,9 @@ package cats.effect.benchmarks import cats.effect.IO +import cats.effect.syntax.all._ import cats.effect.unsafe.implicits.global import cats.implicits.{catsSyntaxParallelTraverse1, toTraverseOps} -import cats.effect.syntax.all._ import org.openjdk.jmh.annotations._ import org.openjdk.jmh.infra.Blackhole From 6a2588cae7b359f3a2d441c491f440724cee198d Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Tue, 22 Jul 2025 19:21:26 -0500 Subject: [PATCH 08/17] Fixed null test for Scala 3 --- tests/shared/src/test/scala/cats/effect/IOSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index de0645a08d..366853e9aa 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1754,7 +1754,7 @@ class 
IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { _ <- IO { r1 mustEqual List(null, "x", "z", null, "z") } r2 <- List(1, 2, 3) .parTraverseN(2) { i => - if (i == 2) null + if (i == 2) null: IO[Int] else IO.pure(i) } .attempt From 599b790c864a07302414a932d0e8aff8de96eb42 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 23 Jul 2025 14:10:43 -0500 Subject: [PATCH 09/17] Removed spurious test that triggered scala.js bugs --- tests/shared/src/test/scala/cats/effect/IOSpec.scala | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 366853e9aa..7e0922010e 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1752,13 +1752,6 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { case null => IO.pure("z") } _ <- IO { r1 mustEqual List(null, "x", "z", null, "z") } - r2 <- List(1, 2, 3) - .parTraverseN(2) { i => - if (i == 2) null: IO[Int] - else IO.pure(i) - } - .attempt - _ <- IO { r2 must beLike { case Left(e) => e must haveClass[NullPointerException] } } } yield ok } } From 2e276bbf842c06b5ac3d90d400608a0274e4ac7f Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Wed, 23 Jul 2025 18:12:34 -0500 Subject: [PATCH 10/17] Added test for error short-circuiting --- tests/shared/src/test/scala/cats/effect/IOSpec.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 7e0922010e..e988a3e922 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1754,6 +1754,14 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { _ <- IO { r1 mustEqual List(null, "x", "z", null, "z") } } yield ok } + + "short-circuit on error" in real { 
+ case object TestException extends RuntimeException + val target = 0.until(100000).toList + val test = target.parTraverseN(2)(_ => IO.raiseError(TestException)) + + test.attempt.timeoutTo(500.millis, IO(false must beTrue)).as(ok) + } } "parTraverseN_" should { From 3649a5db1da55886298768c04c4ff679fb311d1a Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 27 Jul 2025 11:35:22 -0500 Subject: [PATCH 11/17] Restructured failure case propagation --- .../cats/effect/kernel/GenConcurrent.scala | 50 +++++++++++++++---- .../src/test/scala/cats/effect/IOSpec.scala | 24 +++++++++ 2 files changed, 65 insertions(+), 9 deletions(-) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala index 7376bf6eda..d8d7518f86 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala @@ -141,6 +141,9 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { F.deferred[Option[E]] flatMap { preempt => F.ref[Set[Fiber[F, ?, ?]]](Set()) flatMap { supervision => + // has to be done in parallel to avoid head of line issues + val cancelAllF = supervision.get.flatMap(_.toList.parTraverse_(_.cancel)) + MiniSemaphore[F](n) flatMap { sem => val results = ta traverse { a => preempt.tryGet flatMap { @@ -154,10 +157,43 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { F.deferred[Outcome[F, E, B]] flatMap { result => val action = poll(sem.acquire) >> f(a) .guaranteeCase { oc => - result.complete(oc) *> oc.fold( - preempt.complete(None).void, - e => preempt.complete(Some(e)).void, - _ => F.unit) *> sem.release + val completion = oc match { + case Outcome.Succeeded(_) => + preempt.tryGet flatMap { + case Some(Some(e)) => + result.complete(Outcome.Errored(e)) + + case Some(None) => + result.complete(Outcome.Canceled()) + + case None => + result.complete(oc) + } + + case Outcome.Errored(e) => + 
preempt.complete(Some(e)) flatMap { won => + if (won) + result.complete(oc) <* cancelAllF.start // avoid deadlock + else + preempt.get flatMap { + case Some(e) => result.complete(Outcome.Errored(e)) + case None => result.complete(Outcome.Canceled()) + } + } + + case Outcome.Canceled() => + preempt.complete(None) flatMap { won => + if (won) + result.complete(oc) <* cancelAllF.start // avoid deadlock + else + preempt.get flatMap { + case Some(e) => result.complete(Outcome.Errored(e)) + case None => result.complete(Outcome.Canceled()) + } + } + } + + completion *> sem.release } .void .voidError @@ -177,11 +213,7 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { } } - results.flatMap(_.sequence) guaranteeCase { - case Outcome.Succeeded(_) => F.unit - // has to be done in parallel to avoid head of line issues - case _ => supervision.get.flatMap(_.toList.parTraverse_(_.cancel)) - } + results.flatMap(_.sequence).onCancel(cancelAllF) } } } diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index e988a3e922..2c2c1e05da 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1762,6 +1762,30 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { test.attempt.timeoutTo(500.millis, IO(false must beTrue)).as(ok) } + + "run finalizers in parallel" in ticked { implicit ticker => + // this test also tests to ensure that we get the errored results rather than cancels + // note that the first two effects will have a Canceled outcome, while the third is Errored + // if we just go by first wins in sequence, then Canceled is the (incorrect) result + // first wins *in time* is the expected semantic here + val test = for { + latch1 <- IO.deferred[Unit] + latch2 <- IO.deferred[Unit] + + _ <- List(1, 2, 3).parTraverseN(3) { + case 1 => + IO.never.onCancel(latch1.complete(()) *> latch2.get) + + case 2 => + 
IO.never.onCancel(latch2.complete(()) *> latch1.get) + + case 3 => + IO.sleep(10.millis) *> IO.raiseError(new RuntimeException) + } + } yield () + + test.attempt.void must completeAs(()) + } } "parTraverseN_" should { From 584ce3b3eba44f13111848b4af71a9f5557e6cdc Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 8 Mar 2026 11:37:43 -0400 Subject: [PATCH 12/17] Added more tests and shuffled preemption paths to actually, you know, work --- .../cats/effect/kernel/GenConcurrent.scala | 196 ++++++++++-------- .../src/test/scala/cats/effect/IOSpec.scala | 171 ++++++++++++++- 2 files changed, 277 insertions(+), 90 deletions(-) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala index d8d7518f86..290dcbcf88 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala @@ -140,81 +140,82 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { implicit val F: GenConcurrent[F, E] = this F.deferred[Option[E]] flatMap { preempt => - F.ref[Set[Fiber[F, ?, ?]]](Set()) flatMap { supervision => - // has to be done in parallel to avoid head of line issues - val cancelAllF = supervision.get.flatMap(_.toList.parTraverse_(_.cancel)) + F.ref[Set[(Fiber[F, ?, ?], Deferred[F, Outcome[F, E, B]])]](Set()) flatMap { + supervision => + // has to be done in parallel to avoid head of line issues + def cancelAll(cause: Option[E]) = supervision.get flatMap { states => + val causeOC: Outcome[F, E, B] = cause match { + case Some(e) => Outcome.Errored(e) + case None => Outcome.Canceled() + } - MiniSemaphore[F](n) flatMap { sem => - val results = ta traverse { a => - preempt.tryGet flatMap { - case Some(_) => - // it's okay to produce never here because the early abort preceeds us - // this effect won't get sequenced, so it can be anything really - F.pure(F.never[B]) + states.toList parTraverse_ { + case 
(fiber, result) => + result.complete(causeOC) *> fiber.cancel + } + } - case None => - F.uncancelable { poll => - F.deferred[Outcome[F, E, B]] flatMap { result => - val action = poll(sem.acquire) >> f(a) - .guaranteeCase { oc => - val completion = oc match { - case Outcome.Succeeded(_) => - preempt.tryGet flatMap { - case Some(Some(e)) => - result.complete(Outcome.Errored(e)) - - case Some(None) => - result.complete(Outcome.Canceled()) - - case None => - result.complete(oc) - } - - case Outcome.Errored(e) => - preempt.complete(Some(e)) flatMap { won => - if (won) - result.complete(oc) <* cancelAllF.start // avoid deadlock - else - preempt.get flatMap { - case Some(e) => result.complete(Outcome.Errored(e)) - case None => result.complete(Outcome.Canceled()) - } - } - - case Outcome.Canceled() => - preempt.complete(None) flatMap { won => - if (won) - result.complete(oc) <* cancelAllF.start // avoid deadlock - else - preempt.get flatMap { - case Some(e) => result.complete(Outcome.Errored(e)) - case None => result.complete(Outcome.Canceled()) - } - } + MiniSemaphore[F](n) flatMap { sem => + val results = ta traverse { a => + preempt.tryGet flatMap { + case Some(Some(e)) => F.pure(F.raiseError[B](e)) + case Some(None) => F.pure(F.canceled *> F.never[B]) + + case None => + F.uncancelable { poll => + F.deferred[Outcome[F, E, B]] flatMap { result => + val action = poll(sem.acquire) >> f(a) + .guaranteeCase { oc => + val completion = oc match { + case Outcome.Succeeded(_) => + preempt.tryGet flatMap { + case Some(Some(e)) => + result.complete(Outcome.Errored(e)) + + case Some(None) => + result.complete(Outcome.Canceled()) + + case None => + result.complete(oc) + } + + case Outcome.Errored(e) => + preempt + .complete(Some(e)) + .ifM( + result.complete(oc) <* cancelAll(Some(e)).start, + false.pure[F]) + + case Outcome.Canceled() => + preempt + .complete(None) + .ifM( + result.complete(oc) <* cancelAll(None).start, + false.pure[F]) + } + + completion *> sem.release + } + .void 
+ .voidError + .start + + action flatMap { fiber => + supervision.update(_ + ((fiber, result))) map { _ => + result + .get + .flatMap(_.embed(F.canceled *> F.never)) + .onCancel(fiber.cancel) + .guarantee(supervision.update(_ - ((fiber, result)))) } - - completion *> sem.release - } - .void - .voidError - .start - - action flatMap { fiber => - supervision.update(_ + fiber) map { _ => - result - .get - .flatMap(_.embed(F.canceled *> F.never)) - .onCancel(fiber.cancel) - .guarantee(supervision.update(_ - fiber)) } } } - } + } } - } - results.flatMap(_.sequence).onCancel(cancelAllF) - } + results.flatMap(_.sequence).onCancel(cancelAll(None)) + } } } } @@ -229,30 +230,39 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { implicit val F: GenConcurrent[F, E] = this - // TODO we need to write a test for error cancelation F.deferred[Option[E]] flatMap { preempt => F.ref[List[Fiber[F, ?, ?]]](Nil) flatMap { supervision => MiniSemaphore[F](n) flatMap { sem => + val cancelAll = supervision.get.flatMap(_.parTraverse_(_.cancel)) + + // doesn't complete until every fiber has been at least *started* val startAll = ta traverse_ { a => // first check to see if any of the effects have errored out // don't bother starting new things if that happens preempt.tryGet flatMap { - case Some(_) => - F.unit // allow the error to be resurfaced later + case Some(Some(e)) => + F.raiseError[Unit](e) + + case Some(None) => + F.canceled case None => F.uncancelable { poll => - // if the effect produces an error, race to kill all the rest - val wrapped = f(a) guaranteeCase { oc => - sem.release *> oc.fold( - preempt.complete(None).void, - e => preempt.complete(Some(e)).void, - _ => F.unit) + // if the effect produces a non-success, race to kill all the rest + val wrapped = f(a) guaranteeCase { + case Outcome.Succeeded(_) => + F.unit + + case Outcome.Errored(e) => + preempt.complete(Some(e)).void + + case Outcome.Canceled() => + preempt.complete(None).void } - val suppressed = 
wrapped.void.voidError + val suppressed = wrapped.void.voidError.guarantee(sem.release) - poll(sem.acquire) >> suppressed.start flatMap { fiber => + poll(sem.acquire) *> suppressed.start flatMap { fiber => // supervision is handled very differently here: we never remove from the set supervision.update(fiber :: _) } @@ -260,18 +270,26 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { } } - val cancelAll = supervision.get.flatMap(_.parTraverse_(_.cancel)) + // we only run this when we know that supervision is full + val awaitAll = preempt.tryGet flatMap { + case Some(_) => F.unit + case None => + F.race(preempt.get.void, supervision.get.flatMap(_.traverse_(_.join.void))).void + } - startAll.onCancel(cancelAll) *> - // we block until it's all done by acquiring all the permits - F.race(preempt.get *> cancelAll, sem.acquire.replicateA_(n)) *> - // if we hit an error or self-cancelation in any effect, resurface it here - // note that we can't lose errors here because of the permits: we know the fibers are done - preempt.tryGet flatMap { - case Some(Some(e)) => F.raiseError(e) - case Some(None) => F.canceled - case None => F.unit - } + // if we hit an error or self-cancelation in any effect, resurface it here + val resurface = preempt.tryGet flatMap { + case Some(Some(e)) => F.raiseError[Unit](e) + case Some(None) => F.canceled + case None => F.unit + } + + val work = (startAll *> awaitAll) guaranteeCase { + case Outcome.Succeeded(_) => F.unit + case Outcome.Errored(_) | Outcome.Canceled() => preempt.complete(None) *> cancelAll + } + + work *> resurface } } } diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 2c2c1e05da..4e07a860c8 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1760,7 +1760,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val target = 0.until(100000).toList val 
test = target.parTraverseN(2)(_ => IO.raiseError(TestException)) - test.attempt.timeoutTo(500.millis, IO(false must beTrue)).as(ok) + test.attempt.as(ok).timeoutTo(500.millis, IO(false must beTrue)) } "run finalizers in parallel" in ticked { implicit ticker => @@ -1814,6 +1814,175 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { p must completeAs(true) } + "run finalizers when canceled" in ticked { implicit ticker => + val p = for { + r <- IO.ref(0) + + /* + * The exact series of steps here is: + * + * List(IO.never.onCancel, IO.unit, IO.never.onCancel) + * + * This is significant because we're limiting the parallelism to + * 2, meaning that we will hit a wall after IO.unit. HOWEVER, + * IO.unit completes immediately, so this test not only checks + * cancelation, it also tests that we move onto the third item + * after the second one completes even while the first is blocked. + * In other words, it's testing both cancelation and head of line + * behavior. + */ + f <- List(1, 2, 3) + .parTraverseN_(2) { i => + if (i == 2) IO.unit + else IO.never.onCancel(r.update(_ + 1)) + } + .start + + _ <- IO.sleep(100.millis) + _ <- f.cancel + c <- r.get + _ <- IO { c mustEqual 2 } + } yield true + + p must completeAs(true) + } + + "propagate self-cancellation" in ticked { implicit ticker => + List(1, 2, 3, 4) + .parTraverseN_(2) { (n: Int) => + if (n == 3) IO.canceled *> IO.never + else IO.pure(n) + } + .void must selfCancel + } + + "run finalizers when a task self-cancels" in ticked { implicit ticker => + val p = for { + r <- IO.ref(0) + fib <- List(1, 2, 3, 4) + .parTraverseN_(2) { (n: Int) => + if (n == 3) IO.canceled *> IO.never + else IO.pure(n) + } + .onCancel(r.update(_ + 1)) + .void + .start + _ <- IO.sleep(100.millis) + c <- r.get + _ <- IO { c mustEqual 1 } + oc <- fib.join + } yield oc.isCanceled + + p must completeAs(true) + } + + "not run more than `n` tasks at a time" in real { + def task(counter: Ref[IO, Int], maximum: Ref[IO, 
Int]): IO[Unit] = { + val acq = counter.updateAndGet(_ + 1).flatMap { count => + maximum.update { max => if (count > max) count else max } + } + IO.asyncForIO.bracket(acq) { _ => IO.sleep(100.millis) }(_ => counter.update(_ - 1)) + } + + for { + maximum <- Ref.of[IO, Int](0) + counter <- Ref.of[IO, Int](0) + nCpu <- IO { Runtime.getRuntime().availableProcessors() } + n = java.lang.Math.max(nCpu, 2) + size = 4 * n + _ <- (1 to size).toList.parTraverseN_(n) { _ => task(counter, maximum) } + count <- counter.get + _ <- IO { count mustEqual 0 } + max <- maximum.get + _ <- IO { max must beLessThanOrEqualTo(n) } + } yield ok + } + + "run actually in parallel" in real { + val n = 4 + (1 to 2 * n) + .toList + .map(_ => IO.sleep(1.second)) + .parSequenceN_(n) + .as(true) + .timeoutTo(3.seconds, IO.pure(false)) + .flatMap(res => IO { res must beTrue }) + } + + "work for empty traverse" in ticked { implicit ticker => + List.empty[Int].parTraverseN_(4) { _ => IO.never[String] } must completeAs(()) + } + + "work for non-empty traverse (ticked)" in ticked { implicit ticker => + List(1).parTraverseN_(4) { i => IO.pure(i.toString) } must completeAs(()) + List(1, 2).parTraverseN_(3) { i => IO.pure(i.toString) } must completeAs(()) + List(1, 2, 3).parTraverseN_(2) { i => IO.pure(i.toString) } must completeAs(()) + List(1, 2, 3, 4).parTraverseN_(1) { i => IO.pure(i.toString) } must completeAs(()) + } + + "work for non-empty traverse (real)" in real { + for { + _ <- List(1).parTraverseN_(4)(i => IO.pure(i.toString)).flatMap { r => + IO(r.mustEqual(())) + } + _ <- List(1, 2).parTraverseN_(3)(i => IO.pure(i.toString)).flatMap { r => + IO(r.mustEqual(())) + } + _ <- List(1, 2, 3).parTraverseN_(2)(i => IO.pure(i.toString)).flatMap { r => + IO(r.mustEqual(())) + } + _ <- List(1, 2, 3, 4).parTraverseN_(1)(i => IO.pure(i.toString)).flatMap { r => + IO(r.mustEqual(())) + } + _ <- (1 to 10000).toList.parTraverseN_(2)(i => IO.pure(i.toString)).flatMap { r => + IO(r.mustEqual(())) + } + } yield 
ok + } + + "be null-safe" in real { + for { + r1 <- List[String]("a", "b", null, "d", null).parTraverseN_(2) { + case "a" => IO.pure(null) + case "b" => IO.pure("x") + case "d" => IO.pure(null) + case null => IO.pure("z") + } + _ <- IO { r1 mustEqual (()) } // just trying to make sure we don't crash + } yield ok + } + + "short-circuit on error" in real { + case object TestException extends RuntimeException + val target = 0.until(100000).toList + val test = target.parTraverseN_(2)(_ => IO.raiseError(TestException)) + + test.attempt.as(ok).timeoutTo(500.millis, IO(false must beTrue)) + } + + "run finalizers in parallel" in ticked { implicit ticker => + // this test also tests to ensure that we get the errored results rather than cancels + // note that the first two effects will have a Canceled outcome, while the third is Errored + // if we just go by first wins in sequence, then Canceled is the (incorrect) result + // first wins *in time* is the expected semantic here + val test = for { + latch1 <- IO.deferred[Unit] + latch2 <- IO.deferred[Unit] + + _ <- List(1, 2, 3).parTraverseN_(3) { + case 1 => + IO.never.onCancel(latch1.complete(()) *> latch2.get) + + case 2 => + IO.never.onCancel(latch2.complete(()) *> latch1.get) + + case 3 => + IO.sleep(10.millis) *> IO.raiseError(new RuntimeException) + } + } yield () + + test.attempt.void must completeAs(()) + } } "parallel" should { From 4c2e0d79374222a250844f472910a88ee9d0af2f Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 8 Mar 2026 12:27:52 -0400 Subject: [PATCH 13/17] We need a bit more time in CI --- tests/shared/src/test/scala/cats/effect/IOSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 4e07a860c8..54b3715528 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1760,7 +1760,7 @@ class 
IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val target = 0.until(100000).toList val test = target.parTraverseN(2)(_ => IO.raiseError(TestException)) - test.attempt.as(ok).timeoutTo(500.millis, IO(false must beTrue)) + test.attempt.as(ok).timeoutTo(1.second, IO(false must beTrue)) } "run finalizers in parallel" in ticked { implicit ticker => @@ -1957,7 +1957,7 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { val target = 0.until(100000).toList val test = target.parTraverseN_(2)(_ => IO.raiseError(TestException)) - test.attempt.as(ok).timeoutTo(500.millis, IO(false must beTrue)) + test.attempt.as(ok).timeoutTo(1.second, IO(false must beTrue)) } "run finalizers in parallel" in ticked { implicit ticker => From 7709fd1226917e774030eab684f125f7f3865043 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 8 Mar 2026 12:54:43 -0400 Subject: [PATCH 14/17] Moved short circuiting tests to JVM platform --- .../cats/effect/IOPlatformSpecification.scala | 21 +++++++++++++++++++ .../src/test/scala/cats/effect/IOSpec.scala | 16 -------------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala index 9dc054dc34..bd24301cde 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala @@ -27,6 +27,7 @@ import cats.effect.unsafe.{ WorkStealingThreadPool } import cats.effect.unsafe.metrics.PollerMetrics +import cats.effect.syntax.all._ import cats.syntax.all._ import org.scalacheck.Prop.forAll @@ -812,6 +813,26 @@ trait IOPlatformSpecification extends DetectPlatform { self: BaseSpec with Scala } } + "parTraverseN" >> { + "short-circuit on error" in real { + case object TestException extends RuntimeException + val target = 0.until(100000).toList + val test = target.parTraverseN(2)(_ => 
IO.raiseError(TestException)) + + test.attempt.as(ok).timeoutTo(500.millis, IO(false must beTrue)) + } + } + + "parTraverseN_" >> { + "short-circuit on error" in real { + case object TestException extends RuntimeException + val target = 0.until(100000).toList + val test = target.parTraverseN_(2)(_ => IO.raiseError(TestException)) + + test.attempt.as(ok).timeoutTo(500.millis, IO(false must beTrue)) + } + } + if (javaMajorVersion >= 21) "block in-place on virtual threads" in real { val loomExec = classOf[Executors] diff --git a/tests/shared/src/test/scala/cats/effect/IOSpec.scala b/tests/shared/src/test/scala/cats/effect/IOSpec.scala index 54b3715528..f112b2f254 100644 --- a/tests/shared/src/test/scala/cats/effect/IOSpec.scala +++ b/tests/shared/src/test/scala/cats/effect/IOSpec.scala @@ -1755,14 +1755,6 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { } yield ok } - "short-circuit on error" in real { - case object TestException extends RuntimeException - val target = 0.until(100000).toList - val test = target.parTraverseN(2)(_ => IO.raiseError(TestException)) - - test.attempt.as(ok).timeoutTo(1.second, IO(false must beTrue)) - } - "run finalizers in parallel" in ticked { implicit ticker => // this test also tests to ensure that we get the errored results rather than cancels // note that the first two effects will have a Canceled outcome, while the third is Errored @@ -1952,14 +1944,6 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification { } yield ok } - "short-circuit on error" in real { - case object TestException extends RuntimeException - val target = 0.until(100000).toList - val test = target.parTraverseN_(2)(_ => IO.raiseError(TestException)) - - test.attempt.as(ok).timeoutTo(1.second, IO(false must beTrue)) - } - "run finalizers in parallel" in ticked { implicit ticker => // this test also tests to ensure that we get the errored results rather than cancels // note that the first two effects will have a 
Canceled outcome, while the third is Errored From 4aa9ae7ef770165e80266b2bacd2762c4f88ddb4 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 8 Mar 2026 13:06:19 -0400 Subject: [PATCH 15/17] Scalafix --- .../src/test/scala/cats/effect/IOPlatformSpecification.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala index bd24301cde..ee99d1ead3 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSpecification.scala @@ -17,6 +17,7 @@ package cats.effect import cats.effect.std.Semaphore +import cats.effect.syntax.all._ import cats.effect.unsafe.{ IORuntime, IORuntimeConfig, @@ -27,7 +28,6 @@ import cats.effect.unsafe.{ WorkStealingThreadPool } import cats.effect.unsafe.metrics.PollerMetrics -import cats.effect.syntax.all._ import cats.syntax.all._ import org.scalacheck.Prop.forAll From 417d3596dd8da26db4314261380cc3f6e82d5c02 Mon Sep 17 00:00:00 2001 From: Daniel Spiewak Date: Sun, 8 Mar 2026 14:02:49 -0400 Subject: [PATCH 16/17] Updated scaladoc and fixed a laziness bug --- .../cats/effect/kernel/GenConcurrent.scala | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala index 290dcbcf88..7d8f859767 100644 --- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala +++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala @@ -130,9 +130,17 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] { parTraverseN_(n)(tma)(identity) /** - * Like `Parallel.parTraverse`, but limits the degree of parallelism. 
Note that the semantics
-   * of this operation aim to maximise fairness: when a spot to execute becomes available, every
-   * task has a chance to claim it, and not only the next `n` tasks in `ta`
+   * Like `Parallel.parTraverse`, but limits the degree of parallelism. The semantics of this
+   * function are ordered based on the `Traverse`. The first ''n'' actions will be started
+   * first, with subsequent actions starting in order as each one completes. Actions which are
+   * reached earlier in `traverse` order will be started slightly sooner than later actions, in
+   * a non-blocking fashion. Any errors or self-cancelation will immediately abort the sequence.
+   * If multiple actions produce errors simultaneously, one of them will be nondeterministically
+   * selected for production. If all actions succeed, their results are returned in the same
+   * order as their corresponding inputs, regardless of the order in which they executed.
+   *
+   * The `f` function is run as part of running the action: in parallel and subject to the
+   * limit.
    */
   def parTraverseN[T[_]: Traverse, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[T[B]] = {
     require(n >= 1, s"Concurrency limit should be at least 1, was: $n")
@@ -164,6 +172,7 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
           case None =>
             F.uncancelable { poll =>
               F.deferred[Outcome[F, E, B]] flatMap { result =>
+                // laziness is significant here, since it pushes the `f` into the fiber
                 val action = poll(sem.acquire) >> f(a)
                   .guaranteeCase { oc =>
                     val completion = oc match {
                       case Outcome.Succeeded(_) =>
@@ -221,9 +230,16 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
   }
 
   /**
-   * Like `Parallel.parTraverse_`, but limits the degree of parallelism. Note that the semantics
-   * of this operation aim to maximise fairness: when a spot to execute becomes available, every
-   * task has a chance to claim it, and not only the next `n` tasks in `ta`
+   * Like `Parallel.parTraverse_`, but limits the degree of parallelism. 
The semantics of this
+   * function are ordered based on the `Foldable`. The first ''n'' actions will be started
+   * first, with subsequent actions starting in order as each one completes. Actions which are
+   * reached earlier in `foldLeftM` order will be started slightly sooner than later actions, in
+   * a non-blocking fashion. Any errors or self-cancelation will immediately abort the sequence.
+   * If multiple actions produce errors simultaneously, one of them will be nondeterministically
+   * selected for production.
+   *
+   * The `f` function is run as part of running the action: in parallel and subject to the
+   * limit.
    */
   def parTraverseN_[T[_]: Foldable, A, B](n: Int)(ta: T[A])(f: A => F[B]): F[Unit] = {
     require(n >= 1, s"Concurrency limit should be at least 1, was: $n")
@@ -262,7 +278,8 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
 
             val suppressed = wrapped.void.voidError.guarantee(sem.release)
 
-            poll(sem.acquire) *> suppressed.start flatMap { fiber =>
+            // the laziness is significant here since it pushes the f into the fiber
+            poll(sem.acquire) >> suppressed.start flatMap { fiber =>
               // supervision is handled very differently here: we never remove from the set
               supervision.update(fiber :: _)
             }

From e195227b2df02704e3fd7560eb30fd281fe1ea17 Mon Sep 17 00:00:00 2001
From: Daniel Spiewak
Date: Sun, 8 Mar 2026 15:18:37 -0400
Subject: [PATCH 17/17] Fixed function suspension in fiber

---
 .../main/scala/cats/effect/kernel/GenConcurrent.scala  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala
index 7d8f859767..1d7058370b 100644
--- a/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala
+++ b/kernel/shared/src/main/scala/cats/effect/kernel/GenConcurrent.scala
@@ -172,8 +172,8 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
           case None =>
             F.uncancelable { poll =>
 
F.deferred[Outcome[F, E, B]] flatMap { result =>
-                // laziness is significant here, since it pushes the `f` into the fiber
-                val action = poll(sem.acquire) >> f(a)
+                // the laziness is a poor man's defer; this ensures the f gets pushed to the fiber
+                val action = poll(sem.acquire) *> (F.unit >> f(a))
                   .guaranteeCase { oc =>
                     val completion = oc match {
                       case Outcome.Succeeded(_) =>
@@ -265,7 +265,8 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
           case None =>
             F.uncancelable { poll =>
               // if the effect produces a non-success, race to kill all the rest
-              val wrapped = f(a) guaranteeCase {
+              // the laziness is a poor man's defer; this ensures the f gets pushed to the fiber
+              val wrapped = (F.unit >> f(a)) guaranteeCase {
                 case Outcome.Succeeded(_) =>
                   F.unit
@@ -278,8 +279,7 @@ trait GenConcurrent[F[_], E] extends GenSpawn[F, E] {
 
             val suppressed = wrapped.void.voidError.guarantee(sem.release)
 
-            // the laziness is significant here since it pushes the f into the fiber
-            poll(sem.acquire) >> suppressed.start flatMap { fiber =>
+            poll(sem.acquire) *> suppressed.start flatMap { fiber =>
               // supervision is handled very differently here: we never remove from the set
               supervision.update(fiber :: _)
             }