Thread implementation for linux

This commit is contained in:
Andrew Golovashevich 2025-03-19 00:37:38 +03:00
parent b3d1c54cef
commit c1f96f4915
7 changed files with 251 additions and 31 deletions

View File

@ -1,10 +1,19 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
class CountDownLatch(initialCounterValue: ULong) : AutoCloseable { class CountDownLatch : AutoCloseable {
private val _refcnt = _Refcnt("Latch was destroyed") private val _refcnt: _Refcnt
private var _counter = initialCounterValue private var _counter: ULong
private val _mutex = Mutex() private val _mutex: Mutex
private val _condition = Condition() private val _condition: Condition
constructor(initialCounterValue: ULong) {
this._refcnt = _Refcnt("Latch was destroyed")
this._counter = initialCounterValue
this._mutex = Mutex()
_safeAutoClose2(onAbort = this._mutex::close) {
this._condition = Condition()
}
}
fun await() = this._refcnt.withRef { fun await() = this._refcnt.withRef {
this._mutex.withLock { this._mutex.withLock {
@ -24,7 +33,9 @@ class CountDownLatch(initialCounterValue: ULong) : AutoCloseable {
override fun close() { override fun close() {
this._refcnt.close("Latch is still in use") this._refcnt.close("Latch is still in use")
this._condition.close() _safeAutoClose1(
this._mutex.close() action = this._condition::close,
finally = this._mutex::close
)
} }
} }

View File

@ -0,0 +1,13 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
expect class Thread : AutoCloseable {
fun interface Routine {
fun run()
}
constructor(routine: Routine)
fun start()
fun join(): Throwable?
}

View File

@ -31,6 +31,10 @@ internal class _Refcnt(private val _errMessage: String) {
this._value.update(Long::dec) this._value.update(Long::dec)
} }
inline fun <R> tryDecref(block: () -> R): R {
this.checkNotClosed()
return _safeAutoClose2(onSuccess = this::decref, action = block)
}
fun close(errExistRefs: String) { fun close(errExistRefs: String) {
val state = this._value.compareAndExchange(0, -1) val state = this._value.compareAndExchange(0, -1)
when { when {

View File

@ -6,6 +6,7 @@ import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind import kotlin.contracts.InvocationKind
import kotlin.contracts.contract import kotlin.contracts.contract
import kotlinx.atomicfu.AtomicLong import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.AtomicRef
internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long { internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long {
while (true) { while (true) {
@ -15,6 +16,14 @@ internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long
} }
} }
internal fun <T> AtomicRef<T>.compareAndExchange(expected: T, newValue: T): T {
while (true) {
val old = this.value
if (old === expected) return old
if (this.compareAndSet(old, newValue)) return old
}
}
@PublishedApi @PublishedApi
internal inline fun <R> _safeAutoClose1( internal inline fun <R> _safeAutoClose1(
finally: () -> Unit, finally: () -> Unit,
@ -43,6 +52,21 @@ internal inline fun <R> _safeAutoClose2(
return _safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) return _safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action)
} }
@PublishedApi
internal inline fun <R> _safeAutoClose2e(
onAbort: (Throwable) -> Unit = {},
onSuccess: () -> Unit = {},
action: () -> R
): R {
@Suppress("WRONG_INVOCATION_KIND")
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE)
callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE)
}
return _safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action)
}
@PublishedApi @PublishedApi
internal inline fun <R> _safeAutoClose3( internal inline fun <R> _safeAutoClose3(
onAbort: () -> Unit = {}, onAbort: () -> Unit = {},
@ -56,6 +80,22 @@ internal inline fun <R> _safeAutoClose3(
callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE)
callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE) callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE)
} }
return _safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action)
}
@PublishedApi
internal inline fun <R> _safeAutoClose3e(
onAbort: (Throwable) -> Unit = {},
onSuccess: () -> Unit = {},
onCrossReturn: () -> Unit = {},
action: () -> R
): R {
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE)
callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE)
callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE)
}
val ret: R val ret: R
var wasError = false var wasError = false
@ -66,7 +106,7 @@ internal inline fun <R> _safeAutoClose3(
} catch (e1: Throwable) { } catch (e1: Throwable) {
wasError = true wasError = true
try { try {
onAbort() onAbort(e1)
} catch (e2: Throwable) { } catch (e2: Throwable) {
e1.addSuppressed(e2) e1.addSuppressed(e2)
} }

View File

@ -0,0 +1,142 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.cinterop.CPointer
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.StableRef
import kotlinx.cinterop.nativeHeap
import kotlinx.cinterop.alloc
import kotlinx.cinterop.asStableRef
import kotlinx.cinterop.free
import kotlinx.cinterop.pointed
import kotlinx.cinterop.ptr
import kotlinx.cinterop.staticCFunction
import kotlinx.cinterop.value
import platform.posix.pthread_create
import platform.posix.pthread_join
import platform.posix.pthread_tVar
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
@OptIn(ExperimentalForeignApi::class)
actual class Thread : AutoCloseable {
actual fun interface Routine {
actual fun run()
}
internal enum class State {
PENDING, STARTING, RUNNING, FINISHED, CLOSED
}
internal inner class ThreadBootstrapContext(val routine: Routine) {
val startSignal = CountDownLatch(1uL)
val threadState by this@Thread::_state
var exitedWithError: Throwable? = null
}
private val _threadHandler: CPointer<pthread_tVar>
private val _bootstrapArgRef: StableRef<ThreadBootstrapContext>
private val _state: AtomicRef<State>
actual constructor(routine: Routine) {
this._state = atomic(State.PENDING)
this._bootstrapArgRef = StableRef.create(ThreadBootstrapContext(routine))
_safeAutoClose2(onAbort = this._bootstrapArgRef::dispose) {
this._threadHandler = nativeHeap.alloc<pthread_tVar>().ptr
_safeAutoClose2(onAbort = { nativeHeap.free(this._threadHandler) }) {
var err = pthread_create(
this._threadHandler,
null,
staticCFunction(Thread::_threadBootstrap),
this._bootstrapArgRef.asCPointer()
)
if (err == 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads thread") }
}
}
}
actual fun start() {
when (this._state.compareAndExchange(State.PENDING, State.STARTING)) {
State.CLOSED -> throw IllegalStateException("Pthreads thread is destroyed")
State.FINISHED -> throw IllegalStateException("Pthreads thread was started and already finished")
State.STARTING, State.RUNNING -> throw IllegalStateException("Pthreads thread already running")
State.PENDING -> {}
}
_safeAutoClose2(onAbort = { this._state.value = State.PENDING }) {
this._bootstrapArgRef.get().startSignal.decrement()
}
}
actual fun join(): Throwable? {
when (this._state.value) {
State.CLOSED -> throw IllegalStateException("Pthreads thread is destroyed")
State.FINISHED -> return this._bootstrapArgRef.get().exitedWithError
State.STARTING, State.RUNNING, State.PENDING -> {}
}
var err = pthread_join(this._threadHandler.pointed.value, null)
if (err == 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads thread") }
return this._bootstrapArgRef.get().exitedWithError
}
override fun close() {
this._state.update { o ->
when (this._state.value) {
State.CLOSED -> throw IllegalStateException("Pthreads thread is destroyed")
State.STARTING, State.RUNNING -> throw IllegalStateException("Can't destroy pthreads thread while it running")
State.FINISHED, State.PENDING -> {}
}
return@update State.CLOSED
}
_safeAutoClose1(
action = {
var err = pthread_join(this._threadHandler.pointed.value, null)
if (err == 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads thread") }
},
finally = {
_safeAutoClose1(
action = { nativeHeap.free(this._threadHandler) },
finally = {
_safeAutoClose1(
action = { nativeHeap.free(this._threadHandler) },
finally = { this._bootstrapArgRef.dispose() }
)
}
)
}
)
}
companion object {
private fun _threadBootstrap(arg: CPointer<*>?): CPointer<*>? {
if (arg == null) return null
val context: ThreadBootstrapContext
try {
context = arg.asStableRef<ThreadBootstrapContext>().get()
} catch (_: Throwable) {
return null
}
try {
_safeAutoClose2e(
onAbort = { e ->
context.exitedWithError = e
context.threadState.value = State.FINISHED
},
onSuccess = { context.threadState.value = State.FINISHED },
action = {
context.routine.run()
}
)
} catch (_: Throwable) {
}
return null
}
}
}

View File

@ -1,8 +1,5 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.cinterop.CPointer import kotlinx.cinterop.CPointer
import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.alloc import kotlinx.cinterop.alloc
@ -14,19 +11,23 @@ import platform.posix.pthread_cond_init
import platform.posix.pthread_cond_destroy import platform.posix.pthread_cond_destroy
import platform.posix.pthread_cond_wait import platform.posix.pthread_cond_wait
import platform.posix.pthread_cond_broadcast import platform.posix.pthread_cond_broadcast
import platform.posix.pthread_cond_signal
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class)
actual class Condition : AutoCloseable { actual class Condition : AutoCloseable {
private val _refcnt = _Refcnt("Pthreads condition was destroyed") private val _refcnt: _Refcnt
val _descriptor: CPointer<pthread_cond_t> val _descriptor: CPointer<pthread_cond_t>
actual constructor() { actual constructor() {
this._refcnt = _Refcnt("Pthreads condition was destroyed")
this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr
var err = pthread_cond_init(this._descriptor, null) _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) {
if (err != 0) var err = pthread_cond_init(this._descriptor, null)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") } if (err != 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") }
}
} }
@OptIn(Mutex.RefcntAccess::class) @OptIn(Mutex.RefcntAccess::class)
@ -45,16 +46,20 @@ actual class Condition : AutoCloseable {
} }
actual fun signalOne() = this._refcnt.withRef { actual fun signalOne() = this._refcnt.withRef {
var err = pthread_cond_broadcast(this._descriptor) var err = pthread_cond_signal(this._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to signal pthreads condition: $d") }
} }
override fun close() { override fun close() {
this._refcnt.close("There are waiting threads on this pthreads condition") this._refcnt.close("There are waiting threads on this pthreads condition")
var err = pthread_cond_destroy(this._descriptor) _safeAutoClose1(
if (err != 0) action = {
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads condition: $d") } var err = pthread_cond_destroy(this._descriptor)
nativeHeap.free(this._descriptor) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads condition: $d") }
},
finally = { nativeHeap.free(this._descriptor) }
)
} }
} }

