From 0d5f9775dd446c0ee50efa089bfaf5235fe1642a Mon Sep 17 00:00:00 2001 From: Andrew Golovashevich Date: Tue, 9 Sep 2025 20:19:05 +0300 Subject: [PATCH] Updated fibers api --- .../multitasking_0/ExecutionState.kt | 9 -- .../multitasking_0/{ => fibers}/Fiber.kt | 32 ++++- .../fibers/FiberInterruptedException.kt | 8 ++ .../{ => fibers}/FiberRoutine.kt | 2 +- .../multitasking_0/threads/Thread.kt | 3 +- .../threads/ThreadLocalMethods.kt | 2 +- .../threads/_FibersStackNode.kt | 2 +- .../multitasking_0/threads/Thread.kt | 2 +- .../threads/ThreadLocalMethods.kt | 2 +- .../JavaVirtualThreadFiber.kt | 122 ++++++++++++++---- .../multitasking_0/threads/Thread.kt | 3 +- 11 files changed, 139 insertions(+), 48 deletions(-) delete mode 100644 src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/ExecutionState.kt rename src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/{ => fibers}/Fiber.kt (67%) create mode 100644 src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/FiberInterruptedException.kt rename src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/{ => fibers}/FiberRoutine.kt (60%) diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/ExecutionState.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/ExecutionState.kt deleted file mode 100644 index 4045254..0000000 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/ExecutionState.kt +++ /dev/null @@ -1,9 +0,0 @@ -package ru.landgrafhomyak.multitasking_0 - -public enum class ExecutionState { - CREATED, - RUNNING, - MANUALLY_SUSPENDED, - FINISHED_SUCCESSFULLY, - FINISHED_WITH_ERROR -} \ No newline at end of file diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/Fiber.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/Fiber.kt similarity index 67% rename from src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/Fiber.kt rename to src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/Fiber.kt index 83452e0..c973f01 100644 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/Fiber.kt +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/Fiber.kt @@ -1,4 +1,4 @@ -package ru.landgrafhomyak.multitasking_0 +package ru.landgrafhomyak.multitasking_0.fibers import ru.landgrafhomyak.multitasking_0.threads.Thread @@ -20,13 +20,13 @@ public interface Fiber { * Current state of fiber. * Thread-safe getter. */ - public val state: ExecutionState + public val state: State /** * Thread in which [resume()][Fiber.resume] was called. * Thread-safe getter. * - * Returns `null` on all [states][Fiber.state] except [RUNNING][ExecutionState.RUNNING]. + * Returns `null` on all [states][Fiber.state] except [RESUMED][Fiber.State.RESUMED]. * * On most implementations means that fiber actually runs on this [thread][Thread], * but some emulations (e.g. [java's virtual threads](https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html)) can't guarantee this. @@ -40,7 +40,7 @@ public interface Fiber { * Thread-safe getter. * * If [resumed][Fiber.resume] when [thread][Fiber.resumedOnThread] doesn't run any fiber, returns `null`. - * Returns `null` on all [states][Fiber.state] except [RUNNING][ExecutionState.RUNNING]. + * Returns `null` on all [states][Fiber.state] except [RESUMED][Fiber.State.RESUMED]. * * Behavior of resuming fiber inside fiber is implementation-defined, * so this property has only logical meaning for debugging (e.g. recovering async stacktrace). @@ -60,14 +60,32 @@ public interface Fiber { * If fiber [bound to any thread][Fiber.ownerThread], switching must be done only inside this thread or fibers running in this thread. */ public fun resume() - - public fun interrupt() + /** + * Same as [resume()][Fiber.resume], but throws [FiberInterruptedException] inside fiber with provided [message] and [cause]. + */ + public fun resumeWithInterruption(message: String? = null, cause: Throwable? = null) /** * Returns uncaught exception that terminated this fiber. - * If current [state][Fiber.state] isn't [FINISHED_WITH_ERROR][ExecutionState.FINISHED_WITH_ERROR] will throw [IllegalStateException]. + * If current [state][Fiber.state] isn't [FINISHED_WITH_ERROR][Fiber.State.FINISHED_WITH_ERROR] will throw [IllegalStateException]. */ public val uncaughtException: Throwable + /** + * Releases all resources allocated by this descriptor. + * Fiber must be in states [CREATED][Fiber.State.CREATED], [FINISHED_SUCCESSFULLY][Fiber.State.FINISHED_SUCCESSFULLY] + * or [FINISHED_WITH_ERROR][Fiber.State.FINISHED_WITH_ERROR]. + * + * Calling any methods after this one is forbidden. + */ public fun releaseResources() + + public enum class State { + CREATED, + RESUMED, + MANUALLY_SUSPENDED, + FINISHED_SUCCESSFULLY, + FINISHED_WITH_ERROR, + DESTROYED + } } \ No newline at end of file diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/FiberInterruptedException.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/FiberInterruptedException.kt new file mode 100644 index 0000000..9ef0609 --- /dev/null +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/FiberInterruptedException.kt @@ -0,0 +1,8 @@ +package ru.landgrafhomyak.multitasking_0.fibers + +public class FiberInterruptedException: RuntimeException { + public constructor() : super() + public constructor(message: String?) : super(message) + public constructor(message: String?, cause: Throwable?) : super(message, cause) + public constructor(cause: Throwable?) : super(cause) +} \ No newline at end of file diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/FiberRoutine.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/FiberRoutine.kt similarity index 60% rename from src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/FiberRoutine.kt rename to src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/FiberRoutine.kt index a2693f0..7f861aa 100644 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/FiberRoutine.kt +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/fibers/FiberRoutine.kt @@ -1,4 +1,4 @@ -package ru.landgrafhomyak.multitasking_0 +package ru.landgrafhomyak.multitasking_0.fibers public fun interface FiberRoutine { public fun runFiber(fiber: Fiber) diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt index c08f3ab..a11f0db 100644 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt @@ -1,7 +1,8 @@ package ru.landgrafhomyak.multitasking_0.threads -import ru.landgrafhomyak.multitasking_0.Fiber +import ru.landgrafhomyak.multitasking_0.fibers.Fiber +@Suppress("EXPECT_ACTUAL_IR_INCOMPATIBILITY") public expect class Thread { /** * Name of thread for debug purposes. diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt index 33c4b08..6a3460d 100644 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt @@ -1,6 +1,6 @@ package ru.landgrafhomyak.multitasking_0.threads -import ru.landgrafhomyak.multitasking_0.Fiber +import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/_FibersStackNode.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/_FibersStackNode.kt index 0e07d0a..473aa65 100644 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/_FibersStackNode.kt +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/_FibersStackNode.kt @@ -1,7 +1,7 @@ package ru.landgrafhomyak.multitasking_0.threads import kotlin.jvm.JvmField -import ru.landgrafhomyak.multitasking_0.Fiber +import ru.landgrafhomyak.multitasking_0.fibers.Fiber internal class _FibersStackNode( @JvmField diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt index 56b32cc..7e1a417 100644 --- a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt @@ -3,7 +3,7 @@ package ru.landgrafhomyak.multitasking_0.threads import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock import java.lang.Thread as jThread -import ru.landgrafhomyak.multitasking_0.Fiber +import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt index f4393a9..f376ed6 100644 --- a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt @@ -1,6 +1,6 @@ package ru.landgrafhomyak.multitasking_0.threads -import ru.landgrafhomyak.multitasking_0.Fiber +import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop public actual sealed interface ThreadLocalMethods { diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/impl/java_virtual_threads/JavaVirtualThreadFiber.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/impl/java_virtual_threads/JavaVirtualThreadFiber.kt index 4e807cd..3702761 100644 --- a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/impl/java_virtual_threads/JavaVirtualThreadFiber.kt +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/impl/java_virtual_threads/JavaVirtualThreadFiber.kt @@ -5,30 +5,32 @@ import java.lang.Thread as jThread import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock -import ru.landgrafhomyak.multitasking_0.ExecutionState -import ru.landgrafhomyak.multitasking_0.Fiber -import ru.landgrafhomyak.multitasking_0.FiberRoutine +import ru.landgrafhomyak.multitasking_0.fibers.Fiber +import ru.landgrafhomyak.multitasking_0.fibers.FiberRoutine import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException +import ru.landgrafhomyak.multitasking_0.fibers.FiberInterruptedException import ru.landgrafhomyak.multitasking_0.threads.Thread as wThread public class JavaVirtualThreadFiber : Fiber { private val _thisVThread: jThread private val _syncLock: ReentrantLock private val _syncCond: Condition - private var _state: ExecutionState + private var _state: Fiber.State private var _uncaughtException: Throwable? override val name: String private var _resumedOnThread: wThread? private var _resumedOnFiber: Fiber? + private var _interruptionData: InterruptionData? public constructor(name: String, routine: FiberRoutine) { this.name = name this._syncLock = ReentrantLock() this._syncCond = this._syncLock.newCondition() - this._state = ExecutionState.CREATED + this._state = Fiber.State.CREATED this._uncaughtException = null this._resumedOnThread = null this._resumedOnFiber = null + this._interruptionData = null this._syncLock.withLock { this._thisVThread = jThread.ofVirtual().name("fiber: $name").start(this.Kernel(routine)) @@ -36,33 +38,54 @@ public class JavaVirtualThreadFiber : Fiber { } } + private class InterruptionData(val message: String?, val cause: Throwable?) + + private inner class Kernel(private val _routine: FiberRoutine) : Runnable { override fun run() { try { this@JavaVirtualThreadFiber._syncLock.withLock { + wThread._tl_currentThread.set(null) this@JavaVirtualThreadFiber._syncCond.signal() this@JavaVirtualThreadFiber._syncCond.await() + if (this@JavaVirtualThreadFiber._state == Fiber.State.DESTROYED) + return wThread._tl_currentThread.set(this@JavaVirtualThreadFiber._resumedOnThread!!) } this._routine.runFiber(this@JavaVirtualThreadFiber) this@JavaVirtualThreadFiber._syncLock.withLock { - this@JavaVirtualThreadFiber._state = ExecutionState.FINISHED_SUCCESSFULLY + this@JavaVirtualThreadFiber._state = Fiber.State.FINISHED_SUCCESSFULLY this@JavaVirtualThreadFiber._syncCond.signal() } } catch (t: Throwable) { this@JavaVirtualThreadFiber._syncLock.withLock { this@JavaVirtualThreadFiber._uncaughtException = t - this@JavaVirtualThreadFiber._state = ExecutionState.FINISHED_WITH_ERROR + this@JavaVirtualThreadFiber._state = Fiber.State.FINISHED_WITH_ERROR this@JavaVirtualThreadFiber._syncCond.signal() } } } } - override val ownerThread: ru.landgrafhomyak.multitasking_0.threads.Thread? get() = null - override val state: ExecutionState get() = this._syncLock.withLock { this._state } - override val resumedOnThread: ru.landgrafhomyak.multitasking_0.threads.Thread? get() = this._syncLock.withLock { this._resumedOnThread } - override val resumedOnFiber: Fiber? get() = this._syncLock.withLock { this._resumedOnFiber } + override val ownerThread: wThread? + get() = this._syncLock.withLock { + if (this._state == Fiber.State.DESTROYED) + throw IllegalStateException("Fiber destroyed") + return@withLock null + } + override val state: Fiber.State get() = this._syncLock.withLock { this._state } + override val resumedOnThread: wThread? + get() = this._syncLock.withLock { + if (this._state == Fiber.State.DESTROYED) + throw IllegalStateException("Fiber destroyed") + return@withLock this._resumedOnThread + } + override val resumedOnFiber: Fiber? + get() = this._syncLock.withLock { + if (this._state == Fiber.State.DESTROYED) + throw IllegalStateException("Fiber destroyed") + return@withLock this._resumedOnFiber + } override fun yield() { if (jThread.currentThread() !== this._thisVThread) @@ -70,12 +93,14 @@ public class JavaVirtualThreadFiber : Fiber { this._syncLock.withLock { when (this._state) { - ExecutionState.RUNNING -> this._state = ExecutionState.MANUALLY_SUSPENDED - ExecutionState.CREATED, ExecutionState.MANUALLY_SUSPENDED -> + Fiber.State.RESUMED -> this._state = Fiber.State.MANUALLY_SUSPENDED + Fiber.State.CREATED, Fiber.State.MANUALLY_SUSPENDED -> throw IllegalStateException(".yield() called inside fiber while it is suspended") - ExecutionState.FINISHED_SUCCESSFULLY, ExecutionState.FINISHED_WITH_ERROR -> + Fiber.State.FINISHED_SUCCESSFULLY, Fiber.State.FINISHED_WITH_ERROR -> throw IllegalStateException(".yield() called inside fiber after it's marked as finished") + + Fiber.State.DESTROYED -> throw IllegalStateException("Fiber destroyed") } @@ -83,16 +108,23 @@ public class JavaVirtualThreadFiber : Fiber { this._syncCond.signal() this._syncCond.await() wThread._tl_currentThread.set(this._resumedOnThread!!) + this._interruptionData?.let { id -> throw FiberInterruptedException(id.message, id.cause) } } } - override fun resume() { + private fun _resume(id: InterruptionData?) { this._syncLock.withLock { when (this._state) { - ExecutionState.CREATED, ExecutionState.MANUALLY_SUSPENDED -> - this._state = ExecutionState.RUNNING + Fiber.State.CREATED -> { + if (id != null) + throw IllegalStateException("Can't interrupt unstarted fiber") + this._state = Fiber.State.RESUMED + } - ExecutionState.RUNNING -> { + Fiber.State.MANUALLY_SUSPENDED -> + this._state = Fiber.State.RESUMED + + Fiber.State.RESUMED -> { if (jThread.currentThread() == this._thisVThread) { throw IllegalStateException("Recursive fiber execution") } else { @@ -100,13 +132,16 @@ public class JavaVirtualThreadFiber : Fiber { } } - ExecutionState.FINISHED_SUCCESSFULLY, ExecutionState.FINISHED_WITH_ERROR -> throw IllegalStateException("Fiber already finished") + Fiber.State.FINISHED_SUCCESSFULLY, Fiber.State.FINISHED_WITH_ERROR -> throw IllegalStateException("Fiber already finished") + + Fiber.State.DESTROYED -> throw IllegalStateException("Fiber destroyed") } val token = jObject() val currentThread = wThread.current this._resumedOnThread = currentThread.get() this._resumedOnFiber = currentThread.enterFiber(this, token) + this._interruptionData = id this._syncCond.signal() this._syncCond.await() @@ -117,12 +152,49 @@ public class JavaVirtualThreadFiber : Fiber { } } - override val uncaughtException: Throwable - get() = this._uncaughtException ?: when (this._syncLock.withLock { this._state }) { - ExecutionState.CREATED, ExecutionState.RUNNING, ExecutionState.MANUALLY_SUSPENDED -> - throw IllegalStateException("Fiber not finished yet") + override fun resume() { + this._resume(null) + } - ExecutionState.FINISHED_SUCCESSFULLY -> throw IllegalStateException("Fiber finished without exception") - ExecutionState.FINISHED_WITH_ERROR -> throw IllegalStateException("Finished with exception but it wasn't saved") + override fun resumeWithInterruption(message: String?, cause: Throwable?) { + this._resume(InterruptionData(message, cause)) + } + + override val uncaughtException: Throwable + get() { + this._syncLock.withLock { + return this._uncaughtException ?: when (this._state) { + Fiber.State.CREATED, Fiber.State.RESUMED, Fiber.State.MANUALLY_SUSPENDED -> + throw IllegalStateException("Fiber not finished yet") + + Fiber.State.FINISHED_SUCCESSFULLY -> throw IllegalStateException("Fiber finished without exception") + Fiber.State.FINISHED_WITH_ERROR -> throw IllegalStateException("Finished with exception but it wasn't saved") + Fiber.State.DESTROYED -> throw IllegalStateException("Fiber destroyed") + } + } } + + override fun releaseResources() { + this._syncLock.withLock { + when (this._state) { + Fiber.State.CREATED -> { + this._state = Fiber.State.DESTROYED + this._syncCond.signal() + this._syncLock.unlock() + this._thisVThread.join() + this._syncLock.lock() + } + + Fiber.State.RESUMED -> throw IllegalStateException("Can't destroy fiber while it running") + Fiber.State.MANUALLY_SUSPENDED -> throw IllegalStateException("Can't destroy fiber until it finished") + + Fiber.State.FINISHED_SUCCESSFULLY, Fiber.State.FINISHED_WITH_ERROR -> { + this._thisVThread.join() + this._state = Fiber.State.DESTROYED + } + + Fiber.State.DESTROYED -> throw IllegalStateException("Fiber already destroyed") + } + } + } } \ No newline at end of file diff --git a/src/multithreadPlatformMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt b/src/multithreadPlatformMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt index 65f88ea..96f279e 100644 --- a/src/multithreadPlatformMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt +++ b/src/multithreadPlatformMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt @@ -1,11 +1,12 @@ package ru.landgrafhomyak.multitasking_0.threads import kotlin.experimental.ExpectRefinement -import ru.landgrafhomyak.multitasking_0.Fiber +import ru.landgrafhomyak.multitasking_0.fibers.Fiber // didn't work because AMBIGUOUS_EXPECTS @OptIn(ExperimentalMultiplatform::class) @ExpectRefinement +@Suppress("EXPECT_ACTUAL_IR_INCOMPATIBILITY") public expect class Thread { /** * Name of thread for debug purposes.