From d06f700803a84c41a0aa3618ad8ce479061e2421 Mon Sep 17 00:00:00 2001 From: Andrew Golovashevich Date: Thu, 11 Sep 2025 00:02:49 +0300 Subject: [PATCH] Similar to java's LockSupports.park API --- .../threads/ClearEventLoopCallback.kt | 8 ++ .../threads/EventLoopClearer.kt | 11 -- .../{FiberExiter.kt => ExitFiberCallback.kt} | 2 +- .../threads/ThreadLocalMethods.kt | 39 ++++++- .../threads/sync/ResumeThreadCallback.kt | 9 ++ .../JavaVirtualThreadFiber.kt | 5 +- .../multitasking_0/threads/Thread.kt | 102 ++++++++++++++++-- .../threads/ThreadLocalMethods.kt | 10 +- 8 files changed, 160 insertions(+), 26 deletions(-) create mode 100644 src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ClearEventLoopCallback.kt delete mode 100644 src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/EventLoopClearer.kt rename src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/{FiberExiter.kt => ExitFiberCallback.kt} (94%) create mode 100644 src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/sync/ResumeThreadCallback.kt diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ClearEventLoopCallback.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ClearEventLoopCallback.kt new file mode 100644 index 0000000..45a4cf2 --- /dev/null +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ClearEventLoopCallback.kt @@ -0,0 +1,8 @@ +package ru.landgrafhomyak.multitasking_0.threads + +/** + * @see ThreadLocalMethods.setEventLoop + */ +public interface ClearEventLoopCallback { + public fun clearEventLoop() +} \ No newline at end of file diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/EventLoopClearer.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/EventLoopClearer.kt deleted file mode 100644 index fc961a0..0000000 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/EventLoopClearer.kt +++ /dev/null @@ -1,11 +0,0 @@ -package ru.landgrafhomyak.multitasking_0.threads - -import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException -import ru.landgrafhomyak.multitasking_0.fibers.Fiber - -/** - * @see ThreadLocalMethods.setEventLoop - */ -public interface EventLoopClearer { - public fun clearEventLoop() -} \ No newline at end of file diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/FiberExiter.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ExitFiberCallback.kt similarity index 94% rename from src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/FiberExiter.kt rename to src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ExitFiberCallback.kt index ef057c4..b9f8bb7 100644 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/FiberExiter.kt +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ExitFiberCallback.kt @@ -6,7 +6,7 @@ import ru.landgrafhomyak.multitasking_0.fibers.Fiber /** * @see ThreadLocalMethods.enterFiber */ -public interface FiberExiter { +public interface ExitFiberCallback { public val callerFiber: Fiber? /** * Informs thread that execution flow was returned from [fiber][Fiber]. diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt index 7455d4b..5e99024 100644 --- a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt @@ -3,6 +3,7 @@ package ru.landgrafhomyak.multitasking_0.threads import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException +import ru.landgrafhomyak.multitasking_0.threads.sync.ResumeThreadCallback public expect sealed interface ThreadLocalMethods { /** @@ -19,7 +20,7 @@ public expect sealed interface ThreadLocalMethods { * If set, condition `Thread.current.runningFiber.resumedOnThread === Thread.current.get()` always true * * [Fiber][Fiber] implementations must control this property explicitly with [enterFiber()][ThreadLocalMethods.enterFiber] - * and [exitFiber()][ThreadLocalMethods.exitFiber] + * and [exitFiber()][ExitFiberCallback.exitFiber] * * @throws WrongCallerThreadException if called on thread different that caller of [Thread.current]. */ @@ -34,8 +35,40 @@ public expect sealed interface ThreadLocalMethods { * * @throws WrongCallerThreadException if called on thread different that caller of [Thread.current]. */ - public fun enterFiber(fiberToEnter: Fiber): FiberExiter + public fun enterFiber(fiberToEnter: Fiber): ExitFiberCallback public val runningEventLoop: SingleThreadEventLoop? - public fun setEventLoop(eventLoop: SingleThreadEventLoop): EventLoopClearer + public fun setEventLoop(eventLoop: SingleThreadEventLoop): ClearEventLoopCallback + + /** + * Hints platform scheduler to switch to another thread. Doesn't guarantee switching. + */ + public fun yield() + + /** + * Removes thread from system scheduler until [resumeThread()][ResumeThreadCallback.resumeThread]. + * If system allows unauthorized thread resuming, blocks these attempts. + * + * It's a high-level mechanism to implement custom synchronization methods, it isn't faster than existing native synchronization primitives. + * + * @param store callback to obtain and store [ResumeThreadCallback] reference. + * If [resumeThread()][ResumeThreadCallback.resumeThread] called inside - immediately returns. + */ + public fun suspendThread(store: (ResumeThreadCallback) -> Unit) + + /** + * Removes thread from system scheduler until [resumeThread()][ResumeThreadCallback.resumeThread] or until timeout expires. + * If system allows unauthorized thread resuming, blocks these attempts. + * + * When timeout expires, [resumeThread()][ResumeThreadCallback.resumeThread] becomes unavailable + * (wrap it with explicit synchronization) and [timeoutHandler] called in thread that was suspended before this function returns. + * + * It's a high-level mechanism to implement custom synchronization methods, it isn't faster than existing native synchronization primitives. + * + * @param timeoutMillis timeout in milliseconds. If equals to `0`, [store] and [timeoutHandler] will be called anyway. + * @param timeoutHandler callback called by resumed thread after timeout expired. + * @param store callback to obtain and store [ResumeThreadCallback] reference. + * If [resumeThread()][ResumeThreadCallback.resumeThread] called inside - immediately returns. + */ + public fun suspendThread(timeoutMillis: UInt, timeoutHandler: ResumeThreadCallback.TimeoutHandler, store: (ResumeThreadCallback) -> Unit) } \ No newline at end of file diff --git a/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/sync/ResumeThreadCallback.kt b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/sync/ResumeThreadCallback.kt new file mode 100644 index 0000000..e4d6236 --- /dev/null +++ b/src/commonMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/sync/ResumeThreadCallback.kt @@ -0,0 +1,9 @@ +package ru.landgrafhomyak.multitasking_0.threads.sync + +public interface ResumeThreadCallback { + public fun resumeThread() + + public interface TimeoutHandler { + public fun resumedBecauseOfTimeout() + } +} \ No newline at end of file diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/impl/java_virtual_threads/JavaVirtualThreadFiber.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/impl/java_virtual_threads/JavaVirtualThreadFiber.kt index d03daff..dfffb76 100644 --- a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/impl/java_virtual_threads/JavaVirtualThreadFiber.kt +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/impl/java_virtual_threads/JavaVirtualThreadFiber.kt @@ -1,6 +1,5 @@ package ru.landgrafhomyak.multitasking_0.impl.java_virtual_threads -import java.lang.Object as jObject import java.lang.Thread as jThread import java.util.concurrent.locks.Condition import java.util.concurrent.locks.ReentrantLock @@ -9,7 +8,7 @@ import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.fibers.FiberRoutine import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException import ru.landgrafhomyak.multitasking_0.fibers.FiberInterruptedException -import ru.landgrafhomyak.multitasking_0.threads.FiberExiter +import ru.landgrafhomyak.multitasking_0.threads.ExitFiberCallback import ru.landgrafhomyak.multitasking_0.threads.Thread as wThread public class JavaVirtualThreadFiber : Fiber { @@ -20,7 +19,7 @@ public class JavaVirtualThreadFiber : Fiber { private var _uncaughtException: Throwable? override val name: String private var _resumedOnThread: wThread? - private var _fiberExiter: FiberExiter? + private var _fiberExiter: ExitFiberCallback? private var _interruptionData: InterruptionData? public constructor(name: String, routine: FiberRoutine) { diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt index 43f93fd..4cf552f 100644 --- a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt @@ -1,12 +1,14 @@ package ru.landgrafhomyak.multitasking_0.threads import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.LockSupport import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock import java.lang.Thread as jThread import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException +import ru.landgrafhomyak.multitasking_0.threads.sync.ResumeThreadCallback @Suppress("AMBIGUOUS_EXPECTS", "ACTUAL_WITHOUT_EXPECT") public actual sealed class Thread { @@ -165,7 +167,7 @@ public actual sealed class Thread { val caller: FibersStackNode?, @JvmField val fiber: Fiber, - ) : FiberExiter { + ) : ExitFiberCallback { private val _isClosed = AtomicBoolean(false) override val callerFiber: Fiber? get() { @@ -194,7 +196,7 @@ public actual sealed class Thread { return this._fibersStack?.fiber } - override fun enterFiber(fiberToEnter: Fiber): FiberExiter { + override fun enterFiber(fiberToEnter: Fiber): ExitFiberCallback { this._assertThread() val node = this.FibersStackNode(this._fibersStack, fiberToEnter) this._fibersStack = node @@ -210,9 +212,9 @@ public actual sealed class Thread { } - private inner class EventLoopClearerImpl( + private inner class ClearEventLoopCallbackImpl( - ) : EventLoopClearer { + ) : ClearEventLoopCallback { private val _isClosed = AtomicBoolean(false) override fun clearEventLoop() { @@ -223,12 +225,100 @@ public actual sealed class Thread { } } - override fun setEventLoop(eventLoop: SingleThreadEventLoop): EventLoopClearer { + override fun setEventLoop(eventLoop: SingleThreadEventLoop): ClearEventLoopCallback { this._assertThread() if (this._runningEventLoop != null) throw IllegalStateException("There is already a running event loop here") this._runningEventLoop = eventLoop - return this.EventLoopClearerImpl() + return this.ClearEventLoopCallbackImpl() + } + + override fun yield() { + this._assertThread() + jThread.yield() + } + + private inner class ResumeThreadCallbackImpl : ResumeThreadCallback { + val sync = AtomicBoolean(false) + var isCalled = false + + override fun resumeThread() { + while (!this.sync.compareAndSet(false, true)) + jThread.onSpinWait() + + if (this.isCalled) { + this.sync.set(false) + throw IllegalStateException("Thread already resumed by this callback instance") + } + when (this@Thread._nativeThread.state) { + jThread.State.WAITING, jThread.State.TIMED_WAITING -> { + LockSupport.unpark(this@Thread._nativeThread) + } + + else -> { + // catched after external unpark but before it enters lock or before it enters park at all + } + } + this.isCalled = true + this.sync.set(false) + } + } + + override fun suspendThread(store: (ResumeThreadCallback) -> Unit) { + this._assertThread() + val resumer = this.ResumeThreadCallbackImpl() + store(resumer) + run { + while (!resumer.sync.compareAndSet(false, true)) + jThread.onSpinWait() + val isCalled = resumer.isCalled + resumer.sync.set(false) + if (isCalled) + return + } + while (true) { + LockSupport.park() + + while (!resumer.sync.compareAndSet(false, true)) + jThread.onSpinWait() + + val isCalled = resumer.isCalled + resumer.sync.set(false) + if (isCalled) + break + } + } + + override fun suspendThread(timeoutMillis: UInt, timeoutHandler: ResumeThreadCallback.TimeoutHandler, store: (ResumeThreadCallback) -> Unit) { + this._assertThread() + val resumer = this.ResumeThreadCallbackImpl() + store(resumer) + run { + while (!resumer.sync.compareAndSet(false, true)) + jThread.onSpinWait() + val isCalled = resumer.isCalled + resumer.sync.set(false) + if (isCalled) + return + } + val endTime = System.nanoTime() + (timeoutMillis.toULong() * 1_000_000uL).toLong() + while (true) { + LockSupport.parkNanos(endTime - System.nanoTime()) + while (!resumer.sync.compareAndSet(false, true)) + jThread.onSpinWait() + + if (resumer.isCalled) { + resumer.sync.set(false) + break + } + + if (System.nanoTime() >= endTime) { + resumer.isCalled = true + resumer.sync.set(false) + timeoutHandler.resumedBecauseOfTimeout() + } + resumer.sync.set(false) + } } } } \ No newline at end of file diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt index 203c07a..f4aaa2e 100644 --- a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ThreadLocalMethods.kt @@ -2,13 +2,19 @@ package ru.landgrafhomyak.multitasking_0.threads import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop +import ru.landgrafhomyak.multitasking_0.threads.sync.ResumeThreadCallback public actual sealed interface ThreadLocalMethods { public actual fun get(): Thread public actual val runningFiber: Fiber? - public actual fun enterFiber(fiberToEnter: Fiber): FiberExiter + public actual fun enterFiber(fiberToEnter: Fiber): ExitFiberCallback public actual val runningEventLoop: SingleThreadEventLoop? - public actual fun setEventLoop(eventLoop: SingleThreadEventLoop): EventLoopClearer + public actual fun setEventLoop(eventLoop: SingleThreadEventLoop): ClearEventLoopCallback + + public actual fun yield() + + public actual fun suspendThread(store: (ResumeThreadCallback) -> Unit) + public actual fun suspendThread(timeoutMillis: UInt, timeoutHandler: ResumeThreadCallback.TimeoutHandler, store: (ResumeThreadCallback) -> Unit) } \ No newline at end of file