Updated fibers api

This commit is contained in:
Andrew Golovashevich 2025-09-09 20:19:05 +03:00
parent 85e693aba8
commit 0d5f9775dd
11 changed files with 139 additions and 48 deletions

View File

@ -1,9 +0,0 @@
package ru.landgrafhomyak.multitasking_0
public enum class ExecutionState {
CREATED,
RUNNING,
MANUALLY_SUSPENDED,
FINISHED_SUCCESSFULLY,
FINISHED_WITH_ERROR
}

View File

@ -1,4 +1,4 @@
package ru.landgrafhomyak.multitasking_0 package ru.landgrafhomyak.multitasking_0.fibers
import ru.landgrafhomyak.multitasking_0.threads.Thread import ru.landgrafhomyak.multitasking_0.threads.Thread
@ -20,13 +20,13 @@ public interface Fiber {
* Current state of fiber. * Current state of fiber.
* Thread-safe getter. * Thread-safe getter.
*/ */
public val state: ExecutionState public val state: State
/** /**
* Thread in which [resume()][Fiber.resume] was called. * Thread in which [resume()][Fiber.resume] was called.
* Thread-safe getter. * Thread-safe getter.
* *
* Returns `null` on all [states][Fiber.state] except [RUNNING][ExecutionState.RUNNING]. * Returns `null` on all [states][Fiber.state] except [RESUMED][Fiber.State.RESUMED].
* *
* On most implementations means that fiber actually runs on this [thread][Thread], * On most implementations means that fiber actually runs on this [thread][Thread],
* but some emulations (e.g. [java's virtual threads](https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html)) can't guarantee this. * but some emulations (e.g. [java's virtual threads](https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html)) can't guarantee this.
@ -40,7 +40,7 @@ public interface Fiber {
* Thread-safe getter. * Thread-safe getter.
* *
* If [resumed][Fiber.resume] when [thread][Fiber.resumedOnThread] doesn't run any fiber, returns `null`. * If [resumed][Fiber.resume] when [thread][Fiber.resumedOnThread] doesn't run any fiber, returns `null`.
* Returns `null` on all [states][Fiber.state] except [RUNNING][ExecutionState.RUNNING]. * Returns `null` on all [states][Fiber.state] except [RESUMED][Fiber.State.RESUMED].
* *
* Behavior of resuming fiber inside fiber is implementation-defined, * Behavior of resuming fiber inside fiber is implementation-defined,
* so this property has only logical meaning for debugging (e.g. recovering async stacktrace). * so this property has only logical meaning for debugging (e.g. recovering async stacktrace).
@ -60,14 +60,32 @@ public interface Fiber {
* If fiber [bound to any thread][Fiber.ownerThread], switching must be done only inside this thread or fibers running in this thread. * If fiber [bound to any thread][Fiber.ownerThread], switching must be done only inside this thread or fibers running in this thread.
*/ */
public fun resume() public fun resume()
/**
public fun interrupt() * Same as [resume()][Fiber.resume], but throws [FiberInterruptedException] inside fiber with provided [message] and [cause].
*/
public fun resumeWithInterruption(message: String? = null, cause: Throwable? = null)
/** /**
* Returns uncaught exception that terminated this fiber. * Returns uncaught exception that terminated this fiber.
* If current [state][Fiber.state] isn't [FINISHED_WITH_ERROR][ExecutionState.FINISHED_WITH_ERROR] will throw [IllegalStateException]. * If current [state][Fiber.state] isn't [FINISHED_WITH_ERROR][Fiber.State.FINISHED_WITH_ERROR] will throw [IllegalStateException].
*/ */
public val uncaughtException: Throwable public val uncaughtException: Throwable
/**
* Releases all resources allocated by this descriptor.
* Fiber must be in states [CREATED][Fiber.State.CREATED], [FINISHED_SUCCESSFULLY][Fiber.State.FINISHED_SUCCESSFULLY]
* or [FINISHED_WITH_ERROR][Fiber.State.FINISHED_WITH_ERROR].
*
* Calling any methods after this one is forbidden.
*/
public fun releaseResources() public fun releaseResources()
public enum class State {
CREATED,
RESUMED,
MANUALLY_SUSPENDED,
FINISHED_SUCCESSFULLY,
FINISHED_WITH_ERROR,
DESTROYED
}
} }

View File

@ -0,0 +1,8 @@
package ru.landgrafhomyak.multitasking_0.fibers
public class FiberInterruptedException: RuntimeException {
public constructor() : super()
public constructor(message: String?) : super(message)
public constructor(message: String?, cause: Throwable?) : super(message, cause)
public constructor(cause: Throwable?) : super(cause)
}

View File

