Similar to java's LockSupports.park API

This commit is contained in:
Andrew Golovashevich 2025-09-11 00:02:49 +03:00
parent 538ede8206
commit d06f700803
8 changed files with 160 additions and 26 deletions

View File

@ -0,0 +1,8 @@
package ru.landgrafhomyak.multitasking_0.threads
/**
* @see ThreadLocalMethods.setEventLoop
*/
public interface ClearEventLoopCallback {
public fun clearEventLoop()
}

View File

@ -1,11 +0,0 @@
package ru.landgrafhomyak.multitasking_0.threads
import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException
import ru.landgrafhomyak.multitasking_0.fibers.Fiber
/**
* @see ThreadLocalMethods.setEventLoop
*/
public interface EventLoopClearer {
public fun clearEventLoop()
}

View File

@ -6,7 +6,7 @@ import ru.landgrafhomyak.multitasking_0.fibers.Fiber
/** /**
* @see ThreadLocalMethods.enterFiber * @see ThreadLocalMethods.enterFiber
*/ */
public interface FiberExiter { public interface ExitFiberCallback {
public val callerFiber: Fiber? public val callerFiber: Fiber?
/** /**
* Informs thread that execution flow was returned from [fiber][Fiber]. * Informs thread that execution flow was returned from [fiber][Fiber].

View File

@ -3,6 +3,7 @@ package ru.landgrafhomyak.multitasking_0.threads
import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop
import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException
import ru.landgrafhomyak.multitasking_0.threads.sync.ResumeThreadCallback
public expect sealed interface ThreadLocalMethods { public expect sealed interface ThreadLocalMethods {
/** /**
@ -19,7 +20,7 @@ public expect sealed interface ThreadLocalMethods {
* If set, condition `Thread.current.runningFiber.resumedOnThread === Thread.current.get()` always true * If set, condition `Thread.current.runningFiber.resumedOnThread === Thread.current.get()` always true
* *
* [Fiber][Fiber] implementations must control this property explicitly with [enterFiber()][ThreadLocalMethods.enterFiber] * [Fiber][Fiber] implementations must control this property explicitly with [enterFiber()][ThreadLocalMethods.enterFiber]
* and [exitFiber()][ThreadLocalMethods.exitFiber] * and [exitFiber()][ExitFiberCallback.exitFiber]
* *
* @throws WrongCallerThreadException if called on thread different that caller of [Thread.current]. * @throws WrongCallerThreadException if called on thread different that caller of [Thread.current].
*/ */
@ -34,8 +35,40 @@ public expect sealed interface ThreadLocalMethods {
* *
* @throws WrongCallerThreadException if called on thread different that caller of [Thread.current]. * @throws WrongCallerThreadException if called on thread different that caller of [Thread.current].
*/ */
public fun enterFiber(fiberToEnter: Fiber): FiberExiter public fun enterFiber(fiberToEnter: Fiber): ExitFiberCallback
public val runningEventLoop: SingleThreadEventLoop? public val runningEventLoop: SingleThreadEventLoop?
public fun setEventLoop(eventLoop: SingleThreadEventLoop): EventLoopClearer public fun setEventLoop(eventLoop: SingleThreadEventLoop): ClearEventLoopCallback
/**
* Hints platform scheduler to switch to another thread. Doesn't guarantee switching.
*/
public fun yield()
/**
* Removes thread from system scheduler until [resumeThread()][ResumeThreadCallback.resumeThread].
* If system allows unauthorized thread resuming, blocks these attempts.
*
* It's a high-level mechanism to implement custom synchronization methods, it isn't faster than existing native synchronization primitives.
*
* @param store callback to obtain and store [ResumeThreadCallback] reference.
* If [resumeThread()][ResumeThreadCallback.resumeThread] called inside - immediately returns.
*/
public fun suspendThread(store: (ResumeThreadCallback) -> Unit)
/**
* Removes thread from system scheduler until [resumeThread()][ResumeThreadCallback.resumeThread] or until timeout expires.
* If system allows unauthorized thread resuming, blocks these attempts.
*
* When timeout expires, [resumeThread()][ResumeThreadCallback.resumeThread] becomes unavailable
* (wrap it with explicit synchronization) and [timeoutHandler] called in thread that was suspended before this function returns.
*
* It's a high-level mechanism to implement custom synchronization methods, it isn't faster than existing native synchronization primitives.
*
* @param timeoutMillis timeout in milliseconds. If equals to `0`, [store] and [timeoutHandler] will be called anyway.
* @param timeoutHandler callback called by resumed thread after timeout expired.
* @param store callback to obtain and store [ResumeThreadCallback] reference.
* If [resumeThread()][ResumeThreadCallback.resumeThread] called inside - immediately returns.
*/
public fun suspendThread(timeoutMillis: UInt, timeoutHandler: ResumeThreadCallback.TimeoutHandler, store: (ResumeThreadCallback) -> Unit)
} }

View File

@ -0,0 +1,9 @@
package ru.landgrafhomyak.multitasking_0.threads.sync
public interface ResumeThreadCallback {
public fun resumeThread()
public interface TimeoutHandler {
public fun resumedBecauseOfTimeout()
}
}

View File

@ -1,6 +1,5 @@
package ru.landgrafhomyak.multitasking_0.impl.java_virtual_threads package ru.landgrafhomyak.multitasking_0.impl.java_virtual_threads
import java.lang.Object as jObject
import java.lang.Thread as jThread import java.lang.Thread as jThread
import java.util.concurrent.locks.Condition import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
@ -9,7 +8,7 @@ import ru.landgrafhomyak.multitasking_0.fibers.Fiber
import ru.landgrafhomyak.multitasking_0.fibers.FiberRoutine import ru.landgrafhomyak.multitasking_0.fibers.FiberRoutine
import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException
import ru.landgrafhomyak.multitasking_0.fibers.FiberInterruptedException import ru.landgrafhomyak.multitasking_0.fibers.FiberInterruptedException
import ru.landgrafhomyak.multitasking_0.threads.FiberExiter import ru.landgrafhomyak.multitasking_0.threads.ExitFiberCallback
import ru.landgrafhomyak.multitasking_0.threads.Thread as wThread import ru.landgrafhomyak.multitasking_0.threads.Thread as wThread
public class JavaVirtualThreadFiber : Fiber { public class JavaVirtualThreadFiber : Fiber {
@ -20,7 +19,7 @@ public class JavaVirtualThreadFiber : Fiber {
private var _uncaughtException: Throwable? private var _uncaughtException: Throwable?
override val name: String override val name: String
private var _resumedOnThread: wThread? private var _resumedOnThread: wThread?
private var _fiberExiter: FiberExiter? private var _fiberExiter: ExitFiberCallback?
private var _interruptionData: InterruptionData? private var _interruptionData: InterruptionData?
public constructor(name: String, routine: FiberRoutine) { public constructor(name: String, routine: FiberRoutine) {

View File

@ -1,12 +1,14 @@
package ru.landgrafhomyak.multitasking_0.threads package ru.landgrafhomyak.multitasking_0.threads
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
import java.lang.Thread as jThread import java.lang.Thread as jThread
import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop
import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException import ru.landgrafhomyak.multitasking_0.WrongCallerThreadException
import ru.landgrafhomyak.multitasking_0.threads.sync.ResumeThreadCallback
@Suppress("AMBIGUOUS_EXPECTS", "ACTUAL_WITHOUT_EXPECT") @Suppress("AMBIGUOUS_EXPECTS", "ACTUAL_WITHOUT_EXPECT")
public actual sealed class Thread { public actual sealed class Thread {
@ -165,7 +167,7 @@ public actual sealed class Thread {
val caller: FibersStackNode?, val caller: FibersStackNode?,
@JvmField @JvmField
val fiber: Fiber, val fiber: Fiber,
) : FiberExiter { ) : ExitFiberCallback {
private val _isClosed = AtomicBoolean(false) private val _isClosed = AtomicBoolean(false)
override val callerFiber: Fiber? override val callerFiber: Fiber?
get() { get() {
@ -194,7 +196,7 @@ public actual sealed class Thread {
return this._fibersStack?.fiber return this._fibersStack?.fiber
} }
override fun enterFiber(fiberToEnter: Fiber): FiberExiter { override fun enterFiber(fiberToEnter: Fiber): ExitFiberCallback {
this._assertThread() this._assertThread()
val node = this.FibersStackNode(this._fibersStack, fiberToEnter) val node = this.FibersStackNode(this._fibersStack, fiberToEnter)
this._fibersStack = node this._fibersStack = node
@ -210,9 +212,9 @@ public actual sealed class Thread {
} }
private inner class EventLoopClearerImpl( private inner class ClearEventLoopCallbackImpl(
) : EventLoopClearer { ) : ClearEventLoopCallback {
private val _isClosed = AtomicBoolean(false) private val _isClosed = AtomicBoolean(false)
override fun clearEventLoop() { override fun clearEventLoop() {
@ -223,12 +225,100 @@ public actual sealed class Thread {
} }
} }
override fun setEventLoop(eventLoop: SingleThreadEventLoop): EventLoopClearer { override fun setEventLoop(eventLoop: SingleThreadEventLoop): ClearEventLoopCallback {
this._assertThread() this._assertThread()
if (this._runningEventLoop != null) if (this._runningEventLoop != null)
throw IllegalStateException("There is already a running event loop here") throw IllegalStateException("There is already a running event loop here")
this._runningEventLoop = eventLoop this._runningEventLoop = eventLoop
return this.EventLoopClearerImpl() return this.ClearEventLoopCallbackImpl()
}
override fun yield() {
this._assertThread()
jThread.yield()
}
private inner class ResumeThreadCallbackImpl : ResumeThreadCallback {
val sync = AtomicBoolean(false)
var isCalled = false
override fun resumeThread() {
while (!this.sync.compareAndSet(false, true))
jThread.onSpinWait()
if (this.isCalled) {
this.sync.set(false)
throw IllegalStateException("Thread already resumed by this callback instance")
}
when (this@Thread._nativeThread.state) {
jThread.State.WAITING, jThread.State.TIMED_WAITING -> {
LockSupport.unpark(this@Thread._nativeThread)
}
else -> {
// catched after external unpark but before it enters lock or before it enters park at all
}
}
this.isCalled = true
this.sync.set(false)
}
}
override fun suspendThread(store: (ResumeThreadCallback) -> Unit) {
this._assertThread()
val resumer = this.ResumeThreadCallbackImpl()
store(resumer)
run {
while (!resumer.sync.compareAndSet(false, true))
jThread.onSpinWait()
val isCalled = resumer.isCalled
resumer.sync.set(false)
if (isCalled)
return
}
while (true) {
LockSupport.park()
while (!resumer.sync.compareAndSet(false, true))
jThread.onSpinWait()
val isCalled = resumer.isCalled
resumer.sync.set(false)
if (isCalled)
break
}
}
override fun suspendThread(timeoutMillis: UInt, timeoutHandler: ResumeThreadCallback.TimeoutHandler, store: (ResumeThreadCallback) -> Unit) {
this._assertThread()
val resumer = this.ResumeThreadCallbackImpl()
store(resumer)
run {
while (!resumer.sync.compareAndSet(false, true))
jThread.onSpinWait()
val isCalled = resumer.isCalled
resumer.sync.set(false)
if (isCalled)
return
}
val endTime = System.nanoTime() + (timeoutMillis.toULong() * 1_000_000uL).toLong()
while (true) {
LockSupport.parkNanos(endTime - System.nanoTime())
while (!resumer.sync.compareAndSet(false, true))
jThread.onSpinWait()
if (resumer.isCalled) {
resumer.sync.set(false)
break
}
if (System.nanoTime() >= endTime) {
resumer.isCalled = true
resumer.sync.set(false)
timeoutHandler.resumedBecauseOfTimeout()
}
resumer.sync.set(false)
}
} }
} }
} }

View File

@ -2,13 +2,19 @@ package ru.landgrafhomyak.multitasking_0.threads
import ru.landgrafhomyak.multitasking_0.fibers.Fiber import ru.landgrafhomyak.multitasking_0.fibers.Fiber
import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop import ru.landgrafhomyak.multitasking_0.SingleThreadEventLoop
import ru.landgrafhomyak.multitasking_0.threads.sync.ResumeThreadCallback
public actual sealed interface ThreadLocalMethods { public actual sealed interface ThreadLocalMethods {
public actual fun get(): Thread public actual fun get(): Thread
public actual val runningFiber: Fiber? public actual val runningFiber: Fiber?
public actual fun enterFiber(fiberToEnter: Fiber): FiberExiter public actual fun enterFiber(fiberToEnter: Fiber): ExitFiberCallback
public actual val runningEventLoop: SingleThreadEventLoop? public actual val runningEventLoop: SingleThreadEventLoop?
public actual fun setEventLoop(eventLoop: SingleThreadEventLoop): EventLoopClearer public actual fun setEventLoop(eventLoop: SingleThreadEventLoop): ClearEventLoopCallback
public actual fun yield()
public actual fun suspendThread(store: (ResumeThreadCallback) -> Unit)
public actual fun suspendThread(timeoutMillis: UInt, timeoutHandler: ResumeThreadCallback.TimeoutHandler, store: (ResumeThreadCallback) -> Unit)
} }