View File

@ -19,14 +19,17 @@ actual class Mutex : AutoCloseable {
internal annotation class RefcntAccess internal annotation class RefcntAccess
@RefcntAccess @RefcntAccess
internal val _refcnt = _Refcnt("Pthreads mutex was destroyed") internal val _refcnt: _Refcnt
internal val _descriptor: CPointer<pthread_mutex_t> internal val _descriptor: CPointer<pthread_mutex_t>
actual constructor() { actual constructor() {
this._refcnt = _Refcnt("Pthreads mutex was destroyed")
this._descriptor = nativeHeap.alloc<pthread_mutex_t>().ptr this._descriptor = nativeHeap.alloc<pthread_mutex_t>().ptr
var err = pthread_mutex_init(this._descriptor, null) _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) {
if (err != 0) var err = pthread_mutex_init(this._descriptor, null)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads mutex: $d") } if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads mutex: $d") }
}
} }
private fun _throwClosed(): Nothing = private fun _throwClosed(): Nothing =
@ -39,19 +42,21 @@ actual class Mutex : AutoCloseable {
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to lock pthreads mutex: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to lock pthreads mutex: $d") }
} }
actual fun unlock() { actual fun unlock() = this._refcnt.tryDecref {
this._refcnt.checkNotClosed()
var err = pthread_mutex_unlock(this._descriptor) var err = pthread_mutex_unlock(this._descriptor)
if (err != 0) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to unlock pthreads mutex: $d") } PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to unlock pthreads mutex: $d") }
this._refcnt.decref()
} }
override fun close() { override fun close() {
this._refcnt.close("There are waiting threads on this pthreads mutex") this._refcnt.close("There are waiting threads on this pthreads mutex")
var err = pthread_mutex_destroy(this._descriptor) _safeAutoClose1(
if (err != 0) action = {
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") } var err = pthread_mutex_destroy(this._descriptor)
nativeHeap.free(this._descriptor) if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") }
},
finally = { nativeHeap.free(this._descriptor) }
)
} }
} }