diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TCondition.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt similarity index 63% rename from modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TCondition.kt rename to modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt index 2cabdba..dda916c 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TCondition.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt @@ -1,9 +1,9 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading -expect class TCondition : AutoCloseable { +expect class Condition : AutoCloseable { constructor() - fun await(mutex: TMutex) + fun await(mutex: Mutex) fun signalOne() fun signalAll() } \ No newline at end of file diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt new file mode 100644 index 0000000..f0c7041 --- /dev/null +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt @@ -0,0 +1,30 @@ +package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading + +class CountDownLatch(initialCounterValue: ULong) : AutoCloseable { + private val _refcnt = _Refcnt("Latch was destroyed") + private var _counter = initialCounterValue + private val _mutex = Mutex() + private val _condition = Condition() + + fun await() = this._refcnt.withRef { + this._mutex.withLock { + if (this._counter == 0uL) + return + this._condition.await(this._mutex) + } + } + + fun decrement() = this._refcnt.withRef { + this._mutex.withLock { + if (this._counter == 0uL) return + if (--this._counter == 0uL) + this._condition.signalAll() + } + } + + override fun close() { + this._refcnt.close("Latch is still in use") + this._condition.close() + this._mutex.close() + } +} \ No newline at end of file diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TMutex.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt similarity index 74% rename from modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TMutex.kt rename to modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt index d1a06b1..0c806d0 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TMutex.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt @@ -1,6 +1,6 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading -expect class TMutex : AutoCloseable { +expect class Mutex : AutoCloseable { constructor() fun lock() diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt new file mode 100644 index 0000000..234859e --- /dev/null +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt @@ -0,0 +1,46 @@ +package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading + +import kotlinx.atomicfu.AtomicLong +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.update + +internal class _Refcnt(private val _errMessage: String) { + private val _value: AtomicLong = atomic(0L) + + fun throwClosed() { + throw IllegalStateException(this._errMessage) + } + + fun incref() { + this._value.update { o -> + if (o < 0) this.throwClosed() + return@update o + 1 + } + } + + inline fun tryIncref(block: () -> R): R { + this.incref() + return _safeAutoClose2(onAbort = this::decref, action = block) + } + + fun checkNotClosed() { + if (this._value.value < 0) this.throwClosed() + } + + fun decref() { + this._value.update(Long::dec) + } + + fun close(errExistRefs: String) { + val state = this._value.compareAndExchange(0, -1) + when { + state > 0 -> throw IllegalStateException(errExistRefs) + state < 0 -> this.throwClosed() + } + } + + inline fun withRef(protected: () -> R): R { + this.incref() + return _safeAutoClose1(finally = this::decref, action = protected) + } +} \ No newline at end of file diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt index 0ade517..3045f16 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt @@ -5,25 +5,9 @@ import kotlin.contracts.InvocationKind import kotlin.contracts.contract @OptIn(ExperimentalContracts::class) -inline fun TMutex.withLock(synchronizedBlock: () -> R): R { +inline fun Mutex.withLock(synchronizedBlock: () -> R): R { contract { callsInPlace(synchronizedBlock, InvocationKind.EXACTLY_ONCE) } - this.lock() - var err1: Throwable? = null - try { - return synchronizedBlock() - } catch (t: Throwable) { - err1 = t - throw t - } finally { - try { - this.unlock() - } catch (err2: Throwable) { - if (err1 != null) - err1.addSuppressed(err2) - else - throw err2 - } - } + return _safeAutoClose1(finally = this::unlock, synchronizedBlock) } \ No newline at end of file diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt index cb325c2..d56d00f 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt @@ -1,5 +1,10 @@ +@file:OptIn(ExperimentalContracts::class) + package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract import kotlinx.atomicfu.AtomicLong internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long { @@ -10,23 +15,48 @@ internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long } } -internal inline fun _safeAutoClose( +@PublishedApi +internal inline fun _safeAutoClose1( finally: () -> Unit, action: () -> R -): R = _safeAutoClose(onAbort = finally, onSuccess = finally, onCrossReturn = finally, action = action) +): R { + @Suppress("WRONG_INVOCATION_KIND") + contract { + callsInPlace(action, InvocationKind.EXACTLY_ONCE) + callsInPlace(finally, InvocationKind.EXACTLY_ONCE) + } + return _safeAutoClose3(onAbort = finally, onSuccess = finally, onCrossReturn = finally, action = action) +} -internal inline fun _safeAutoClose( +@PublishedApi +internal inline fun _safeAutoClose2( onAbort: () -> Unit = {}, onSuccess: () -> Unit = {}, action: () -> R -): R = _safeAutoClose(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) +): R { + @Suppress("WRONG_INVOCATION_KIND") + contract { + callsInPlace(action, InvocationKind.EXACTLY_ONCE) + callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE) + callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) + } + return _safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) +} -internal inline fun _safeAutoClose( +@PublishedApi +internal inline fun _safeAutoClose3( onAbort: () -> Unit = {}, onSuccess: () -> Unit = {}, onCrossReturn: () -> Unit = {}, action: () -> R ): R { + contract { + callsInPlace(action, InvocationKind.EXACTLY_ONCE) + callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE) + callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) + callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE) + } + val ret: R var wasError = false var crossReturned = true diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TCondition.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt similarity index 62% rename from modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TCondition.kt rename to modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt index 5b2ba1b..f9af149 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TCondition.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt @@ -15,13 +15,12 @@ import platform.posix.pthread_cond_destroy import platform.posix.pthread_cond_wait import platform.posix.pthread_cond_broadcast import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities -import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.TMutex.RefcntAccess @OptIn(ExperimentalForeignApi::class) -actual class TCondition : AutoCloseable { +actual class Condition : AutoCloseable { + private val _refcnt = _Refcnt("Pthreads condition was destroyed") val _descriptor: CPointer - private var _refcnt: AtomicLong = atomic(0L) actual constructor() { this._descriptor = nativeHeap.alloc().ptr @@ -30,52 +29,29 @@ actual class TCondition : AutoCloseable { PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") } } - - private fun _throwClosed(): Nothing = - throw IllegalStateException("Pthreads condition was destroyed") - - private inline fun _withRefcnt(protectedBlock: () -> R): R { - this._refcnt.update { o -> - if (o < 0) this._throwClosed() - return@update o + 1 - } - _safeAutoClose(finally = { this._refcnt.update(Long::dec) }) { - return protectedBlock() - } - } - - internal fun _decref() { - this._refcnt.update(Long::dec) - } - - @OptIn(TMutex.RefcntAccess::class) - actual fun await(mutex: TMutex) = this._withRefcnt { - mutex._incref() - _safeAutoClose(finally = { mutex._decref() }) { + @OptIn(Mutex.RefcntAccess::class) + actual fun await(mutex: Mutex) = this._refcnt.withRef { + mutex._refcnt.withRef { var err = pthread_cond_wait(this._descriptor, mutex._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to wait on pthreads condition: $d") } } } - actual fun signalAll() = this._withRefcnt { + actual fun signalAll() = this._refcnt.withRef { var err = pthread_cond_broadcast(this._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") } } - actual fun signalOne() = this._withRefcnt { + actual fun signalOne() = this._refcnt.withRef { var err = pthread_cond_broadcast(this._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") } } override fun close() { - val state = this._refcnt.compareAndExchange(0, -1) - when { - state > 0 -> throw IllegalStateException("There are waiting threads on this pthreads condition") - state < 0 -> this._throwClosed() - } + this._refcnt.close("There are waiting threads on this pthreads condition") var err = pthread_cond_destroy(this._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads condition: $d") } diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TMutex.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt similarity index 63% rename from modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TMutex.kt rename to modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt index b07bccd..c68006b 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/TMutex.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt @@ -1,8 +1,5 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading -import kotlinx.atomicfu.AtomicLong -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.update import kotlinx.cinterop.CPointer import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.alloc @@ -16,9 +13,13 @@ import platform.posix.pthread_mutex_t import platform.posix.pthread_mutex_unlock import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities -@OptIn(ExperimentalForeignApi::class) -actual class TMutex : AutoCloseable { - private var _refcnt: AtomicLong = atomic(0L) +@OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class) +actual class Mutex : AutoCloseable { + @RequiresOptIn + internal annotation class RefcntAccess + + @RefcntAccess + internal val _refcnt = _Refcnt("Pthreads mutex was destroyed") internal val _descriptor: CPointer actual constructor() { @@ -29,48 +30,25 @@ actual class TMutex : AutoCloseable { } private fun _throwClosed(): Nothing = - throw IllegalStateException("Pthreads mutex was destroyed") + throw IllegalStateException() - @RequiresOptIn - annotation class RefcntAccess - @RefcntAccess - internal fun _incref() { - this._refcnt.update { o -> - if (o < 0) this._throwClosed() - return@update o + 1 - } - } - - @RefcntAccess - internal fun _decref() { - this._refcnt.update(Long::dec) - } - - actual fun lock() { - this._refcnt.update { o -> - if (o < 0) this._throwClosed() - return@update o + 1 - } + actual fun lock() = this._refcnt.tryIncref { var err = pthread_mutex_lock(this._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to lock pthreads mutex: $d") } } actual fun unlock() { - if (this._refcnt.value < 0) this._throwClosed() + this._refcnt.checkNotClosed() var err = pthread_mutex_unlock(this._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to unlock pthreads mutex: $d") } - this._refcnt.update(Long::dec) + this._refcnt.decref() } override fun close() { - val state = this._refcnt.compareAndExchange(0, -1) - when { - state > 0 -> throw IllegalStateException("There are waiting threads on this pthreads mutex") - state < 0 -> this._throwClosed() - } + this._refcnt.close("There are waiting threads on this pthreads mutex") var err = pthread_mutex_destroy(this._descriptor) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") }