@ -1,4 +1,4 @@
package ru.landgrafhomyak.multitasking_0 package ru.landgrafhomyak.multitasking_0.fibers
public fun interface FiberRoutine { public fun interface FiberRoutine {
public fun runFiber(fiber: Fiber) public fun runFiber(fiber: Fiber)

View File

@ -1,7 +1,8 @@
package ru.landgrafhomyak.multitasking_0.threads package ru.landgrafhomyak.multitasking_0.threads
import ru.landgrafhomyak.multitasking_0.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
@Suppress("EXPECT_ACTUAL_IR_INCOMPATIBILITY")
public expect class Thread { public expect class Thread {
/** /**
* Name of thread for debug purposes. * Name of thread for debug purposes.

View File

@ -1,6 +1,6 @@
package ru.landgrafhomyak.multitasking_0.threads package ru.landgrafhomyak.multitasking_0.threads
import ru.landgrafhomyak.multitasking_0.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop
import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException

View File

@ -1,7 +1,7 @@
package ru.landgrafhomyak.multitasking_0.threads package ru.landgrafhomyak.multitasking_0.threads
import kotlin.jvm.JvmField import kotlin.jvm.JvmField
import ru.landgrafhomyak.multitasking_0.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
internal class _FibersStackNode( internal class _FibersStackNode(
@JvmField @JvmField

View File

@ -3,7 +3,7 @@ package ru.landgrafhomyak.multitasking_0.threads
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
import java.lang.Thread as jThread import java.lang.Thread as jThread
import ru.landgrafhomyak.multitasking_0.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop
import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException

View File

@ -1,6 +1,6 @@
package ru.landgrafhomyak.multitasking_0.threads package ru.landgrafhomyak.multitasking_0.threads
import ru.landgrafhomyak.multitasking_0.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop
public actual sealed interface ThreadLocalMethods { public actual sealed interface ThreadLocalMethods {

View File

@ -5,30 +5,32 @@ import java.lang.Thread as jThread
import java.util.concurrent.locks.Condition import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
import ru.landgrafhomyak.multitasking_0.ExecutionState import ru.landgrafhomyak.multitasking_0.fibers.Fiber
import ru.landgrafhomyak.multitasking_0.Fiber import ru.landgrafhomyak.multitasking_0.fibers.FiberRoutine
import ru.landgrafhomyak.multitasking_0.FiberRoutine
import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException
import ru.landgrafhomyak.multitasking_0.fibers.FiberInterruptedException
import ru.landgrafhomyak.multitasking_0.threads.Thread as wThread import ru.landgrafhomyak.multitasking_0.threads.Thread as wThread
public class JavaVirtualThreadFiber : Fiber { public class JavaVirtualThreadFiber : Fiber {
private val _thisVThread: jThread private val _thisVThread: jThread
private val _syncLock: ReentrantLock private val _syncLock: ReentrantLock
private val _syncCond: Condition private val _syncCond: Condition
private var _state: ExecutionState private var _state: Fiber.State
private var _uncaughtException: Throwable? private var _uncaughtException: Throwable?
override val name: String override val name: String
private var _resumedOnThread: wThread? private var _resumedOnThread: wThread?
private var _resumedOnFiber: Fiber? private var _resumedOnFiber: Fiber?
private var _interruptionData: InterruptionData?
public constructor(name: String, routine: FiberRoutine) { public constructor(name: String, routine: FiberRoutine) {
this.name = name this.name = name
this._syncLock = ReentrantLock() this._syncLock = ReentrantLock()
this._syncCond = this._syncLock.newCondition() this._syncCond = this._syncLock.newCondition()
this._state = ExecutionState.CREATED this._state = Fiber.State.CREATED
this._uncaughtException = null this._uncaughtException = null
this._resumedOnThread = null this._resumedOnThread = null
this._resumedOnFiber = null this._resumedOnFiber = null
this._interruptionData = null
this._syncLock.withLock { this._syncLock.withLock {
this._thisVThread = jThread.ofVirtual().name("fiber: $name").start(this.Kernel(routine)) this._thisVThread = jThread.ofVirtual().name("fiber: $name").start(this.Kernel(routine))
@ -36,33 +38,54 @@ public class JavaVirtualThreadFiber : Fiber {
} }
} }
private class InterruptionData(val message: String?, val cause: Throwable?)
private inner class Kernel(private val _routine: FiberRoutine) : Runnable { private inner class Kernel(private val _routine: FiberRoutine) : Runnable {
override fun run() { override fun run() {
try { try {
this@JavaVirtualThreadFiber._syncLock.withLock { this@JavaVirtualThreadFiber._syncLock.withLock {
wThread._tl_currentThread.set(null)
this@JavaVirtualThreadFiber._syncCond.signal() this@JavaVirtualThreadFiber._syncCond.signal()
this@JavaVirtualThreadFiber._syncCond.await() this@JavaVirtualThreadFiber._syncCond.await()
if (this@JavaVirtualThreadFiber._state == Fiber.State.DESTROYED)
return
wThread._tl_currentThread.set(this@JavaVirtualThreadFiber._resumedOnThread!!) wThread._tl_currentThread.set(this@JavaVirtualThreadFiber._resumedOnThread!!)
} }
this._routine.runFiber(this@JavaVirtualThreadFiber) this._routine.runFiber(this@JavaVirtualThreadFiber)
this@JavaVirtualThreadFiber._syncLock.withLock { this@JavaVirtualThreadFiber._syncLock.withLock {
this@JavaVirtualThreadFiber._state = ExecutionState.FINISHED_SUCCESSFULLY this@JavaVirtualThreadFiber._state = Fiber.State.FINISHED_SUCCESSFULLY
this@JavaVirtualThreadFiber._syncCond.signal() this@JavaVirtualThreadFiber._syncCond.signal()
} }
} catch (t: Throwable) { } catch (t: Throwable) {
this@JavaVirtualThreadFiber._syncLock.withLock { this@JavaVirtualThreadFiber._syncLock.withLock {
this@JavaVirtualThreadFiber._uncaughtException = t this@JavaVirtualThreadFiber._uncaughtException = t
this@JavaVirtualThreadFiber._state = ExecutionState.FINISHED_WITH_ERROR this@JavaVirtualThreadFiber._state = Fiber.State.FINISHED_WITH_ERROR
this@JavaVirtualThreadFiber._syncCond.signal() this@JavaVirtualThreadFiber._syncCond.signal()
} }
} }
} }
} }
override val ownerThread: ru.landgrafhomyak.multitasking_0.threads.Thread? get() = null override val ownerThread: wThread?
override val state: ExecutionState get() = this._syncLock.withLock { this._state } get() = this._syncLock.withLock {
override val resumedOnThread: ru.landgrafhomyak.multitasking_0.threads.Thread? get() = this._syncLock.withLock { this._resumedOnThread } if (this._state == Fiber.State.DESTROYED)
override val resumedOnFiber: Fiber? get() = this._syncLock.withLock { this._resumedOnFiber } throw IllegalStateException("Fiber destroyed")
return@withLock null
}
override val state: Fiber.State get() = this._syncLock.withLock { this._state }
override val resumedOnThread: wThread?
get() = this._syncLock.withLock {
if (this._state == Fiber.State.DESTROYED)
throw IllegalStateException("Fiber destroyed")
return@withLock this._resumedOnThread
}
override val resumedOnFiber: Fiber?
get() = this._syncLock.withLock {
if (this._state == Fiber.State.DESTROYED)
throw IllegalStateException("Fiber destroyed")
return@withLock this._resumedOnFiber
}
override fun yield() { override fun yield() {
if (jThread.currentThread() !== this._thisVThread) if (jThread.currentThread() !== this._thisVThread)
@ -70,12 +93,14 @@ public class JavaVirtualThreadFiber : Fiber {
this._syncLock.withLock { this._syncLock.withLock {
when (this._state) { when (this._state) {
ExecutionState.RUNNING -> this._state = ExecutionState.MANUALLY_SUSPENDED Fiber.State.RESUMED -> this._state = Fiber.State.MANUALLY_SUSPENDED
ExecutionState.CREATED, ExecutionState.MANUALLY_SUSPENDED -> Fiber.State.CREATED, Fiber.State.MANUALLY_SUSPENDED ->
throw IllegalStateException(".yield() called inside fiber while it is suspended") throw IllegalStateException(".yield() called inside fiber while it is suspended")
ExecutionState.FINISHED_SUCCESSFULLY, ExecutionState.FINISHED_WITH_ERROR -> Fiber.State.FINISHED_SUCCESSFULLY, Fiber.State.FINISHED_WITH_ERROR ->
throw IllegalStateException(".yield() called inside fiber after it's marked as finished") throw IllegalStateException(".yield() called inside fiber after it's marked as finished")
Fiber.State.DESTROYED -> throw IllegalStateException("Fiber destroyed")
} }
@ -83,16 +108,23 @@ public class JavaVirtualThreadFiber : Fiber {
this._syncCond.signal() this._syncCond.signal()
this._syncCond.await() this._syncCond.await()
wThread._tl_currentThread.set(this._resumedOnThread!!) wThread._tl_currentThread.set(this._resumedOnThread!!)
this._interruptionData?.let { id -> throw FiberInterruptedException(id.message, id.cause) }
} }
} }
override fun resume() { private fun _resume(id: InterruptionData?) {
this._syncLock.withLock { this._syncLock.withLock {
when (this._state) { when (this._state) {
ExecutionState.CREATED, ExecutionState.MANUALLY_SUSPENDED -> Fiber.State.CREATED -> {
this._state = ExecutionState.RUNNING if (id != null)
throw IllegalStateException("Can't interrupt unstarted fiber")
this._state = Fiber.State.RESUMED
}
ExecutionState.RUNNING -> { Fiber.State.MANUALLY_SUSPENDED ->
this._state = Fiber.State.RESUMED
Fiber.State.RESUMED -> {
if (jThread.currentThread() == this._thisVThread) { if (jThread.currentThread() == this._thisVThread) {
throw IllegalStateException("Recursive fiber execution") throw IllegalStateException("Recursive fiber execution")
} else { } else {
@ -100,13 +132,16 @@ public class JavaVirtualThreadFiber : Fiber {
} }
} }
ExecutionState.FINISHED_SUCCESSFULLY, ExecutionState.FINISHED_WITH_ERROR -> throw IllegalStateException("Fiber already finished") Fiber.State.FINISHED_SUCCESSFULLY, Fiber.State.FINISHED_WITH_ERROR -> throw IllegalStateException("Fiber already finished")
Fiber.State.DESTROYED -> throw IllegalStateException("Fiber destroyed")
} }
val token = jObject() val token = jObject()
val currentThread = wThread.current val currentThread = wThread.current
this._resumedOnThread = currentThread.get() this._resumedOnThread = currentThread.get()
this._resumedOnFiber = currentThread.enterFiber(this, token) this._resumedOnFiber = currentThread.enterFiber(this, token)
this._interruptionData = id
this._syncCond.signal() this._syncCond.signal()
this._syncCond.await() this._syncCond.await()
@ -117,12 +152,49 @@ public class JavaVirtualThreadFiber : Fiber {
} }
} }
override fun resume() {
this._resume(null)
}
override fun resumeWithInterruption(message: String?, cause: Throwable?) {
this._resume(InterruptionData(message, cause))
}
override val uncaughtException: Throwable override val uncaughtException: Throwable
get() = this._uncaughtException ?: when (this._syncLock.withLock { this._state }) { get() {
ExecutionState.CREATED, ExecutionState.RUNNING, ExecutionState.MANUALLY_SUSPENDED -> this._syncLock.withLock {
return this._uncaughtException ?: when (this._state) {
Fiber.State.CREATED, Fiber.State.RESUMED, Fiber.State.MANUALLY_SUSPENDED ->
throw IllegalStateException("Fiber not finished yet") throw IllegalStateException("Fiber not finished yet")
ExecutionState.FINISHED_SUCCESSFULLY -> throw IllegalStateException("Fiber finished without exception") Fiber.State.FINISHED_SUCCESSFULLY -> throw IllegalStateException("Fiber finished without exception")
ExecutionState.FINISHED_WITH_ERROR -> throw IllegalStateException("Finished with exception but it wasn't saved") Fiber.State.FINISHED_WITH_ERROR -> throw IllegalStateException("Finished with exception but it wasn't saved")
Fiber.State.DESTROYED -> throw IllegalStateException("Fiber destroyed")
}
}
}
override fun releaseResources() {
this._syncLock.withLock {
when (this._state) {
Fiber.State.CREATED -> {
this._state = Fiber.State.DESTROYED
this._syncCond.signal()
this._syncLock.unlock()
this._thisVThread.join()
this._syncLock.lock()
}
Fiber.State.RESUMED -> throw IllegalStateException("Can't destroy fiber while it running")
Fiber.State.MANUALLY_SUSPENDED -> throw IllegalStateException("Can't destroy fiber until it finished")
Fiber.State.FINISHED_SUCCESSFULLY, Fiber.State.FINISHED_WITH_ERROR -> {
this._thisVThread.join()
this._state = Fiber.State.DESTROYED
}
Fiber.State.DESTROYED -> throw IllegalStateException("Fiber already destroyed")
}
}
} }
} }

View File

@ -1,11 +1,12 @@
package ru.landgrafhomyak.multitasking_0.threads package ru.landgrafhomyak.multitasking_0.threads
import kotlin.experimental.ExpectRefinement import kotlin.experimental.ExpectRefinement
import ru.landgrafhomyak.multitasking_0.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
// didn't work because AMBIGUOUS_EXPECTS // didn't work because AMBIGUOUS_EXPECTS
@OptIn(ExperimentalMultiplatform::class) @OptIn(ExperimentalMultiplatform::class)
@ExpectRefinement @ExpectRefinement
@Suppress("EXPECT_ACTUAL_IR_INCOMPATIBILITY")
public expect class Thread { public expect class Thread {
/** /**
* Name of thread for debug purposes. * Name of thread for debug purposes.