diff --git a/modules/low-level/multithreading/build.gradle.kts b/modules/low-level/multithreading/build.gradle.kts index 4c6fd97..0c220da 100644 --- a/modules/low-level/multithreading/build.gradle.kts +++ b/modules/low-level/multithreading/build.gradle.kts @@ -14,6 +14,8 @@ repositories { kotlin { configureWarnings() + jvm() + mingwX64() linuxX64() linuxArm64() diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt index 4316956..5d963d6 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt @@ -1,8 +1,8 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 class CountDownLatch : AutoCloseable { private val _refcnt: CloseableRefCounter @@ -14,7 +14,7 @@ class CountDownLatch : AutoCloseable { this._refcnt = CloseableRefCounter("Latch was destroyed") this._counter = initialCounterValue this._mutex = Mutex() - _safeAutoClose2(onAbort = this._mutex::close) { + safeAutoClose2(onAbort = this._mutex::close) { this._condition = Condition() } } @@ -37,7 +37,7 @@ class CountDownLatch : AutoCloseable { override fun close() { this._refcnt.close("Latch is still in use") - _safeAutoClose1( + safeAutoClose1( action = this._condition::close, finally = this._mutex::close ) diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt index 7dfc537..f7d739d 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt @@ -3,7 +3,7 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 @OptIn(ExperimentalContracts::class) inline fun Mutex.withLock(synchronizedBlock: () -> R): R { @@ -11,5 +11,5 @@ inline fun Mutex.withLock(synchronizedBlock: () -> R): R { callsInPlace(synchronizedBlock, InvocationKind.EXACTLY_ONCE) } this.lock() - return _safeAutoClose1(finally = this::unlock, synchronizedBlock) + return safeAutoClose1(finally = this::unlock, synchronizedBlock) } \ No newline at end of file diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt index 21e3382..cff7cd3 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt @@ -14,8 +14,8 @@ import platform.posix.pthread_cond_broadcast import platform.posix.pthread_cond_signal import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 @OptIn(ExperimentalForeignApi::class) @@ -26,7 +26,7 @@ actual class Condition : AutoCloseable { actual constructor() { this._refcnt = CloseableRefCounter("Pthreads condition was destroyed") this._descriptor = nativeHeap.alloc().ptr - _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { + safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { var err = pthread_cond_init(this._descriptor, null) if (err != 0) PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") } @@ -56,7 +56,7 @@ actual class Condition : AutoCloseable { override fun close() { this._refcnt.close("There are waiting threads on this pthreads condition") - _safeAutoClose1( + safeAutoClose1( action = { var err = pthread_cond_destroy(this._descriptor) if (err != 0) diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt index 4ca04c8..7b162be 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt @@ -13,8 +13,8 @@ import platform.posix.pthread_mutex_t import platform.posix.pthread_mutex_unlock import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 @OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class) actual class Mutex : AutoCloseable { @@ -28,7 +28,7 @@ actual class Mutex : AutoCloseable { actual constructor() { this._refcnt = CloseableRefCounter("Pthreads mutex was destroyed") this._descriptor = nativeHeap.alloc().ptr - _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { + safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { var err = pthread_mutex_init(this._descriptor, null) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads mutex: $d") } @@ -53,7 +53,7 @@ actual class Mutex : AutoCloseable { override fun close() { this._refcnt.close("There are waiting threads on this pthreads mutex") - _safeAutoClose1( + safeAutoClose1( action = { var err = pthread_mutex_destroy(this._descriptor) if (err != 0) diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt index a361d90..39db155 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt @@ -9,9 +9,9 @@ import kotlinx.cinterop.StableRef import kotlinx.cinterop.asStableRef import kotlinx.cinterop.staticCFunction import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2e +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2e import ru.landgrafhomyak.bgtu.networks0.utilities.compareAndExchange @OptIn(ExperimentalForeignApi::class) @@ -38,9 +38,9 @@ actual class Thread : AutoCloseable { actual constructor(routine: Routine) { this._state = atomic(State.PENDING) this._bootstrapArgRef = StableRef.create(ThreadBootstrapContext(routine)) - _safeAutoClose2(onAbort = this._bootstrapArgRef::dispose) { + safeAutoClose2(onAbort = this._bootstrapArgRef::dispose) { this._threadBindings = _PthreadsThreadBindings() - _safeAutoClose2(onAbort = this._threadBindings::free) { + safeAutoClose2(onAbort = this._threadBindings::free) { var err = this._threadBindings.create( null, staticCFunction({ arg -> return@staticCFunction Thread._threadBootstrap(arg) }), @@ -60,7 +60,7 @@ actual class Thread : AutoCloseable { State.STARTING, State.RUNNING -> throw IllegalStateException("Pthreads thread already running") State.PENDING -> {} } - _safeAutoClose2(onAbort = { this._state.value = State.PENDING }) { + safeAutoClose2(onAbort = { this._state.value = State.PENDING }) { this._bootstrapArgRef.get().startSignal.decrement() } } @@ -87,14 +87,14 @@ actual class Thread : AutoCloseable { return@update State.CLOSED } - _safeAutoClose1( + safeAutoClose1( action = { var err = this._threadBindings.join(null) if (err != 0) PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads thread: $d") } }, finally = { - _safeAutoClose1( + safeAutoClose1( action = { this._threadBindings.free() }, finally = { this._bootstrapArgRef.dispose() } ) @@ -112,7 +112,7 @@ actual class Thread : AutoCloseable { return null } try { - _safeAutoClose2e( + safeAutoClose2e( onAbort = { e -> context.exitedWithError = e context.threadState.value = State.FINISHED diff --git a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/utility/CloseGuardEventLoop.kt b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/utility/CloseGuardEventLoop.kt deleted file mode 100644 index 590f589..0000000 --- a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/utility/CloseGuardEventLoop.kt +++ /dev/null @@ -1,67 +0,0 @@ -package ru.landgrafhomyak.bgtu.networks0.low_level.sockets.utility - -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.update -import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket -import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoopScope - -@Suppress("FunctionName", "MemberVisibilityCanBePrivate", "PropertyName") -class CloseGuardEventLoop( - val _unprotected: T, -) : SocketEventLoopScope.Closeable { - - private var _isClosed = atomic(false) - private var _openedSockets = atomic(0uL) - - private fun _assertNotClosed() { - if (this._isClosed.value) throw IllegalStateException("Event-loop is closed") - } - - private fun _incrementOpenedSockets() = this._openedSockets.update { x -> x + 1u } - private fun _decrementOpenedSockets() = this._openedSockets.update { x -> x - 1u } - - override fun icmp_IPv4(addr: String): IcmpSocket { - this._assertNotClosed() - this._incrementOpenedSockets() - return ProtectedIcmpSocket(this._unprotected.icmp_IPv4(addr)) - } - - override fun icmp_IPv6(addr: String): IcmpSocket { - this._assertNotClosed() - this._incrementOpenedSockets() - return ProtectedIcmpSocket(this._unprotected.icmp_IPv6(addr)) - } - - override fun close() { - this._assertNotClosed() - if (this._openedSockets.value > 0uL) - throw IllegalStateException("There are still opened sockets, close them manually first") - this._unprotected.close() - } - - private inner class ProtectedIcmpSocket(private val _unprotected: IcmpSocket) : IcmpSocket { - private val _isClosed = atomic(false) - - private fun _assertNotClosed() { - if (this._isClosed.value) throw IllegalStateException("Socket is closed") - } - - override suspend fun sendRaw(data: UByteArray) { - this._assertNotClosed() - this._unprotected.sendRaw(data) - } - - override suspend fun recvRaw(): UByteArray { - this._assertNotClosed() - return this._unprotected.recvRaw() - } - - override fun close() { - if (this._isClosed.getAndSet(true)) - throw IllegalStateException("Socket is closed") - this._unprotected.close() - this@CloseGuardEventLoop._decrementOpenedSockets() - } - - } -} \ No newline at end of file diff --git a/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt b/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt index c540e68..811c2e5 100644 --- a/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt +++ b/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt @@ -16,6 +16,7 @@ import platform.linux.EPOLLIN import platform.linux.EPOLLONESHOT import platform.linux.EPOLLOUT import platform.linux.EPOLL_CTL_ADD +import platform.linux.EPOLL_CTL_DEL import platform.linux.EPOLL_CTL_MOD import platform.linux.epoll_create import platform.linux.epoll_ctl @@ -28,22 +29,25 @@ import platform.posix.sockaddr_in6 import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 -import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 +import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 @OptIn(ExperimentalForeignApi::class) @Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName") class EpollSocketEventLoop : SocketEventLoopScope.Closeable { private val _epollDescriptor: Int + private val _socketsCount: CloseableRefCounter constructor() { + this._socketsCount = CloseableRefCounter("Epoll event-loop is closed") this._epollDescriptor = epoll_create(1) if (this._epollDescriptor < 0) PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") } } private fun _parseIpv4(rawAddr: String, port: UShort): CValue = cValue out@{ - memScoped { + memScoped { val err = inet_pton(AF_INET, rawAddr, this@out.sin_addr.ptr) when { err == 0 -> throw IllegalArgumentException("IPv4 address is ill-formatted") @@ -78,6 +82,15 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable { } } + private fun _remove(metadata: _SocketMetadata) { + metadata.sync.withLock { + if (metadata.isAddedToEpoll) { + if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_DEL, metadata.fd, null)) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to remove fd from epoll: $d") } + } + } + } + private suspend fun _waitForRead(metadata: _SocketMetadata) = suspendCoroutine { continuation -> metadata.sync.withLock { if (metadata.read != null) @@ -147,13 +160,13 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable { this.read = null this.write = null this.stableRef = StableRef.create(this) - _safeAutoClose2(onAbort = this.stableRef::dispose) { + safeAutoClose2(onAbort = this.stableRef::dispose) { this.sync = TMutex() } } override fun close() { - _safeAutoClose1( + safeAutoClose1( action = this.sync::close, finally = this.stableRef::dispose ) @@ -165,14 +178,16 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable { private val __metadata: _SocketMetadata constructor(ipv4: CValue) : super(ipv4) { - _safeAutoClose2(onAbort = { super.close() }) { + safeAutoClose2(onAbort = { super.close() }) { this.__metadata = _SocketMetadata(this._socketFd) } } constructor(ipv6: CValue) : super(ipv6) { - _safeAutoClose2(onAbort = { super.close() }) { - this.__metadata = _SocketMetadata(this._socketFd) + safeAutoClose2(onAbort = { super.close() }) { + this@EpollSocketEventLoop._socketsCount.tryIncref { + this.__metadata = _SocketMetadata(this._socketFd) + } } } @@ -183,13 +198,18 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable { override suspend fun _waitForRead() = this@EpollSocketEventLoop._waitForRead(this.__metadata) - override fun close() { - _safeAutoClose1( - action = {}, + override fun _free() { + safeAutoClose1( + action = { this@EpollSocketEventLoop._remove(this.__metadata) }, finally = { - _safeAutoClose1( - action = this.__metadata::close, - finally = { super.close() } + safeAutoClose1( + action = this@EpollSocketEventLoop._socketsCount::decref, + finally = { + safeAutoClose1( + action = this.__metadata::close, + finally = { super.close() } + ) + } ) } ) diff --git a/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt b/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt index b020ad6..913548e 100644 --- a/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt +++ b/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt @@ -1,6 +1,5 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.sockets -import kotlinx.atomicfu.locks.MutexPool import kotlinx.cinterop.CValue import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.addressOf @@ -32,21 +31,31 @@ import platform.posix.socket import platform.posix.ssize_t import platform.posix.send as send_lowlevel import platform.posix.recv as recv_lowlevel +import platform.posix.close as close_lowlevel import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities +import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 @Suppress("CanBeVal") @OptIn(ExperimentalForeignApi::class) internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { protected val _socketFd: Int - private val _rSync = CMutex() - private val _wSync = CMutex() + private val __rSync: CMutex + private val __wSync: CMutex + private val __refcnt: CloseableRefCounter protected constructor(ipv4: CValue) { + this.__rSync = CMutex() + this.__wSync = CMutex() + this.__refcnt = CloseableRefCounter("ICMP socket was closed") this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI(), "IPv4") } protected constructor(ipv6: CValue) { + this.__rSync = CMutex() + this.__wSync = CMutex() + this.__refcnt = CloseableRefCounter("ICMP socket was closed") this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI(), "IPv6") } @@ -59,18 +68,18 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { else PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over ${protoName}: $d") } } + safeAutoClose2(onAbort = { if (0 != close_lowlevel(this._socketFd)) PosixUtilities.throwErrno { d -> RuntimeException("Failed to close ICMP socket: $d") } }) { + var err = memScoped { + @Suppress("UNCHECKED_CAST") + return@memScoped connect(sock, addrValue as CValue, addrSize) + } + if (err != 0) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to bind ICMP socket to provided $protoName address: $d") } - var err = memScoped { - @Suppress("UNCHECKED_CAST") - return@memScoped connect(sock, addrValue as CValue, addrSize) + err = fcntl(sock, F_SETFL, O_NONBLOCK) + if (err != 0) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to switch ICMP socket to non-blocking mode: $d") } } - if (err != 0) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to bind ICMP socket to provided $protoName address: $d") } - - err = fcntl(sock, F_SETFL, O_NONBLOCK) - if (err != 0) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to switch ICMP socket to non-blocking mode: $d") } - return sock } @@ -81,55 +90,71 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { @Suppress("FunctionName") protected abstract suspend fun _waitForRead() - override suspend fun sendRaw(data: UByteArray) = this._wSync.withLock { - polling@ while (true) { - this._waitForWrite() - val sentCount = data.usePinned { pinnedData -> - return@usePinned send_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL) - } - val err = errno - if (sentCount < 0) { - when (err) { - EAGAIN, EWOULDBLOCK -> continue@polling - EMSGSIZE -> throw IllegalArgumentException("Data size bigger than can be sent by one packet") - else -> PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } + override suspend fun sendRaw(data: UByteArray) { + this.__refcnt.withRef { + this.__wSync.withLock { + polling@ while (true) { + this._waitForWrite() + val sentCount = data.usePinned { pinnedData -> + return@usePinned send_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL) + } + val err = errno + if (sentCount < 0) { + when (err) { + EAGAIN, EWOULDBLOCK -> continue@polling + EMSGSIZE -> throw IllegalArgumentException("Data size bigger than can be sent by one packet") + else -> PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } + } + } + return } } - return } } - override suspend fun recvRaw(): UByteArray = this._rSync.withLock { - polling@ while (true) { - this._waitForRead() - val bytesAvailable: ssize_t = recv_lowlevel( - this._socketFd, null, 0u, - MSG_NOSIGNAL or MSG_PEEK or MSG_TRUNC - ) - if (bytesAvailable == 0L) continue@polling - if (bytesAvailable < 0) { - when (errno) { - EAGAIN, EWOULDBLOCK -> continue@polling - else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") } - } - } + override suspend fun recvRaw(): UByteArray { + this.__refcnt.withRef { + this.__rSync.withLock { + polling@ while (true) { + this._waitForRead() + val bytesAvailable: ssize_t = recv_lowlevel( + this._socketFd, null, 0u, + MSG_NOSIGNAL or MSG_PEEK or MSG_TRUNC + ) + if (bytesAvailable == 0L) continue@polling + if (bytesAvailable < 0) { + when (errno) { + EAGAIN, EWOULDBLOCK -> continue@polling + else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") } + } + } - val data = UByteArray(bytesAvailable.toInt()) - var sentCount = data.usePinned { pinnedData -> - return@usePinned recv_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL) - } - if (sentCount < 0) { - when (errno) { - EAGAIN, EWOULDBLOCK -> continue@polling - else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") } + val data = UByteArray(bytesAvailable.toInt()) + var sentCount = data.usePinned { pinnedData -> + return@usePinned recv_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL) + } + if (sentCount < 0) { + when (errno) { + EAGAIN, EWOULDBLOCK -> continue@polling + else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") } + } + } + return data } + @Suppress("KotlinUnreachableCode") + throw RuntimeException("unreachable") } - return@withLock data } - return@withLock ubyteArrayOf() + } + + protected open fun _free() { + var err = close_lowlevel(this._socketFd) + if (err != 0) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to close ICMP socket: $d") } } override fun close() { - + this.__refcnt.close("This socket still in use") + this._free() } } \ No newline at end of file diff --git a/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/CloseableRefCounter.kt b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/CloseableRefCounter.kt index 216c408..91d10ea 100644 --- a/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/CloseableRefCounter.kt +++ b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/CloseableRefCounter.kt @@ -1,5 +1,10 @@ +@file:OptIn(ExperimentalContracts::class) + package ru.landgrafhomyak.bgtu.networks0.utilities +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract import kotlinx.atomicfu.AtomicLong import kotlinx.atomicfu.atomic import kotlinx.atomicfu.update @@ -19,8 +24,11 @@ class CloseableRefCounter(private val _errMessage: String) { } inline fun tryIncref(block: () -> R): R { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } this.incref() - return _safeAutoClose2(onAbort = this::decref, action = block) + return safeAutoClose2(onAbort = this::decref, action = block) } fun checkNotClosed() { @@ -32,8 +40,11 @@ class CloseableRefCounter(private val _errMessage: String) { } inline fun tryDecref(block: () -> R): R { + contract { + callsInPlace(block, InvocationKind.EXACTLY_ONCE) + } this.checkNotClosed() - return _safeAutoClose2(onSuccess = this::decref, action = block) + return safeAutoClose2(onSuccess = this::decref, action = block) } fun close(errExistRefs: String) { @@ -46,7 +57,7 @@ class CloseableRefCounter(private val _errMessage: String) { inline fun withRef(protected: () -> R): R { this.incref() - return _safeAutoClose1(finally = this::decref, action = protected) + return safeAutoClose1(finally = this::decref, action = protected) } override fun toString(): String { diff --git a/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/auto_close.kt b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/auto_close.kt index 66e643f..4b160f8 100644 --- a/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/auto_close.kt +++ b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/auto_close.kt @@ -7,7 +7,7 @@ import kotlin.contracts.InvocationKind import kotlin.contracts.contract @Suppress("WRONG_INVOCATION_KIND") -inline fun _safeAutoClose1( +inline fun safeAutoClose1( finally: () -> Unit, action: () -> R ): R { @@ -15,11 +15,11 @@ inline fun _safeAutoClose1( callsInPlace(action, InvocationKind.EXACTLY_ONCE) callsInPlace(finally, InvocationKind.EXACTLY_ONCE) } - return _safeAutoClose3(onAbort = finally, onSuccess = finally, onCrossReturn = finally, action = action) + return safeAutoClose3(onAbort = finally, onSuccess = finally, onCrossReturn = finally, action = action) } @Suppress("WRONG_INVOCATION_KIND") -inline fun _safeAutoClose2( +inline fun safeAutoClose2( onAbort: () -> Unit = {}, onSuccess: () -> Unit = {}, action: () -> R @@ -29,11 +29,11 @@ inline fun _safeAutoClose2( callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE) callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) } - return _safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) + return safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) } @Suppress("WRONG_INVOCATION_KIND") -inline fun _safeAutoClose2e( +inline fun safeAutoClose2e( onAbort: (Throwable) -> Unit = { _ -> }, onSuccess: () -> Unit = {}, action: () -> R @@ -43,10 +43,10 @@ inline fun _safeAutoClose2e( callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE) callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) } - return _safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) + return safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) } -inline fun _safeAutoClose3( +inline fun safeAutoClose3( onAbort: () -> Unit = {}, onSuccess: () -> Unit = {}, onCrossReturn: () -> Unit = {}, @@ -58,10 +58,10 @@ inline fun _safeAutoClose3( callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE) callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE) } - return _safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action) + return safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action) } -inline fun _safeAutoClose3e( +inline fun safeAutoClose3e( onAbort: (Throwable) -> Unit = { _ -> }, onSuccess: () -> Unit = {}, onCrossReturn: () -> Unit = {},