Extracted RefCounter to a separate library and fixed bug with refcnt in epoll event-loop

This commit is contained in:
Andrew Golovashevich 2025-03-22 21:07:49 +03:00
parent 5cfb5551c4
commit af2f50a3b8
10 changed files with 25 additions and 91 deletions

3
.gitmodules vendored
View File

@ -4,3 +4,6 @@
[submodule "libs/highlevel-try-finally"] [submodule "libs/highlevel-try-finally"]
path = libs/highlevel-try-finally path = libs/highlevel-try-finally
url = https://git.landgrafhomyak.ru/xomrk/highlevel-try-finally.kt url = https://git.landgrafhomyak.ru/xomrk/highlevel-try-finally.kt
[submodule "libs/reference-counter"]
path = libs/reference-counter
url = https://git.landgrafhomyak.ru/xomrk/reference-counter.kt

@ -0,0 +1 @@
Subproject commit 0e2aea0bff433b3917f89e2b61ab5482154cec16

View File

@ -1,18 +1,17 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose1
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2 import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2
import ru.landgrafhomyak.utility.highlevel_try_finally.tryFinallyChain import ru.landgrafhomyak.utility.highlevel_try_finally.tryFinallyChain
import ru.landrafhomyak.utility.reference_counter.CloseableReferenceCounter
class CountDownLatch : AutoCloseable { class CountDownLatch : AutoCloseable {
private val _refcnt: CloseableRefCounter private val _refcnt: CloseableReferenceCounter
private var _counter: ULong private var _counter: ULong
private val _mutex: Mutex private val _mutex: Mutex
private val _condition: Condition private val _condition: Condition
constructor(initialCounterValue: ULong) { constructor(initialCounterValue: ULong) {
this._refcnt = CloseableRefCounter("Latch was destroyed") this._refcnt = CloseableReferenceCounter("Latch was destroyed")
this._counter = initialCounterValue this._counter = initialCounterValue
this._mutex = Mutex() this._mutex = Mutex()
safeAutoClose2(onError = this._mutex::close) { safeAutoClose2(onError = this._mutex::close) {

View File

@ -13,19 +13,19 @@ import platform.posix.pthread_cond_wait
import platform.posix.pthread_cond_broadcast import platform.posix.pthread_cond_broadcast
import platform.posix.pthread_cond_signal import platform.posix.pthread_cond_signal
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose1 import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose1
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2 import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2
import ru.landgrafhomyak.utility.highlevel_try_finally.tryFinallyChain import ru.landgrafhomyak.utility.highlevel_try_finally.tryFinallyChain
import ru.landrafhomyak.utility.reference_counter.CloseableReferenceCounter
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class)
actual class Condition : AutoCloseable { actual class Condition : AutoCloseable {
private val _refcnt: CloseableRefCounter private val _refcnt: CloseableReferenceCounter
val _descriptor: CPointer<pthread_cond_t> val _descriptor: CPointer<pthread_cond_t>
actual constructor() { actual constructor() {
this._refcnt = CloseableRefCounter("Pthreads condition was destroyed") this._refcnt = CloseableReferenceCounter("Pthreads condition was destroyed")
this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr
safeAutoClose2(onError = { nativeHeap.free(this._descriptor) }) { safeAutoClose2(onError = { nativeHeap.free(this._descriptor) }) {
var err = pthread_cond_init(this._descriptor, null) var err = pthread_cond_init(this._descriptor, null)

View File

@ -12,10 +12,9 @@ import platform.posix.pthread_mutex_lock
import platform.posix.pthread_mutex_t import platform.posix.pthread_mutex_t
import platform.posix.pthread_mutex_unlock import platform.posix.pthread_mutex_unlock
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose1
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2 import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2
import ru.landgrafhomyak.utility.highlevel_try_finally.tryFinallyChain import ru.landgrafhomyak.utility.highlevel_try_finally.tryFinallyChain
import ru.landrafhomyak.utility.reference_counter.CloseableReferenceCounter
@OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class) @OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class)
actual class Mutex : AutoCloseable { actual class Mutex : AutoCloseable {
@ -23,11 +22,11 @@ actual class Mutex : AutoCloseable {
internal annotation class RefcntAccess internal annotation class RefcntAccess
@RefcntAccess @RefcntAccess
internal val _refcnt: CloseableRefCounter internal val _refcnt: CloseableReferenceCounter
internal val _descriptor: CPointer<pthread_mutex_t> internal val _descriptor: CPointer<pthread_mutex_t>
actual constructor() { actual constructor() {
this._refcnt = CloseableRefCounter("Pthreads mutex was destroyed") this._refcnt = CloseableReferenceCounter("Pthreads mutex was destroyed")
this._descriptor = nativeHeap.alloc<pthread_mutex_t>().ptr this._descriptor = nativeHeap.alloc<pthread_mutex_t>().ptr
safeAutoClose2(onError = { nativeHeap.free(this._descriptor) }) { safeAutoClose2(onError = { nativeHeap.free(this._descriptor) }) {
var err = pthread_mutex_init(this._descriptor, null) var err = pthread_mutex_init(this._descriptor, null)

View File

@ -29,20 +29,20 @@ import platform.posix.sockaddr_in
import platform.posix.sockaddr_in6 import platform.posix.sockaddr_in6
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose1 import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose1
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2 import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2
import ru.landgrafhomyak.utility.highlevel_try_finally.tryFinallyChain import ru.landgrafhomyak.utility.highlevel_try_finally.tryFinallyChain
import ru.landrafhomyak.utility.reference_counter.CloseableReferenceCounter
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class)
@Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName") @Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName")
class EpollSocketEventLoop : SocketEventLoopScope.Closeable { class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
private val _epollDescriptor: Int private val _epollDescriptor: Int
private val _socketsCount: CloseableRefCounter private val _socketsCount: CloseableReferenceCounter
constructor() { constructor() {
this._socketsCount = CloseableRefCounter("Epoll event-loop is closed") this._socketsCount = CloseableReferenceCounter("Epoll event-loop is closed")
this._epollDescriptor = epoll_create(1) this._epollDescriptor = epoll_create(1)
if (this._epollDescriptor < 0) if (this._epollDescriptor < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") } PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") }
@ -207,9 +207,11 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
constructor(ipv4: CValue<sockaddr_in>) : super(ipv4) { constructor(ipv4: CValue<sockaddr_in>) : super(ipv4) {
safeAutoClose2(onError = { super.close() }) { safeAutoClose2(onError = { super.close() }) {
this@EpollSocketEventLoop._socketsCount.tryIncref {
this.__metadata = _SocketMetadata(this._socketFd) this.__metadata = _SocketMetadata(this._socketFd)
} }
} }
}
constructor(ipv6: CValue<sockaddr_in6>) : super(ipv6) { constructor(ipv6: CValue<sockaddr_in6>) : super(ipv6) {
safeAutoClose2(onError = { super.close() }) { safeAutoClose2(onError = { super.close() }) {

View File

@ -34,8 +34,8 @@ import platform.posix.recv as recv_lowlevel
import platform.posix.close as close_lowlevel import platform.posix.close as close_lowlevel
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2 import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2
import ru.landrafhomyak.utility.reference_counter.CloseableReferenceCounter
@Suppress("CanBeVal") @Suppress("CanBeVal")
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class)
@ -43,7 +43,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
protected val _socketFd: Int protected val _socketFd: Int
private val __rSync: CMutex private val __rSync: CMutex
private val __wSync: CMutex private val __wSync: CMutex
private val __refcnt: CloseableRefCounter private val __refcnt: CloseableReferenceCounter
private val __parentProtocol: ParentProtocol private val __parentProtocol: ParentProtocol
private enum class ParentProtocol { private enum class ParentProtocol {
@ -61,7 +61,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
this.__rSync = CMutex() this.__rSync = CMutex()
this.__wSync = CMutex() this.__wSync = CMutex()
this.__parentProtocol = ParentProtocol.IPv4 this.__parentProtocol = ParentProtocol.IPv4
this.__refcnt = CloseableRefCounter("ICMP socket was closed") this.__refcnt = CloseableReferenceCounter("ICMP socket was closed")
this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI<sockaddr_in>(), "IPv4") this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI<sockaddr_in>(), "IPv4")
} }
@ -69,7 +69,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
this.__rSync = CMutex() this.__rSync = CMutex()
this.__wSync = CMutex() this.__wSync = CMutex()
this.__parentProtocol = ParentProtocol.IPv6 this.__parentProtocol = ParentProtocol.IPv6
this.__refcnt = CloseableRefCounter("ICMP socket was closed") this.__refcnt = CloseableReferenceCounter("ICMP socket was closed")
this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI<sockaddr_in6>(), "IPv6") this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI<sockaddr_in6>(), "IPv6")
} }

View File

@ -23,6 +23,7 @@ kotlin {
dependencies { dependencies {
implementation(Dependencies.kotlin_atomicfu) implementation(Dependencies.kotlin_atomicfu)
api("ru.landgrafhomyak.utility:highlevel-try-finally:0.4") api("ru.landgrafhomyak.utility:highlevel-try-finally:0.4")
api("ru.landgrafhomyak.utility:reference-counter:0.1")
} }
} }
} }

View File

@ -1,72 +0,0 @@
@file:OptIn(ExperimentalContracts::class)
package ru.landgrafhomyak.bgtu.networks0.utilities
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose1
import ru.landgrafhomyak.utility.highlevel_try_finally.safeAutoClose2
class CloseableRefCounter(private val _errMessage: String) {
private val _value: AtomicLong = atomic(0L)
fun throwClosed() {
throw IllegalStateException(this._errMessage)
}
fun incref() {
this._value.update { o ->
if (o < 0) this.throwClosed()
return@update o + 1
}
}
inline fun <R> tryIncref(block: () -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
this.incref()
return safeAutoClose2(onError = this::decref, action = block)
}
fun checkNotClosed() {
if (this._value.value < 0) this.throwClosed()
}
fun decref() {
this._value.update(Long::dec)
}
inline fun <R> tryDecref(block: () -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
this.checkNotClosed()
return safeAutoClose2(onSuccess = this::decref, action = block)
}
fun close(errExistRefs: String) {
val state = this._value.compareAndExchange(0, -1)
when {
state > 0 -> throw IllegalStateException(errExistRefs)
state < 0 -> this.throwClosed()
}
}
inline fun <R> withRef(protected: () -> R): R {
this.incref()
return safeAutoClose1(finally = this::decref, action = protected)
}
override fun toString(): String {
val refcntCached = this._value.value
if (refcntCached < 0)
return "<ref counter [closed]>"
else
return "<ref counter [${refcntCached}]>"
}
}

View File

@ -25,6 +25,7 @@ pluginManagement {
includeBuild("./libs/int-serializers") includeBuild("./libs/int-serializers")
includeBuild("./libs/highlevel-try-finally") includeBuild("./libs/highlevel-try-finally")
includeBuild("./libs/reference-counter")
include(":modules:utilities") include(":modules:utilities")
include(":modules:low-level:c-interop-utilities") include(":modules:low-level:c-interop-utilities")
include(":modules:low-level:multithreading") include(":modules:low-level:multithreading")