diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/DrivenPlatformThread.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/DrivenPlatformThread.kt index 42d98bb..f047e4a 100644 --- a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/DrivenPlatformThread.kt +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/DrivenPlatformThread.kt @@ -11,15 +11,9 @@ internal class DrivenPlatformThread : Thread { override val _nativeThread: jThread - private val _sync: ReentrantLock - private var _uncaughtException: Throwable? - private var _isExplicitlyDestroyed: Boolean private val _threadFinishedWaiter: CountDownLatch internal constructor(name: String, isDaemon: Boolean, routine: ThreadRoutine) : super() { - this._sync = ReentrantLock() - this._uncaughtException = null - this._isExplicitlyDestroyed = false this._threadFinishedWaiter = CountDownLatch(1) this._nativeThread = jThread.ofPlatform().name(name).daemon(isDaemon).unstarted(this.Kernel(routine)) @@ -28,6 +22,7 @@ internal class DrivenPlatformThread : Thread { private inner class Kernel(private val _routine: ThreadRoutine) : Runnable { override fun run() { try { + Thread._tl_currentThread.set(this@DrivenPlatformThread) this._routine.runThread(this@DrivenPlatformThread) } catch (t: Throwable) { this@DrivenPlatformThread._sync.withLock { @@ -39,84 +34,13 @@ internal class DrivenPlatformThread : Thread { } } - override val state: State - get() { - this._sync.withLock { - if (this._isExplicitlyDestroyed) - return State.DESTROYED - - when (this._nativeThread.state) { - jThread.State.NEW -> return State.NEW - jThread.State.RUNNABLE -> return State.NOT_FINISHED - jThread.State.BLOCKED -> return State.NOT_FINISHED - jThread.State.WAITING -> return State.NOT_FINISHED - jThread.State.TIMED_WAITING -> return State.NOT_FINISHED - jThread.State.TERMINATED -> { - if (this._uncaughtException == null) - return State.FINISHED_SUCCESSFULLY - else - return State.FINISHED_WITH_ERROR - } - } - } - } - - override val uncaughtException: Throwable - get() { - this._sync.withLock { - val e = this._uncaughtException - if (e != null) - return e - - when (this._nativeThread.state) { - jThread.State.NEW, jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING -> - throw IllegalStateException("Thread not finished yet") - - jThread.State.TERMINATED -> throw IllegalStateException("Thread finished without uncaught exceptions") - } - } - } - - override fun start() { - this._sync.withLock { - if (this._isExplicitlyDestroyed) - throw IllegalStateException("Thread already was started, finished and destroyed") - // stdlib's errors aren't verbose - when (this._nativeThread.state) { - jThread.State.NEW -> {} - jThread.State.TERMINATED -> throw IllegalStateException("Thread already was started and finished") - jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING -> - throw IllegalStateException("Thread already was started") - } - - this._nativeThread.start() - } - } - override fun join() { this._sync.withLock { - if (this._isExplicitlyDestroyed) - throw IllegalStateException("Can't join on destroyed thread") - if (this._nativeThread.state == jThread.State.NEW) - throw IllegalStateException("Can't join on thread that isn't started") + this._assertJoinAllowed() } // stdlib's join uses Object.wait which may block caller virtual thread, so let wait in way virtual threads support this._threadFinishedWaiter.await() this._nativeThread.join() } - - override fun releaseResources() { - this._sync.withLock { - if (this._isExplicitlyDestroyed) - throw IllegalStateException("Thread already destroyed") - when (this._nativeThread.state) { - jThread.State.NEW -> {} - jThread.State.TERMINATED -> {} - jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING -> - throw IllegalStateException("Thread already was started") - } - this._isExplicitlyDestroyed = true - } - } } \ No newline at end of file diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ExternalThread.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ExternalThread.kt new file mode 100644 index 0000000..9427255 --- /dev/null +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/ExternalThread.kt @@ -0,0 +1,34 @@ +package ru.landgrafhomyak.multitasking_0.threads + +import java.lang.Thread as jThread +import kotlin.concurrent.withLock + +internal class ExternalThread : Thread { + override fun toString(): String = TODO() + + override val _nativeThread: jThread + + internal constructor(nativeThread: jThread) : super() { + this._nativeThread = nativeThread + nativeThread.uncaughtExceptionHandler = this.UncaughtExceptionHandlerImpl(nativeThread.uncaughtExceptionHandler) + if (nativeThread.state == jThread.State.TERMINATED) + this._isExplicitlyDestroyed = true + } + + private inner class UncaughtExceptionHandlerImpl( + private val _next: jThread.UncaughtExceptionHandler?, + ) : jThread.UncaughtExceptionHandler { + override fun uncaughtException(t: jThread?, e: Throwable?) { + this@ExternalThread._uncaughtException = e + this._next?.uncaughtException(t, e) + } + } + + override fun join() { + this._sync.withLock { + this._assertJoinAllowed() + } + + this._nativeThread.join() + } +} \ No newline at end of file diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt index 686b2a1..a8e624b 100644 --- a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/Thread.kt @@ -1,25 +1,108 @@ package ru.landgrafhomyak.multitasking_0.threads +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock import java.lang.Thread as jThread import ru.landgrafhomyak.multitasking_0.Fiber @Suppress("AMBIGUOUS_EXPECTS", "ACTUAL_WITHOUT_EXPECT") public actual sealed class Thread { - actual abstract override fun toString(): String public actual var runningFiber: Fiber? = null protected abstract val _nativeThread: jThread + protected val _sync: ReentrantLock = ReentrantLock() + protected var _isExplicitlyDestroyed: Boolean = false + protected var _uncaughtException: Throwable? = null - public actual val name: String get() = this._nativeThread.name - public actual val isDaemon: Boolean get() = this._nativeThread.isDaemon - public actual abstract val state: State - public actual abstract val uncaughtException: Throwable - public actual abstract fun releaseResources() + actual abstract override fun toString(): String - public actual abstract fun start() + public actual val name: String + get() = this._nativeThread.name + + public actual val isDaemon: Boolean + get() = this._nativeThread.isDaemon + + public actual val state: State + get() { + this._sync.withLock { + if (this._isExplicitlyDestroyed) + return State.DESTROYED + + when (this._nativeThread.state) { + jThread.State.NEW -> return State.NEW + jThread.State.RUNNABLE -> return State.NOT_FINISHED + jThread.State.BLOCKED -> return State.NOT_FINISHED + jThread.State.WAITING -> return State.NOT_FINISHED + jThread.State.TIMED_WAITING -> return State.NOT_FINISHED + jThread.State.TERMINATED -> { + if (this._uncaughtException == null) + return State.FINISHED_SUCCESSFULLY + else + return State.FINISHED_WITH_ERROR + } + } + } + } + + public actual val uncaughtException: Throwable + get() { + this._sync.withLock { + val e = this._uncaughtException + if (e != null) + return e + + when (this._nativeThread.state) { + jThread.State.NEW, jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING -> + throw IllegalStateException("Thread not finished yet") + + jThread.State.TERMINATED -> throw IllegalStateException("Thread finished without uncaught exceptions") + } + } + } + + + public actual fun start() { + this._sync.withLock { + if (this._isExplicitlyDestroyed) + throw IllegalStateException("Thread already was started, finished and destroyed") + // stdlib's errors aren't verbose + when (this._nativeThread.state) { + jThread.State.NEW -> {} + jThread.State.TERMINATED -> throw IllegalStateException("Thread already was started and finished") + jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING -> + throw IllegalStateException("Thread already was started") + } + + this._nativeThread.start() + } + } + + /** + * Requires explicit synchronization + */ + protected fun _assertJoinAllowed() { + if (this._isExplicitlyDestroyed) + throw IllegalStateException("Can't join on destroyed thread") + if (this._nativeThread.state == jThread.State.NEW) + throw IllegalStateException("Can't join on thread that isn't started") + } public actual abstract fun join() + public actual fun releaseResources() { + this._sync.withLock { + if (this._isExplicitlyDestroyed) + throw IllegalStateException("Thread already destroyed") + when (this._nativeThread.state) { + jThread.State.NEW -> {} + jThread.State.TERMINATED -> {} + jThread.State.RUNNABLE, jThread.State.BLOCKED, jThread.State.WAITING, jThread.State.TIMED_WAITING -> + throw IllegalStateException("Thread already was started") + } + this._isExplicitlyDestroyed = true + } + } + public actual enum class State { NEW, NOT_FINISHED, @@ -28,15 +111,22 @@ public actual sealed class Thread { DESTROYED } - public actual companion object { - @JvmStatic - public actual fun create(name: String, isDaemon: Boolean, routine: ThreadRoutine): Thread { - TODO() - } - - @JvmStatic - public actual fun currentThread(): Thread { - TODO() + private object CurrentThreadVariable : ThreadLocal() { + override fun initialValue(): Thread? { + return ExternalThread(jThread.currentThread()) } } + + public actual companion object { + @JvmStatic + internal val _tl_currentThread: ThreadLocal get() = CurrentThreadVariable + + @JvmStatic + public actual fun create(name: String, isDaemon: Boolean, routine: ThreadRoutine): Thread = + DrivenPlatformThread(name, isDaemon, routine) + + @JvmStatic + public actual fun currentThread(): Thread = + this._tl_currentThread.get() ?: throw RuntimeException("This java thread can't be wrapped") + } } \ No newline at end of file diff --git a/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/impl/java_virtual_threads/JavaVirtualThreadFiber.kt b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/impl/java_virtual_threads/JavaVirtualThreadFiber.kt new file mode 100644 index 0000000..b99dfb1 --- /dev/null +++ b/src/jvmMain/kotlin/ru/landgrafhomyak/multitasking_0/threads/impl/java_virtual_threads/JavaVirtualThreadFiber.kt @@ -0,0 +1,126 @@ +package ru.landgrafhomyak.multitasking_0.threads.impl.java_virtual_threads + +import java.lang.Thread as jThread +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.withLock +import ru.landgrafhomyak.multitasking_0.ExecutionState +import ru.landgrafhomyak.multitasking_0.Fiber +import ru.landgrafhomyak.multitasking_0.FiberRoutine +import ru.landgrafhomyak.multitasking_0.threads.Thread as wThread + +public class JavaVirtualThreadFiber : Fiber { + private val _thisVThread: jThread + private val _syncLock: ReentrantLock + private val _syncCond: Condition + private var _state: ExecutionState + private var _uncaughtException: Throwable? + override val name: String + private var _resumedOnThread: wThread? + private var _resumedOnFiber: Fiber? + + public constructor(name: String, routine: FiberRoutine) { + this.name = name + this._syncLock = ReentrantLock() + this._syncCond = this._syncLock.newCondition() + this._state = ExecutionState.CREATED + this._uncaughtException = null + this._resumedOnThread = null + this._resumedOnFiber = null + + this._syncLock.withLock { + this._thisVThread = jThread.ofVirtual().name("fiber: $name").start(this.Kernel(routine)) + this._syncCond.await() + } + } + + private inner class Kernel(private val _routine: FiberRoutine) : Runnable { + override fun run() { + try { + this@JavaVirtualThreadFiber._syncLock.withLock { + this@JavaVirtualThreadFiber._syncCond.signal() + this@JavaVirtualThreadFiber._syncCond.await() + wThread._tl_currentThread.set(this@JavaVirtualThreadFiber._resumedOnThread!!) + } + this._routine.runFiber(this@JavaVirtualThreadFiber) + this@JavaVirtualThreadFiber._syncLock.withLock { + this@JavaVirtualThreadFiber._state = ExecutionState.FINISHED_SUCCESSFULLY + this@JavaVirtualThreadFiber._syncCond.signal() + } + } catch (t: Throwable) { + this@JavaVirtualThreadFiber._syncLock.withLock { + this@JavaVirtualThreadFiber._uncaughtException = t + this@JavaVirtualThreadFiber._state = ExecutionState.FINISHED_WITH_ERROR + this@JavaVirtualThreadFiber._syncCond.signal() + } + } + } + } + + override val ownerThread: ru.landgrafhomyak.multitasking_0.threads.Thread? get() = null + override val state: ExecutionState get() = this._syncLock.withLock { this._state } + override val resumedOnThread: ru.landgrafhomyak.multitasking_0.threads.Thread? get() = this._syncLock.withLock { this._resumedOnThread } + override val resumedOnFiber: Fiber? get() = this._syncLock.withLock { this._resumedOnFiber } + + override fun yield() { + if (jThread.currentThread() !== this._thisVThread) + TODO("err") + + this._syncLock.withLock { + when (this._state) { + ExecutionState.RUNNING -> this._state = ExecutionState.MANUALLY_SUSPENDED + ExecutionState.CREATED, ExecutionState.MANUALLY_SUSPENDED -> + throw IllegalStateException(".yield() called inside fiber while it is suspended") + + ExecutionState.FINISHED_SUCCESSFULLY, ExecutionState.FINISHED_WITH_ERROR -> + throw IllegalStateException(".yield() called inside fiber after it's marked as finished") + } + + + wThread._tl_currentThread.set(null) + this._syncCond.signal() + this._syncCond.await() + wThread._tl_currentThread.set(this._resumedOnThread!!) + } + } + + override fun resume() { + this._syncLock.withLock { + when (this._state) { + ExecutionState.CREATED, ExecutionState.MANUALLY_SUSPENDED -> + this._state = ExecutionState.RUNNING + + ExecutionState.RUNNING -> { + if (jThread.currentThread() == this._thisVThread) { + throw IllegalStateException("Recursive fiber execution") + } else { + throw IllegalStateException("Fiber already running") + } + } + + ExecutionState.FINISHED_SUCCESSFULLY, ExecutionState.FINISHED_WITH_ERROR -> throw IllegalStateException("Fiber already finished") + } + + val currentThread = wThread.currentThread() + this._resumedOnThread = currentThread + this._resumedOnFiber = currentThread.runningFiber + currentThread.runningFiber = this + + this._syncCond.signal() + this._syncCond.await() + + currentThread.runningFiber = this._resumedOnFiber + this._resumedOnThread = null + this._resumedOnFiber = null + } + } + + override val uncaughtException: Throwable + get() = this._uncaughtException ?: when (this._syncLock.withLock { this._state }) { + ExecutionState.CREATED, ExecutionState.RUNNING, ExecutionState.MANUALLY_SUSPENDED -> + throw IllegalStateException("Fiber not finished yet") + + ExecutionState.FINISHED_SUCCESSFULLY -> throw IllegalStateException("Fiber finished without exception") + ExecutionState.FINISHED_WITH_ERROR -> throw IllegalStateException("Finished with exception but it wasn't saved") + } +} \ No newline at end of file