Support of externally created threads

This commit is contained in:
Andrew Golovashevich 2025-09-08 02:26:57 +03:00
parent cfcf761745
commit 8fab4c9e57
4 changed files with 268 additions and 94 deletions

View File

@ -11,15 +11,9 @@ internal class DrivenPlatformThread : Thread {
override val _nativeThread: jThread override val _nativeThread: jThread
private val _sync: ReentrantLock
private var _uncaughtException: Throwable?
private var _isExplicitlyDestroyed: Boolean
private val _threadFinishedWaiter: CountDownLatch private val _threadFinishedWaiter: CountDownLatch
internal constructor(name: String, isDaemon: Boolean, routine: ThreadRoutine) : super() { internal constructor(name: String, isDaemon: Boolean, routine: ThreadRoutine) : super() {
this._sync = ReentrantLock()
this._uncaughtException = null
this._isExplicitlyDestroyed = false
this._threadFinishedWaiter = CountDownLatch(1) this._threadFinishedWaiter = CountDownLatch(1)
this._nativeThread = jThread.ofPlatform().name(name).daemon(isDaemon).unstarted(this.Kernel(routine)) this._nativeThread = jThread.ofPlatform().name(name).daemon(isDaemon).unstarted(this.Kernel(routine))
@ -28,6 +22,7 @@ internal class DrivenPlatformThread : Thread {
private inner class Kernel(private val _routine: ThreadRoutine) : Runnable { private inner class Kernel(private val _routine: ThreadRoutine) : Runnable {
override fun run() { override fun run() {
try { try {
Thread._tl_currentThread.set(this@DrivenPlatformThread)
this._routine.runThread(this@DrivenPlatformThread) this._routine.runThread(this@DrivenPlatformThread)
} catch (t: Throwable) { } catch (t: Throwable) {
this@DrivenPlatformThread._sync.withLock { this@DrivenPlatformThread._sync.withLock {
@ -39,84 +34,13 @@ internal class DrivenPlatformThread : Thread {
} }
} }
override val state: State
get() {
this._sync.withLock {
if (this._isExplicitlyDestroyed)
return State.DESTROYED
when (this._nativeThread.state) {
jThread.State.NEW -> return State.NEW
jThread.State.RUNNABLE -> return State.NOT_FINISHED
jThread.State.BLOCKED -> return State.NOT_FINISHED
jThread.State.WAITING -> return State.NOT_FINISHED
jThread.State.TIMED_WAITING -> return State.NOT_FINISHED
jThread.State.TERMINATED -> {
if (this._uncaughtException == null)
return State.FINISHED_SUCCESSFULLY
else
return State.FINISHED_WITH_ERROR
}
}
}
}
override val uncaughtException: Throwable
get() {
this._sync.withLock {
val e = this._uncaughtException
if (e != null)
return e
when (this._nativeThread.state) {
jThread.State.NEW, jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING ->
throw IllegalStateException("Thread not finished yet")
jThread.State.TERMINATED -> throw IllegalStateException("Thread finished without uncaught exceptions")
}
}
}
override fun start() {
this._sync.withLock {
if (this._isExplicitlyDestroyed)
throw IllegalStateException("Thread already was started, finished and destroyed")
// stdlib's errors aren't verbose
when (this._nativeThread.state) {
jThread.State.NEW -> {}
jThread.State.TERMINATED -> throw IllegalStateException("Thread already was started and finished")
jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING ->
throw IllegalStateException("Thread already was started")
}
this._nativeThread.start()
}
}
override fun join() { override fun join() {
this._sync.withLock { this._sync.withLock {
if (this._isExplicitlyDestroyed) this._assertJoinAllowed()
throw IllegalStateException("Can't join on destroyed thread")
if (this._nativeThread.state == jThread.State.NEW)
throw IllegalStateException("Can't join on thread that isn't started")
} }
// stdlib's join uses Object.wait which may block caller virtual thread, so let wait in way virtual threads support // stdlib's join uses Object.wait which may block caller virtual thread, so let wait in way virtual threads support
this._threadFinishedWaiter.await() this._threadFinishedWaiter.await()
this._nativeThread.join() this._nativeThread.join()
} }
override fun releaseResources() {
this._sync.withLock {
if (this._isExplicitlyDestroyed)
throw IllegalStateException("Thread already destroyed")
when (this._nativeThread.state) {
jThread.State.NEW -> {}
jThread.State.TERMINATED -> {}
jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING ->
throw IllegalStateException("Thread already was started")
}
this._isExplicitlyDestroyed = true
}
}
} }

View File

@ -0,0 +1,34 @@
package ru.landgrafhomyak.multitasking_0.threads
import java.lang.Thread as jThread
import kotlin.concurrent.withLock
internal class ExternalThread : Thread {
override fun toString(): String = TODO()
override val _nativeThread: jThread
internal constructor(nativeThread: jThread) : super() {
this._nativeThread = nativeThread
nativeThread.uncaughtExceptionHandler = this.UncaughtExceptionHandlerImpl(nativeThread.uncaughtExceptionHandler)
if (nativeThread.state == jThread.State.TERMINATED)
this._isExplicitlyDestroyed = true
}
private inner class UncaughtExceptionHandlerImpl(
private val _next: jThread.UncaughtExceptionHandler?,
) : jThread.UncaughtExceptionHandler {
override fun uncaughtException(t: jThread?, e: Throwable?) {
this@ExternalThread._uncaughtException = e
this._next?.uncaughtException(t, e)
}
}
override fun join() {
this._sync.withLock {
this._assertJoinAllowed()
}
this._nativeThread.join()
}
}

View File

@ -1,25 +1,108 @@
package ru.landgrafhomyak.multitasking_0.threads package ru.landgrafhomyak.multitasking_0.threads
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import java.lang.Thread as jThread import java.lang.Thread as jThread
import ru.landgrafhomyak.multitasking_0.Fiber import ru.landgrafhomyak.multitasking_0.Fiber
@Suppress("AMBIGUOUS_EXPECTS", "ACTUAL_WITHOUT_EXPECT") @Suppress("AMBIGUOUS_EXPECTS", "ACTUAL_WITHOUT_EXPECT")
public actual sealed class Thread { public actual sealed class Thread {
actual abstract override fun toString(): String
public actual var runningFiber: Fiber? = null public actual var runningFiber: Fiber? = null
protected abstract val _nativeThread: jThread protected abstract val _nativeThread: jThread
protected val _sync: ReentrantLock = ReentrantLock()
protected var _isExplicitlyDestroyed: Boolean = false
protected var _uncaughtException: Throwable? = null
public actual val name: String get() = this._nativeThread.name actual abstract override fun toString(): String
public actual val isDaemon: Boolean get() = this._nativeThread.isDaemon
public actual abstract val state: State
public actual abstract val uncaughtException: Throwable
public actual abstract fun releaseResources()
public actual abstract fun start() public actual val name: String
get() = this._nativeThread.name
public actual val isDaemon: Boolean
get() = this._nativeThread.isDaemon
public actual val state: State
get() {
this._sync.withLock {
if (this._isExplicitlyDestroyed)
return State.DESTROYED
when (this._nativeThread.state) {
jThread.State.NEW -> return State.NEW
jThread.State.RUNNABLE -> return State.NOT_FINISHED
jThread.State.BLOCKED -> return State.NOT_FINISHED
jThread.State.WAITING -> return State.NOT_FINISHED
jThread.State.TIMED_WAITING -> return State.NOT_FINISHED
jThread.State.TERMINATED -> {
if (this._uncaughtException == null)
return State.FINISHED_SUCCESSFULLY
else
return State.FINISHED_WITH_ERROR
}
}
}
}
public actual val uncaughtException: Throwable
get() {
this._sync.withLock {
val e = this._uncaughtException
if (e != null)
return e
when (this._nativeThread.state) {
jThread.State.NEW, jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING ->
throw IllegalStateException("Thread not finished yet")
jThread.State.TERMINATED -> throw IllegalStateException("Thread finished without uncaught exceptions")
}
}
}
public actual fun start() {
this._sync.withLock {
if (this._isExplicitlyDestroyed)
throw IllegalStateException("Thread already was started, finished and destroyed")
// stdlib's errors aren't verbose
when (this._nativeThread.state) {
jThread.State.NEW -> {}
jThread.State.TERMINATED -> throw IllegalStateException("Thread already was started and finished")
jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING ->
throw IllegalStateException("Thread already was started")
}
this._nativeThread.start()
}
}
/**
* Requires explicit synchronization
*/
protected fun _assertJoinAllowed() {
if (this._isExplicitlyDestroyed)
throw IllegalStateException("Can't join on destroyed thread")
if (this._nativeThread.state == jThread.State.NEW)
throw IllegalStateException("Can't join on thread that isn't started")
}
public actual abstract fun join() public actual abstract fun join()
public actual fun releaseResources() {
this._sync.withLock {
if (this._isExplicitlyDestroyed)
throw IllegalStateException("Thread already destroyed")
when (this._nativeThread.state) {
jThread.State.NEW -> {}
jThread.State.TERMINATED -> {}
jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING ->
throw IllegalStateException("Thread already was started")
}
this._isExplicitlyDestroyed = true
}
}
public actual enum class State { public actual enum class State {
NEW, NEW,
NOT_FINISHED, NOT_FINISHED,
@ -28,15 +111,22 @@ public actual sealed class Thread {
DESTROYED DESTROYED
} }
public actual companion object { private object CurrentThreadVariable : ThreadLocal<Thread?>() {
@JvmStatic override fun initialValue(): Thread? {
public actual fun create(name: String, isDaemon: Boolean, routine: ThreadRoutine): Thread { return ExternalThread(jThread.currentThread())
TODO()
}
@JvmStatic
public actual fun currentThread(): Thread {
TODO()
} }
} }
public actual companion object {
@JvmStatic
internal val _tl_currentThread: ThreadLocal<Thread?> get() = CurrentThreadVariable
@JvmStatic
public actual fun create(name: String, isDaemon: Boolean, routine: ThreadRoutine): Thread =
DrivenPlatformThread(name, isDaemon, routine)
@JvmStatic
public actual fun currentThread(): Thread =
this._tl_currentThread.get() ?: throw RuntimeException("This java thread can't be wrapped")
}
} }

View File

@ -0,0 +1,126 @@
package ru.landgrafhomyak.multitasking_0.threads.impl.java_virtual_threads
import java.lang.Thread as jThread
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import ru.landgrafhomyak.multitasking_0.ExecutionState
import ru.landgrafhomyak.multitasking_0.Fiber
import ru.landgrafhomyak.multitasking_0.FiberRoutine
import ru.landgrafhomyak.multitasking_0.threads.Thread as wThread
public class JavaVirtualThreadFiber : Fiber {
private val _thisVThread: jThread
private val _syncLock: ReentrantLock
private val _syncCond: Condition
private var _state: ExecutionState
private var _uncaughtException: Throwable?
override val name: String
private var _resumedOnThread: wThread?
private var _resumedOnFiber: Fiber?
public constructor(name: String, routine: FiberRoutine) {
this.name = name
this._syncLock = ReentrantLock()
this._syncCond = this._syncLock.newCondition()
this._state = ExecutionState.CREATED
this._uncaughtException = null
this._resumedOnThread = null
this._resumedOnFiber = null
this._syncLock.withLock {
this._thisVThread = jThread.ofVirtual().name("fiber: $name").start(this.Kernel(routine))
this._syncCond.await()
}
}
private inner class Kernel(private val _routine: FiberRoutine) : Runnable {
override fun run() {
try {
this@JavaVirtualThreadFiber._syncLock.withLock {
this@JavaVirtualThreadFiber._syncCond.signal()
this@JavaVirtualThreadFiber._syncCond.await()
wThread._tl_currentThread.set(this@JavaVirtualThreadFiber._resumedOnThread!!)
}
this._routine.runFiber(this@JavaVirtualThreadFiber)
this@JavaVirtualThreadFiber._syncLock.withLock {
this@JavaVirtualThreadFiber._state = ExecutionState.FINISHED_SUCCESSFULLY
this@JavaVirtualThreadFiber._syncCond.signal()
}
} catch (t: Throwable) {
this@JavaVirtualThreadFiber._syncLock.withLock {
this@JavaVirtualThreadFiber._uncaughtException = t
this@JavaVirtualThreadFiber._state = ExecutionState.FINISHED_WITH_ERROR
this@JavaVirtualThreadFiber._syncCond.signal()
}
}
}
}
override val ownerThread: ru.landgrafhomyak.multitasking_0.threads.Thread? get() = null
override val state: ExecutionState get() = this._syncLock.withLock { this._state }
override val resumedOnThread: ru.landgrafhomyak.multitasking_0.threads.Thread? get() = this._syncLock.withLock { this._resumedOnThread }
override val resumedOnFiber: Fiber? get() = this._syncLock.withLock { this._resumedOnFiber }
override fun yield() {
if (jThread.currentThread() !== this._thisVThread)
TODO("err")
this._syncLock.withLock {
when (this._state) {
ExecutionState.RUNNING -> this._state = ExecutionState.MANUALLY_SUSPENDED
ExecutionState.CREATED, ExecutionState.MANUALLY_SUSPENDED ->
throw IllegalStateException(".yield() called inside fiber while it is suspended")
ExecutionState.FINISHED_SUCCESSFULLY, ExecutionState.FINISHED_WITH_ERROR ->
throw IllegalStateException(".yield() called inside fiber after it's marked as finished")
}
wThread._tl_currentThread.set(null)
this._syncCond.signal()
this._syncCond.await()
wThread._tl_currentThread.set(this._resumedOnThread!!)
}
}
override fun resume() {
this._syncLock.withLock {
when (this._state) {
ExecutionState.CREATED, ExecutionState.MANUALLY_SUSPENDED ->
this._state = ExecutionState.RUNNING
ExecutionState.RUNNING -> {
if (jThread.currentThread() == this._thisVThread) {
throw IllegalStateException("Recursive fiber execution")
} else {
throw IllegalStateException("Fiber already running")
}
}
ExecutionState.FINISHED_SUCCESSFULLY, ExecutionState.FINISHED_WITH_ERROR -> throw IllegalStateException("Fiber already finished")
}
val currentThread = wThread.currentThread()
this._resumedOnThread = currentThread
this._resumedOnFiber = currentThread.runningFiber
currentThread.runningFiber = this
this._syncCond.signal()
this._syncCond.await()
currentThread.runningFiber = this._resumedOnFiber
this._resumedOnThread = null
this._resumedOnFiber = null
}
}
override val uncaughtException: Throwable
get() = this._uncaughtException ?: when (this._syncLock.withLock { this._state }) {
ExecutionState.CREATED, ExecutionState.RUNNING, ExecutionState.MANUALLY_SUSPENDED ->
throw IllegalStateException("Fiber not finished yet")
ExecutionState.FINISHED_SUCCESSFULLY -> throw IllegalStateException("Fiber finished without exception")
ExecutionState.FINISHED_WITH_ERROR -> throw IllegalStateException("Finished with exception but it wasn't saved")
}
}