CountDownLatch and decomposed refcnt mechanic to separate class

This commit is contained in:
Andrew Golovashevich 2025-03-18 20:28:43 +03:00
parent 4b3ef448af
commit b3d1c54cef
8 changed files with 136 additions and 92 deletions

View File

@ -1,9 +1,9 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
expect class TCondition : AutoCloseable { expect class Condition : AutoCloseable {
constructor() constructor()
fun await(mutex: TMutex) fun await(mutex: Mutex)
fun signalOne() fun signalOne()
fun signalAll() fun signalAll()
} }

View File

@ -0,0 +1,30 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
class CountDownLatch(initialCounterValue: ULong) : AutoCloseable {
private val _refcnt = _Refcnt("Latch was destroyed")
private var _counter = initialCounterValue
private val _mutex = Mutex()
private val _condition = Condition()
fun await() = this._refcnt.withRef {
this._mutex.withLock {
if (this._counter == 0uL)
return
this._condition.await(this._mutex)
}
}
fun decrement() = this._refcnt.withRef {
this._mutex.withLock {
if (this._counter == 0uL) return
if (--this._counter == 0uL)
this._condition.signalAll()
}
}
override fun close() {
this._refcnt.close("Latch is still in use")
this._condition.close()
this._mutex.close()
}
}

View File

@ -1,6 +1,6 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
expect class TMutex : AutoCloseable { expect class Mutex : AutoCloseable {
constructor() constructor()
fun lock() fun lock()

View File

@ -0,0 +1,46 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
internal class _Refcnt(private val _errMessage: String) {
private val _value: AtomicLong = atomic(0L)
fun throwClosed() {
throw IllegalStateException(this._errMessage)
}
fun incref() {
this._value.update { o ->
if (o < 0) this.throwClosed()
return@update o + 1
}
}
inline fun <R> tryIncref(block: () -> R): R {
this.incref()
return _safeAutoClose2(onAbort = this::decref, action = block)
}
fun checkNotClosed() {
if (this._value.value < 0) this.throwClosed()
}
fun decref() {
this._value.update(Long::dec)
}
fun close(errExistRefs: String) {
val state = this._value.compareAndExchange(0, -1)
when {
state > 0 -> throw IllegalStateException(errExistRefs)
state < 0 -> this.throwClosed()
}
}
inline fun <R> withRef(protected: () -> R): R {
this.incref()
return _safeAutoClose1(finally = this::decref, action = protected)
}
}

View File

@ -5,25 +5,9 @@ import kotlin.contracts.InvocationKind
import kotlin.contracts.contract import kotlin.contracts.contract
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
inline fun <R> TMutex.withLock(synchronizedBlock: () -> R): R { inline fun <R> Mutex.withLock(synchronizedBlock: () -> R): R {
contract { contract {
callsInPlace(synchronizedBlock, InvocationKind.EXACTLY_ONCE) callsInPlace(synchronizedBlock, InvocationKind.EXACTLY_ONCE)
} }
this.lock() return _safeAutoClose1(finally = this::unlock, synchronizedBlock)
var err1: Throwable? = null
try {
return synchronizedBlock()
} catch (t: Throwable) {
err1 = t
throw t
} finally {
try {
this.unlock()
} catch (err2: Throwable) {
if (err1 != null)
err1.addSuppressed(err2)
else
throw err2
}
}
} }

View File

@ -1,5 +1,10 @@
@file:OptIn(ExperimentalContracts::class)
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlinx.atomicfu.AtomicLong import kotlinx.atomicfu.AtomicLong
internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long { internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long {
@ -10,23 +15,48 @@ internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long
} }
} }
internal inline fun <R> _safeAutoClose( @PublishedApi
internal inline fun <R> _safeAutoClose1(
finally: () -> Unit, finally: () -> Unit,
action: () -> R action: () -> R
): R = _safeAutoClose(onAbort = finally, onSuccess = finally, onCrossReturn = finally, action = action) ): R {
@Suppress("WRONG_INVOCATION_KIND")
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
callsInPlace(finally, InvocationKind.EXACTLY_ONCE)
}
return _safeAutoClose3(onAbort = finally, onSuccess = finally, onCrossReturn = finally, action = action)
}
internal inline fun <R> _safeAutoClose( @PublishedApi
internal inline fun <R> _safeAutoClose2(
onAbort: () -> Unit = {}, onAbort: () -> Unit = {},
onSuccess: () -> Unit = {}, onSuccess: () -> Unit = {},
action: () -> R action: () -> R
): R = _safeAutoClose(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) ): R {
@Suppress("WRONG_INVOCATION_KIND")
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE)
callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE)
}
return _safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action)
}
internal inline fun <R> _safeAutoClose( @PublishedApi
internal inline fun <R> _safeAutoClose3(
onAbort: () -> Unit = {}, onAbort: () -> Unit = {},
onSuccess: () -> Unit = {}, onSuccess: () -> Unit = {},
onCrossReturn: () -> Unit = {}, onCrossReturn: () -> Unit = {},
action: () -> R action: () -> R
): R { ): R {
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE)
callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE)
callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE)
}
val ret: R val ret: R
var wasError = false var wasError = false
var crossReturned = true var crossReturned = true

