diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt index f0c7041..6fc6d3f 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt @@ -1,10 +1,19 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading -class CountDownLatch(initialCounterValue: ULong) : AutoCloseable { - private val _refcnt = _Refcnt("Latch was destroyed") - private var _counter = initialCounterValue - private val _mutex = Mutex() - private val _condition = Condition() +class CountDownLatch : AutoCloseable { + private val _refcnt: _Refcnt + private var _counter: ULong + private val _mutex: Mutex + private val _condition: Condition + + constructor(initialCounterValue: ULong) { + this._refcnt = _Refcnt("Latch was destroyed") + this._counter = initialCounterValue + this._mutex = Mutex() + _safeAutoClose2(onAbort = this._mutex::close) { + this._condition = Condition() + } + } fun await() = this._refcnt.withRef { this._mutex.withLock { @@ -24,7 +33,9 @@ class CountDownLatch(initialCounterValue: ULong) : AutoCloseable { override fun close() { this._refcnt.close("Latch is still in use") - this._condition.close() - this._mutex.close() + _safeAutoClose1( + action = this._condition::close, + finally = this._mutex::close + ) } } \ No newline at end of file diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt new file mode 100644 index 0000000..2d92dde --- /dev/null +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt @@ -0,0 +1,13 @@ +package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading + +expect class Thread : AutoCloseable { + fun interface Routine { + fun run() + } + + constructor(routine: Routine) + + fun start() + + fun join(): Throwable? +} \ No newline at end of file diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt index 234859e..b96c35c 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt @@ -31,6 +31,10 @@ internal class _Refcnt(private val _errMessage: String) { this._value.update(Long::dec) } + inline fun tryDecref(block: () -> R): R { + this.checkNotClosed() + return _safeAutoClose2(onSuccess = this::decref, action = block) + } fun close(errExistRefs: String) { val state = this._value.compareAndExchange(0, -1) when { diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt index d56d00f..1687d18 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt @@ -6,6 +6,7 @@ import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract import kotlinx.atomicfu.AtomicLong +import kotlinx.atomicfu.AtomicRef internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long { while (true) { @@ -15,6 +16,14 @@ internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long } } +internal fun AtomicRef.compareAndExchange(expected: T, newValue: T): T { + while (true) { + val old = this.value + if (old === expected) return old + if (this.compareAndSet(old, newValue)) return old + } +} + @PublishedApi internal inline fun _safeAutoClose1( finally: () -> Unit, @@ -43,6 +52,21 @@ internal inline fun _safeAutoClose2( return _safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) } +@PublishedApi +internal inline fun _safeAutoClose2e( + onAbort: (Throwable) -> Unit = {}, + onSuccess: () -> Unit = {}, + action: () -> R +): R { + @Suppress("WRONG_INVOCATION_KIND") + contract { + callsInPlace(action, InvocationKind.EXACTLY_ONCE) + callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE) + callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) + } + return _safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) +} + @PublishedApi internal inline fun _safeAutoClose3( onAbort: () -> Unit = {}, @@ -56,6 +80,22 @@ internal inline fun _safeAutoClose3( callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE) } + return _safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action) +} + +@PublishedApi +internal inline fun _safeAutoClose3e( + onAbort: (Throwable) -> Unit = {}, + onSuccess: () -> Unit = {}, + onCrossReturn: () -> Unit = {}, + action: () -> R +): R { + contract { + callsInPlace(action, InvocationKind.EXACTLY_ONCE) + callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE) + callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) + callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE) + } val ret: R var wasError = false @@ -66,7 +106,7 @@ internal inline fun _safeAutoClose3( } catch (e1: Throwable) { wasError = true try { - onAbort() + onAbort(e1) } catch (e2: Throwable) { e1.addSuppressed(e2) } diff --git a/modules/low-level/multithreading/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt b/modules/low-level/multithreading/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt new file mode 100644 index 0000000..6df5b19 --- /dev/null +++ b/modules/low-level/multithreading/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt @@ -0,0 +1,142 @@ +package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading + +import kotlinx.atomicfu.AtomicRef +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.update +import kotlinx.cinterop.CPointer +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.StableRef +import kotlinx.cinterop.nativeHeap +import kotlinx.cinterop.alloc +import kotlinx.cinterop.asStableRef +import kotlinx.cinterop.free +import kotlinx.cinterop.pointed +import kotlinx.cinterop.ptr +import kotlinx.cinterop.staticCFunction +import kotlinx.cinterop.value +import platform.posix.pthread_create +import platform.posix.pthread_join +import platform.posix.pthread_tVar +import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities + +@OptIn(ExperimentalForeignApi::class) +actual class Thread : AutoCloseable { + actual fun interface Routine { + actual fun run() + } + + internal enum class State { + PENDING, STARTING, RUNNING, FINISHED, CLOSED + } + + + internal inner class ThreadBootstrapContext(val routine: Routine) { + val startSignal = CountDownLatch(1uL) + val threadState by this@Thread::_state + var exitedWithError: Throwable? = null + } + + private val _threadHandler: CPointer + private val _bootstrapArgRef: StableRef + private val _state: AtomicRef + + actual constructor(routine: Routine) { + this._state = atomic(State.PENDING) + this._bootstrapArgRef = StableRef.create(ThreadBootstrapContext(routine)) + _safeAutoClose2(onAbort = this._bootstrapArgRef::dispose) { + this._threadHandler = nativeHeap.alloc().ptr + _safeAutoClose2(onAbort = { nativeHeap.free(this._threadHandler) }) { + var err = pthread_create( + this._threadHandler, + null, + staticCFunction(Thread::_threadBootstrap), + this._bootstrapArgRef.asCPointer() + ) + if (err == 0) + PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads thread") } + } + } + + } + + + actual fun start() { + when (this._state.compareAndExchange(State.PENDING, State.STARTING)) { + State.CLOSED -> throw IllegalStateException("Pthreads thread is destroyed") + State.FINISHED -> throw IllegalStateException("Pthreads thread was started and already finished") + State.STARTING, State.RUNNING -> throw IllegalStateException("Pthreads thread already running") + State.PENDING -> {} + } + _safeAutoClose2(onAbort = { this._state.value = State.PENDING }) { + this._bootstrapArgRef.get().startSignal.decrement() + } + } + + actual fun join(): Throwable? { + when (this._state.value) { + State.CLOSED -> throw IllegalStateException("Pthreads thread is destroyed") + State.FINISHED -> return this._bootstrapArgRef.get().exitedWithError + State.STARTING, State.RUNNING, State.PENDING -> {} + } + var err = pthread_join(this._threadHandler.pointed.value, null) + if (err == 0) + PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads thread") } + return this._bootstrapArgRef.get().exitedWithError + } + + override fun close() { + this._state.update { o -> + when (this._state.value) { + State.CLOSED -> throw IllegalStateException("Pthreads thread is destroyed") + State.STARTING, State.RUNNING -> throw IllegalStateException("Can't destroy pthreads thread while it running") + State.FINISHED, State.PENDING -> {} + } + return@update State.CLOSED + } + + _safeAutoClose1( + action = { + var err = pthread_join(this._threadHandler.pointed.value, null) + if (err == 0) + PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads thread") } + }, + finally = { + _safeAutoClose1( + action = { nativeHeap.free(this._threadHandler) }, + finally = { + _safeAutoClose1( + action = { nativeHeap.free(this._threadHandler) }, + finally = { this._bootstrapArgRef.dispose() } + ) + } + ) + } + ) + } + + companion object { + private fun _threadBootstrap(arg: CPointer<*>?): CPointer<*>? { + if (arg == null) return null + val context: ThreadBootstrapContext + try { + context = arg.asStableRef().get() + } catch (_: Throwable) { + return null + } + try { + _safeAutoClose2e( + onAbort = { e -> + context.exitedWithError = e + context.threadState.value = State.FINISHED + }, + onSuccess = { context.threadState.value = State.FINISHED }, + action = { + context.routine.run() + } + ) + } catch (_: Throwable) { + } + return null + } + } +} \ No newline at end of file diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt index f9af149..1f925b7 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt @@ -1,8 +1,5 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading -import kotlinx.atomicfu.AtomicLong -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.update import kotlinx.cinterop.CPointer import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.alloc @@ -14,19 +11,23 @@ import platform.posix.pthread_cond_init import platform.posix.pthread_cond_destroy import platform.posix.pthread_cond_wait import platform.posix.pthread_cond_broadcast +import platform.posix.pthread_cond_signal import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities @OptIn(ExperimentalForeignApi::class) actual class Condition : AutoCloseable { - private val _refcnt = _Refcnt("Pthreads condition was destroyed") + private val _refcnt: _Refcnt val _descriptor: CPointer actual constructor() { + this._refcnt = _Refcnt("Pthreads condition was destroyed") this._descriptor = nativeHeap.alloc().ptr - var err = pthread_cond_init(this._descriptor, null) - if (err != 0) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") } + _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { + var err = pthread_cond_init(this._descriptor, null) + if (err != 0) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") } + } } @OptIn(Mutex.RefcntAccess::class) @@ -45,16 +46,20 @@ actual class Condition : AutoCloseable { } actual fun signalOne() = this._refcnt.withRef { - var err = pthread_cond_broadcast(this._descriptor) + var err = pthread_cond_signal(this._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") } } override fun close() { this._refcnt.close("There are waiting threads on this pthreads condition") - var err = pthread_cond_destroy(this._descriptor) - if (err != 0) - PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads condition: $d") } - nativeHeap.free(this._descriptor) + _safeAutoClose1( + action = { + var err = pthread_cond_destroy(this._descriptor) + if (err != 0) + PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads condition: $d") } + }, + finally = { nativeHeap.free(this._descriptor) } + ) } } \ No newline at end of file diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt index c68006b..43ce126 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt @@ -19,14 +19,17 @@ actual class Mutex : AutoCloseable { internal annotation class RefcntAccess @RefcntAccess - internal val _refcnt = _Refcnt("Pthreads mutex was destroyed") + internal val _refcnt: _Refcnt internal val _descriptor: CPointer actual constructor() { + this._refcnt = _Refcnt("Pthreads mutex was destroyed") this._descriptor = nativeHeap.alloc().ptr - var err = pthread_mutex_init(this._descriptor, null) - if (err != 0) - PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads mutex: $d") } + _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { + var err = pthread_mutex_init(this._descriptor, null) + if (err != 0) + PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads mutex: $d") } + } } private fun _throwClosed(): Nothing = @@ -39,19 +42,21 @@ actual class Mutex : AutoCloseable { PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to lock pthreads mutex: $d") } } - actual fun unlock() { - this._refcnt.checkNotClosed() + actual fun unlock() = this._refcnt.tryDecref { var err = pthread_mutex_unlock(this._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to unlock pthreads mutex: $d") } - this._refcnt.decref() } override fun close() { this._refcnt.close("There are waiting threads on this pthreads mutex") - var err = pthread_mutex_destroy(this._descriptor) - if (err != 0) - PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") } - nativeHeap.free(this._descriptor) + _safeAutoClose1( + action = { + var err = pthread_mutex_destroy(this._descriptor) + if (err != 0) + PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") } + }, + finally = { nativeHeap.free(this._descriptor) } + ) } } \ No newline at end of file