Leaking fiber monitors when running in custom IORuntime using JDK 21 virtual threads #3929

Closed
jacum opened this issue Jan 3, 2024 · 11 comments · Fixed by #3964
@jacum

jacum commented Jan 3, 2024

import cats.effect.std.Dispatcher
import cats.effect.unsafe.{IORuntime, IORuntimeBuilder}
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits.toTraverseOps
import com.sun.management.HotSpotDiagnosticMXBean

import java.io.File
import java.lang.management.ManagementFactory
import java.util.UUID
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object CatsEffectLeakDumper extends IOApp {

  override protected def runtime: IORuntime = {
    val compute    = Executors.newVirtualThreadPerTaskExecutor()
    val computeCtx = ExecutionContext.fromExecutor(compute)
    IORuntimeBuilder
      .apply()
      .setCompute(computeCtx, () => compute.shutdown())
      .build()
  }

  def dumpHeap(fileName: String, live: Boolean = false): Unit = {
    val server = ManagementFactory.getPlatformMBeanServer
    val hotspotMBean = ManagementFactory.newPlatformMXBeanProxy(server, "com.sun.management:type=HotSpotDiagnostic", classOf[HotSpotDiagnosticMXBean])
    if (new File(fileName).exists()) {
      val _ = new File(fileName).delete
    }
    hotspotMBean.dumpHeap(fileName, live)
  }
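  override def run(args: List[String]): IO[ExitCode] = Dispatcher.parallel[IO].flatMap { d =>
    // Dump the heap on JVM shutdown so the retained FiberMonitor state can be inspected.
    val installDumpHook = IO(
      Runtime.getRuntime.addShutdownHook(new Thread(() => dumpHeap("heapdump.hprof", live = true)))
    )
    // Run one million tiny effects through the dispatcher, one after another.
    val runMany = (1 to 1000000).toList.traverse(_ => IO.delay {
      d.unsafeRunSync(IO(UUID.randomUUID()))
    })
    // Explicit method call instead of the postfix `toResource`, which needs scala.language.postfixOps.
    (installDumpHook >> runMany.void).toResource
  }.use(_ => IO.pure(ExitCode.Success))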

}

Run with -Dcats.effect.tracing.mode=FULL on JDK 21.

Then open heapdump.hprof and look at the biggest objects: cats.effect.unsafe.FiberMonitor accounts for many MBs.
cc @djspiewak

@durban
Contributor

durban commented Jan 4, 2024

It seems the leak is due to the nodes of the BagReferences ConcurrentLinkedQueue in FiberMonitor, and the (empty) weakref objects in them. As far as I can see, these are never cleaned up, so the same would happen with normal threads too (except those are probably not created in an unbounded number).
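
To make the growth described above concrete, here is a minimal sketch of the pattern. It is not the Cats Effect code: `LeakSketch`, `registerBag`, and the plain `AnyRef` standing in for a bag are hypothetical names, and a test would call `clear()` by hand to simulate the GC collecting the referent.

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for the registry described above; not the Cats Effect code.
object LeakSketch {
  private val references = new ConcurrentLinkedQueue[WeakReference[AnyRef]]

  // Register one "bag" and return its weak reference.
  def registerBag(bag: AnyRef): WeakReference[AnyRef] = {
    val ref = new WeakReference[AnyRef](bag)
    references.offer(ref)
    ref
  }

  // Total number of queue nodes, live or not.
  def size: Int = references.size

  // Number of queue entries whose referent is gone.
  def emptyEntries: Int = {
    var n = 0
    val it = references.iterator()
    while (it.hasNext) if (it.next().get() == null) n += 1
    n
  }
}
```

Nothing ever removes an entry, so every registered bag leaves behind a queue node and an empty `WeakReference` even after the bag itself is gone.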

@armanbilge
Member

Hmm, that's actually really annoying. I think we'll want to rethink that thread-local strategy for virtual threads since there's a 1-to-1 fiber-to-virtual-thread mapping. Ideally we'd be using the carrier thread's local storage, not the virtual thread's for the bag. But that's probably not possible.

> so the same would happen with normal threads too (except those are probably not created in an unbounded number).

But we should probably still fix this anyway.
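
As a rough illustration of why the thread-local strategy is costly with one thread per task, the sketch below counts how often a `ThreadLocal` initializer runs. The names (`ThreadLocalSketch`, `initializationsFor`) are hypothetical, and it uses plain platform threads so it runs on any JDK; the assumption is that virtual threads behave the same way here, since each one gets its own thread-local values.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper, for illustration only.
object ThreadLocalSketch {
  // Runs `tasks` tasks, each on its own short-lived thread, and returns how many
  // times the thread-local initializer ran.
  def initializationsFor(tasks: Int): Int = {
    val initCount = new AtomicInteger(0)
    val local = ThreadLocal.withInitial { () =>
      initCount.incrementAndGet()
      new Object
    }
    val threads = (1 to tasks).map { _ =>
      // Touch the thread-local once from the new thread.
      val t = new Thread(() => { local.get(); () })
      t.start()
      t
    }
    threads.foreach(_.join())
    initCount.get()
  }
}
```

Each task pays for a fresh initialization, which in the monitor's case means a fresh bag and a fresh registry entry per task.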

@durban
Contributor

durban commented Jan 5, 2024

> Ideally we'd be using the carrier thread's local storage, not the virtual thread's for the bag. But that's probably not possible.

Well... https://github.com/openjdk/jdk21u/blob/060c4f7589e7f13febd402f4dac3320f4c032b08/src/java.base/share/classes/jdk/internal/misc/CarrierThreadLocal.java#L35

(I'm not seriously suggesting we use that, just found it interesting.)

@armanbilge
Member

Thanks, makes sense that we are not the only ones needing that 😇

> I'm not seriously suggesting we use that

Even if we wanted to, these new JVMs make it annoying/impossible to access their internals.

@djspiewak
Member

You know, I'm actually very surprised that thread locals aren't just cleaned up with the thread itself. Arman is right that we really want the carrier thread here, not the virtual thread, but even with the virtual thread this doesn't feel like a thing that should be leaking (just a thing that would be very inefficient).

@armanbilge
Member

> It seems the leak is due to the nodes of the BagReferences ConcurrentLinkedQueue in FiberMonitor, and the (empty) weakref objects in them.

@djspiewak as Daniel U pointed out, the leak is actually our fault:

private[this] final val BagReferences =
  new ConcurrentLinkedQueue[WeakReference[WeakBag[Runnable]]]
private[this] final val Bags = ThreadLocal.withInitial { () =>
  val bag = new WeakBag[Runnable]()
  BagReferences.offer(new WeakReference(bag))
  bag
}

We are lacking a "packing" mechanism for BagReferences.
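
One possible shape for such a packing step is sketched below. This is an assumption for illustration, not necessarily what #3964 does: `PackingSketch` and `pack` are hypothetical names, and the sketch relies only on the standard `removeIf` method of `ConcurrentLinkedQueue`.

```scala
import java.lang.ref.WeakReference
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical helper, for illustration only.
object PackingSketch {
  // Drops every entry whose referent has been cleared; returns how many entries remain.
  def pack[A <: AnyRef](refs: ConcurrentLinkedQueue[WeakReference[A]]): Int = {
    refs.removeIf((ref: WeakReference[A]) => ref.get() == null)
    refs.size
  }
}
```

A full pass over the queue is linear in its size, so when and how often to run it is the real design question.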

@djspiewak
Member

Right but that's a weak reference, so it shouldn't prevent the cleanup of bag.

@armanbilge
Member

Right, but the ConcurrentLinkedQueue will keep growing more and more nodes, pointing to GCed bags.

@djspiewak
Member

The solution is probably to do something annoying with ReferenceQueue and periodically check that queue for size. Once the queue gets big enough, we go through and purge out the collected refs in Bags. This would have to be amortized into monitorSuspended or similar, creating a moderate overhead. Probably not enough overhead to worry about gating it to virtual threads, so this would also work to resolve the similar issue with blocking.

We wouldn't need JDK 21 for this (since it's not Loom-specific), so we could probably resolve it in 3.5.x.
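
A minimal sketch of the ReferenceQueue idea from this comment follows, under stated assumptions. `PurgingRegistry`, `maybePurge`, and `threshold` are hypothetical names. `ReferenceQueue` has no size method, so the sketch counts drained notifications instead. The counter reset is deliberately loose under contention, which at worst delays a purge.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical registry; `threshold` and all names here are illustrative assumptions.
final class PurgingRegistry[A <: AnyRef](threshold: Int) {
  private val collected  = new ReferenceQueue[A]
  private val references = new ConcurrentLinkedQueue[WeakReference[A]]
  private val pending    = new AtomicInteger(0)

  // Register a value; the GC enqueues the reference on `collected` once it clears it.
  def register(value: A): WeakReference[A] = {
    val ref = new WeakReference[A](value, collected)
    references.offer(ref)
    ref
  }

  // Intended to be called from a hot path: drains collection notifications and
  // purges cleared entries only once at least `threshold` have accumulated.
  def maybePurge(): Unit = {
    var drained = 0
    while (collected.poll() != null) drained += 1
    if (drained > 0 && pending.addAndGet(drained) >= threshold) {
      pending.set(0)
      references.removeIf((ref: WeakReference[A]) => ref.get() == null)
    }
  }

  def size: Int = references.size
}
```

Most calls to `maybePurge()` only poll an empty queue, so the linear purge is amortized over many registrations.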

@armanbilge
Member

armanbilge commented Jan 17, 2024

I've proposed a fix for the general issue in #3964.

But from a performance standpoint I still think that we need a different solution for virtual threads. The current mechanism adds a lot of overhead:

  1. a single point of contention for registering every thread
  2. allocating an entire WeakBag (intended to be long-lived and hold references to many fibers over its lifetime) for a virtual thread, even though we will only place a single fiber in there
  3. nested layers of weak references, which we know that the GC just loves /s
    Specifically, we hold a weak reference to a bag, which itself holds a weak reference to the fiber.

@armanbilge
Member

armanbilge commented Feb 23, 2024

Fixed the leak in #3964 and opened a follow-up.