View File

@ -15,13 +15,12 @@ import platform.posix.pthread_cond_destroy
import platform.posix.pthread_cond_wait import platform.posix.pthread_cond_wait
import platform.posix.pthread_cond_broadcast import platform.posix.pthread_cond_broadcast
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.TMutex.RefcntAccess
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class)
actual class TCondition : AutoCloseable { actual class Condition : AutoCloseable {
private val _refcnt = _Refcnt("Pthreads condition was destroyed")
val _descriptor: CPointer<pthread_cond_t> val _descriptor: CPointer<pthread_cond_t>
private var _refcnt: AtomicLong = atomic(0L)
actual constructor() { actual constructor() {
this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr
@ -30,52 +29,29 @@ actual class TCondition : AutoCloseable {
PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") } PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") }
} }
@OptIn(Mutex.RefcntAccess::class)
private fun _throwClosed(): Nothing = actual fun await(mutex: Mutex) = this._refcnt.withRef {
throw IllegalStateException("Pthreads condition was destroyed") mutex._refcnt.withRef {
private inline fun <R> _withRefcnt(protectedBlock: () -> R): R {
this._refcnt.update { o ->
if (o < 0) this._throwClosed()
return@update o + 1
}
_safeAutoClose(finally = { this._refcnt.update(Long::dec) }) {
return protectedBlock()
}
}
internal fun _decref() {
this._refcnt.update(Long::dec)
}
@OptIn(TMutex.RefcntAccess::class)
actual fun await(mutex: TMutex) = this._withRefcnt {
mutex._incref()
_safeAutoClose(finally = { mutex._decref() }) {
var err = pthread_cond_wait(this._descriptor, mutex._descriptor) var err = pthread_cond_wait(this._descriptor, mutex._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to wait on pthreads condition: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to wait on pthreads condition: $d") }
} }
} }
actual fun signalAll() = this._withRefcnt { actual fun signalAll() = this._refcnt.withRef {
var err = pthread_cond_broadcast(this._descriptor) var err = pthread_cond_broadcast(this._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") }
} }
actual fun signalOne() = this._withRefcnt { actual fun signalOne() = this._refcnt.withRef {
var err = pthread_cond_broadcast(this._descriptor) var err = pthread_cond_broadcast(this._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") }
} }
override fun close() { override fun close() {
val state = this._refcnt.compareAndExchange(0, -1) this._refcnt.close("There are waiting threads on this pthreads condition")
when {
state > 0 -> throw IllegalStateException("There are waiting threads on this pthreads condition")
state < 0 -> this._throwClosed()
}
var err = pthread_cond_destroy(this._descriptor) var err = pthread_cond_destroy(this._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads condition: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads condition: $d") }

View File

@ -1,8 +1,5 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.cinterop.CPointer import kotlinx.cinterop.CPointer
import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.alloc import kotlinx.cinterop.alloc
@ -16,9 +13,13 @@ import platform.posix.pthread_mutex_t
import platform.posix.pthread_mutex_unlock import platform.posix.pthread_mutex_unlock
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class)
actual class TMutex : AutoCloseable { actual class Mutex : AutoCloseable {
private var _refcnt: AtomicLong = atomic(0L) @RequiresOptIn
internal annotation class RefcntAccess
@RefcntAccess
internal val _refcnt = _Refcnt("Pthreads mutex was destroyed")
internal val _descriptor: CPointer<pthread_mutex_t> internal val _descriptor: CPointer<pthread_mutex_t>
actual constructor() { actual constructor() {
@ -29,48 +30,25 @@ actual class TMutex : AutoCloseable {
} }
private fun _throwClosed(): Nothing = private fun _throwClosed(): Nothing =
throw IllegalStateException("Pthreads mutex was destroyed") throw IllegalStateException()
@RequiresOptIn
annotation class RefcntAccess
@RefcntAccess actual fun lock() = this._refcnt.tryIncref {
internal fun _incref() {
this._refcnt.update { o ->
if (o < 0) this._throwClosed()
return@update o + 1
}
}
@RefcntAccess
internal fun _decref() {
this._refcnt.update(Long::dec)
}
actual fun lock() {
this._refcnt.update { o ->
if (o < 0) this._throwClosed()
return@update o + 1
}
var err = pthread_mutex_lock(this._descriptor) var err = pthread_mutex_lock(this._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to lock pthreads mutex: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to lock pthreads mutex: $d") }
} }
actual fun unlock() { actual fun unlock() {
if (this._refcnt.value < 0) this._throwClosed() this._refcnt.checkNotClosed()
var err = pthread_mutex_unlock(this._descriptor) var err = pthread_mutex_unlock(this._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to unlock pthreads mutex: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to unlock pthreads mutex: $d") }
this._refcnt.update(Long::dec) this._refcnt.decref()
} }
override fun close() { override fun close() {
val state = this._refcnt.compareAndExchange(0, -1) this._refcnt.close("There are waiting threads on this pthreads mutex")
when {
state > 0 -> throw IllegalStateException("There are waiting threads on this pthreads mutex")
state < 0 -> this._throwClosed()
}
var err = pthread_mutex_destroy(this._descriptor) var err = pthread_mutex_destroy(this._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") }