From 7e2c6d0e43ad60e6090a195d90299f51274e7d6d Mon Sep 17 00:00:00 2001 From: Andrew Golovashevich Date: Wed, 19 Mar 2025 19:52:36 +0300 Subject: [PATCH] Minimal working parallel ping implementation --- .../low-level/multithreading/build.gradle.kts | 1 + .../multithreading/CountDownLatch.kt | 8 +- .../low_level/multithreading/extensions.kt | 1 + .../low_level/multithreading/Condition.kt | 7 +- .../low_level/multithreading/Mutex.kt | 7 +- .../low_level/multithreading/Thread.kt | 4 + modules/low-level/sockets/build.gradle.kts | 6 +- .../networks0/low_level/sockets/IcmpSocket.kt | 21 +- .../sockets/SocketBlockingEventLoop.kt | 9 - .../low_level/sockets/SocketEventLoop.kt | 7 - .../low_level/sockets/SocketEventLoopScope.kt | 27 +++ .../sockets/utility/CloseGuardEventLoop.kt | 67 ++++++ .../low_level/sockets/EpollSocketEventLoop.kt | 198 ++++++++++++++++++ .../sockets/SocketBlockingEventLoop.kt | 158 -------------- .../sockets/NonblockingIcmpSocketImpl.kt | 45 ++-- modules/utilities/build.gradle.kts | 28 +++ .../utilities/CloseableRefCounter.kt} | 4 +- .../bgtu/networks0/utilities/atomics.kt | 20 ++ .../bgtu/networks0/utilities/auto_close.kt} | 35 +--- programs/test/build.gradle.kts | 1 + programs/test/src/linuxX64Main/kotlin/main.kt | 78 ++++--- settings.gradle.kts | 1 + 22 files changed, 481 insertions(+), 252 deletions(-) delete mode 100644 modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketBlockingEventLoop.kt delete mode 100644 modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketEventLoop.kt create mode 100644 modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketEventLoopScope.kt create mode 100644 modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/utility/CloseGuardEventLoop.kt create mode 100644 modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt delete mode 100644 modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketBlockingEventLoop.kt create mode 100644 modules/utilities/build.gradle.kts rename modules/{low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt => utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/CloseableRefCounter.kt} (90%) create mode 100644 modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/atomics.kt rename modules/{low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt => utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/auto_close.kt} (74%) diff --git a/modules/low-level/multithreading/build.gradle.kts b/modules/low-level/multithreading/build.gradle.kts index cd93e2e..4c6fd97 100644 --- a/modules/low-level/multithreading/build.gradle.kts +++ b/modules/low-level/multithreading/build.gradle.kts @@ -25,6 +25,7 @@ kotlin { commonMain { dependencies { implementation(Dependencies.kotlin_atomicfu) + implementation(project(":modules:utilities")) } } diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt index 6fc6d3f..4316956 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/CountDownLatch.kt @@ -1,13 +1,17 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading +import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 + class CountDownLatch : AutoCloseable { - private val _refcnt: _Refcnt + private val _refcnt: CloseableRefCounter private var _counter: ULong private val _mutex: Mutex private val _condition: Condition constructor(initialCounterValue: ULong) { - this._refcnt = _Refcnt("Latch was destroyed") + this._refcnt = CloseableRefCounter("Latch was destroyed") this._counter = initialCounterValue this._mutex = Mutex() _safeAutoClose2(onAbort = this._mutex::close) { diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt index e8b5293..7dfc537 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt +++ b/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/extensions.kt @@ -3,6 +3,7 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 @OptIn(ExperimentalContracts::class) inline fun Mutex.withLock(synchronizedBlock: () -> R): R { diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt index 1f925b7..21e3382 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Condition.kt @@ -13,15 +13,18 @@ import platform.posix.pthread_cond_wait import platform.posix.pthread_cond_broadcast import platform.posix.pthread_cond_signal import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities +import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 @OptIn(ExperimentalForeignApi::class) actual class Condition : AutoCloseable { - private val _refcnt: _Refcnt + private val _refcnt: CloseableRefCounter val _descriptor: CPointer actual constructor() { - this._refcnt = _Refcnt("Pthreads condition was destroyed") + this._refcnt = CloseableRefCounter("Pthreads condition was destroyed") this._descriptor = nativeHeap.alloc().ptr _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { var err = pthread_cond_init(this._descriptor, null) diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt index 43ce126..4ca04c8 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Mutex.kt @@ -12,6 +12,9 @@ import platform.posix.pthread_mutex_lock import platform.posix.pthread_mutex_t import platform.posix.pthread_mutex_unlock import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities +import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 @OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class) actual class Mutex : AutoCloseable { @@ -19,11 +22,11 @@ actual class Mutex : AutoCloseable { internal annotation class RefcntAccess @RefcntAccess - internal val _refcnt: _Refcnt + internal val _refcnt: CloseableRefCounter internal val _descriptor: CPointer actual constructor() { - this._refcnt = _Refcnt("Pthreads mutex was destroyed") + this._refcnt = CloseableRefCounter("Pthreads mutex was destroyed") this._descriptor = nativeHeap.alloc().ptr _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { var err = pthread_mutex_init(this._descriptor, null) diff --git a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt index 19705d7..a361d90 100644 --- a/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt +++ b/modules/low-level/multithreading/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/Thread.kt @@ -9,6 +9,10 @@ import kotlinx.cinterop.StableRef import kotlinx.cinterop.asStableRef import kotlinx.cinterop.staticCFunction import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2e +import ru.landgrafhomyak.bgtu.networks0.utilities.compareAndExchange @OptIn(ExperimentalForeignApi::class) actual class Thread : AutoCloseable { diff --git a/modules/low-level/sockets/build.gradle.kts b/modules/low-level/sockets/build.gradle.kts index 5638980..4e0f62b 100644 --- a/modules/low-level/sockets/build.gradle.kts +++ b/modules/low-level/sockets/build.gradle.kts @@ -1,4 +1,5 @@ import ru.landgrafhomyak.bgtu.networks0.build_script.Dependencies +import ru.landgrafhomyak.bgtu.networks0.build_script.configureWarnings import ru.landgrafhomyak.bgtu.networks0.build_script.setupHierarchy plugins { @@ -11,6 +12,8 @@ repositories { kotlin { + configureWarnings() + mingwX64() linuxX64() linuxArm64() @@ -22,13 +25,14 @@ kotlin { commonMain { dependencies { implementation(Dependencies.kotlin_atomicfu) + implementation(Dependencies.kotlin_coroutines_core) + implementation(project(":modules:low-level:multithreading")) } } nativeMain { dependencies { implementation(project(":modules:low-level:c-interop-utilities")) - implementation(project(":modules:low-level:multithreading")) } } } diff --git a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/IcmpSocket.kt b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/IcmpSocket.kt index 1f38a84..e59d209 100644 --- a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/IcmpSocket.kt +++ b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/IcmpSocket.kt @@ -1,6 +1,23 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.sockets +/** + * High-level interface for interacting with icmp socket. + * Destination host address is bound to instance and can't be changed. + */ interface IcmpSocket : AutoCloseable { - suspend fun send(data: UByteArray) - suspend fun recv(): UByteArray + /** + * Sends [data] as is in single datagram to destination host. + */ + suspend fun sendRaw(data: UByteArray) + + /** + * Waits for datagram from destination host and returns it as is. + */ + suspend fun recvRaw(): UByteArray + + /** + * Closes socket and releases related resources. + * @throws IllegalStateException if called on already closed socket. + */ + override fun close() } \ No newline at end of file diff --git a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketBlockingEventLoop.kt b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketBlockingEventLoop.kt deleted file mode 100644 index 6a13d74..0000000 --- a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketBlockingEventLoop.kt +++ /dev/null @@ -1,9 +0,0 @@ -package ru.landgrafhomyak.bgtu.networks0.low_level.sockets - -expect class SocketBlockingEventLoop : ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoop { - public constructor() - - companion object { - fun runForever(el: ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketBlockingEventLoop) - } -} \ No newline at end of file diff --git a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketEventLoop.kt b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketEventLoop.kt deleted file mode 100644 index cb84cf9..0000000 --- a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketEventLoop.kt +++ /dev/null @@ -1,7 +0,0 @@ -package ru.landgrafhomyak.bgtu.networks0.low_level.sockets - -@Suppress("FunctionName") -interface SocketEventLoop { - fun icmp_IPv4(addr: String): ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket - fun icmp_IPv6(addr: String): ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket -} \ No newline at end of file diff --git a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketEventLoopScope.kt b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketEventLoopScope.kt new file mode 100644 index 0000000..0dc016b --- /dev/null +++ b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketEventLoopScope.kt @@ -0,0 +1,27 @@ +package ru.landgrafhomyak.bgtu.networks0.low_level.sockets + +/** + * User-side interface of socket event-loops. + */ +@Suppress("FunctionName") +interface SocketEventLoopScope { + /** + * Creates [ICMP socket][IcmpSocket] bound with specified [address][addr] to communicate over IPv4. + */ + fun icmp_IPv4(addr: String): IcmpSocket + + /** + * Creates [ICMP socket][IcmpSocket] bound with specified [address][addr] to communicate over IPv6. + */ + fun icmp_IPv6(addr: String): IcmpSocket + + interface Closeable : SocketEventLoopScope, AutoCloseable { + /** + * Closes event-loop and releases related resources. + * + * @throws IllegalStateException if there are unclosed sockets bound to this event-loop. + * @throws IllegalStateException if called on already closed socket. + */ + override fun close() + } +} \ No newline at end of file diff --git a/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/utility/CloseGuardEventLoop.kt b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/utility/CloseGuardEventLoop.kt new file mode 100644 index 0000000..590f589 --- /dev/null +++ b/modules/low-level/sockets/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/utility/CloseGuardEventLoop.kt @@ -0,0 +1,67 @@ +package ru.landgrafhomyak.bgtu.networks0.low_level.sockets.utility + +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.update +import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket +import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoopScope + +@Suppress("FunctionName", "MemberVisibilityCanBePrivate", "PropertyName") +class CloseGuardEventLoop( + val _unprotected: T, +) : SocketEventLoopScope.Closeable { + + private var _isClosed = atomic(false) + private var _openedSockets = atomic(0uL) + + private fun _assertNotClosed() { + if (this._isClosed.value) throw IllegalStateException("Event-loop is closed") + } + + private fun _incrementOpenedSockets() = this._openedSockets.update { x -> x + 1u } + private fun _decrementOpenedSockets() = this._openedSockets.update { x -> x - 1u } + + override fun icmp_IPv4(addr: String): IcmpSocket { + this._assertNotClosed() + this._incrementOpenedSockets() + return ProtectedIcmpSocket(this._unprotected.icmp_IPv4(addr)) + } + + override fun icmp_IPv6(addr: String): IcmpSocket { + this._assertNotClosed() + this._incrementOpenedSockets() + return ProtectedIcmpSocket(this._unprotected.icmp_IPv6(addr)) + } + + override fun close() { + this._assertNotClosed() + if (this._openedSockets.value > 0uL) + throw IllegalStateException("There are still opened sockets, close them manually first") + this._unprotected.close() + } + + private inner class ProtectedIcmpSocket(private val _unprotected: IcmpSocket) : IcmpSocket { + private val _isClosed = atomic(false) + + private fun _assertNotClosed() { + if (this._isClosed.value) throw IllegalStateException("Socket is closed") + } + + override suspend fun sendRaw(data: UByteArray) { + this._assertNotClosed() + this._unprotected.sendRaw(data) + } + + override suspend fun recvRaw(): UByteArray { + this._assertNotClosed() + return this._unprotected.recvRaw() + } + + override fun close() { + if (this._isClosed.getAndSet(true)) + throw IllegalStateException("Socket is closed") + this._unprotected.close() + this@CloseGuardEventLoop._decrementOpenedSockets() + } + + } +} \ No newline at end of file diff --git a/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt b/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt new file mode 100644 index 0000000..c540e68 --- /dev/null +++ b/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt @@ -0,0 +1,198 @@ +package ru.landgrafhomyak.bgtu.networks0.low_level.sockets + +import kotlin.coroutines.Continuation +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine +import kotlinx.cinterop.CValue +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.StableRef +import kotlinx.cinterop.allocArray +import kotlinx.cinterop.asStableRef +import kotlinx.cinterop.cValue +import kotlinx.cinterop.get +import kotlinx.cinterop.memScoped +import kotlinx.cinterop.ptr +import platform.linux.EPOLLIN +import platform.linux.EPOLLONESHOT +import platform.linux.EPOLLOUT +import platform.linux.EPOLL_CTL_ADD +import platform.linux.EPOLL_CTL_MOD +import platform.linux.epoll_create +import platform.linux.epoll_ctl +import platform.linux.epoll_event +import platform.linux.epoll_wait +import platform.linux.inet_pton +import platform.posix.AF_INET +import platform.posix.sockaddr_in +import platform.posix.sockaddr_in6 +import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities +import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex +import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2 + +@OptIn(ExperimentalForeignApi::class) +@Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName") +class EpollSocketEventLoop : SocketEventLoopScope.Closeable { + private val _epollDescriptor: Int + + constructor() { + this._epollDescriptor = epoll_create(1) + if (this._epollDescriptor < 0) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") } + } + + private fun _parseIpv4(rawAddr: String, port: UShort): CValue = cValue out@{ + memScoped { + val err = inet_pton(AF_INET, rawAddr, this@out.sin_addr.ptr) + when { + err == 0 -> throw IllegalArgumentException("IPv4 address is ill-formatted") + err < 0 -> PosixUtilities.throwErrno { d -> throw RuntimeException("Failed to parse IPv4 address: $d") } + } + } + this@out.sin_family = AF_INET.toUShort() + this@out.sin_port = port + } + + override fun icmp_IPv4(addr: String): IcmpSocket { + return BoundIcmpSocket(this._parseIpv4(addr, 0u)) + } + + override fun icmp_IPv6(addr: String): IcmpSocket { + TODO("Not yet implemented") + } + + private fun _waitFor(metadata: _SocketMetadata) { + val ee = cValue { + this.events = ((if (metadata.read == null) 0 else EPOLLIN) or (if (metadata.write == null) 0 else EPOLLOUT) or EPOLLONESHOT).toUInt() + this.data.fd = metadata.fd + this.data.ptr = metadata.stableRef.asCPointer() + } + if (!metadata.isAddedToEpoll) { + if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, metadata.fd, ee)) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to add fd to epoll: $d") } + metadata.isAddedToEpoll = true + } else { + if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_MOD, metadata.fd, ee)) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to modify fd in epoll: $d") } + } + } + + private suspend fun _waitForRead(metadata: _SocketMetadata) = suspendCoroutine { continuation -> + metadata.sync.withLock { + if (metadata.read != null) + throw IllegalStateException("Socket already waiting for read") + metadata.read = continuation + this._waitFor(metadata) + } + } + + + private suspend fun _waitForWrite(metadata: _SocketMetadata) = suspendCoroutine { continuation -> + metadata.sync.withLock { + if (metadata.write != null) + throw IllegalStateException("Socket already waiting for write") + metadata.write = continuation + this._waitFor(metadata) + } + } + + private fun _resume(ev: epoll_event) { + val metadata = (ev.data.ptr ?: throw RuntimeException("Lost reference to socket metadata")).asStableRef<_SocketMetadata>().get() + metadata.sync.withLock { + if ((ev.events and EPOLLIN.toUInt()) != 0u) { + (metadata.read ?: throw RuntimeException("Socket ready for read, but nobody requests")).resume(Unit) + metadata.read = null + } + if ((ev.events and EPOLLOUT.toUInt()) != 0u) { + (metadata.write ?: throw RuntimeException("Socket ready for read, but nobody requests")).resume(Unit) + metadata.write = null + } + if (metadata.read != null || metadata.write != null) + this._waitFor(metadata) + } + } + + fun runUntilError(eventsBufferSize: Int = 1000) { + memScoped { + val readyEvents = allocArray(eventsBufferSize) + while (true) { + val readyEventsCount = epoll_wait(this@EpollSocketEventLoop._epollDescriptor, readyEvents, eventsBufferSize, -1) + if (readyEventsCount < 0) + PosixUtilities.throwErrno { d -> RuntimeException("Error while waiting epoll: $d") } + + for (i in 0..? + var write: Continuation? + val sync: TMutex + val stableRef: StableRef<_SocketMetadata> + var isAddedToEpoll: Boolean + + constructor(fd: Int) { + this.isAddedToEpoll = false + this.fd = fd + this.read = null + this.write = null + this.stableRef = StableRef.create(this) + _safeAutoClose2(onAbort = this.stableRef::dispose) { + this.sync = TMutex() + } + } + + override fun close() { + _safeAutoClose1( + action = this.sync::close, + finally = this.stableRef::dispose + ) + } + } + + + private inner class BoundIcmpSocket : NonblockingIcmpSocketImpl { + private val __metadata: _SocketMetadata + + constructor(ipv4: CValue) : super(ipv4) { + _safeAutoClose2(onAbort = { super.close() }) { + this.__metadata = _SocketMetadata(this._socketFd) + } + } + + constructor(ipv6: CValue) : super(ipv6) { + _safeAutoClose2(onAbort = { super.close() }) { + this.__metadata = _SocketMetadata(this._socketFd) + } + } + + + override suspend fun _waitForWrite() = + this@EpollSocketEventLoop._waitForWrite(this.__metadata) + + override suspend fun _waitForRead() = + this@EpollSocketEventLoop._waitForRead(this.__metadata) + + override fun close() { + _safeAutoClose1( + action = {}, + finally = { + _safeAutoClose1( + action = this.__metadata::close, + finally = { super.close() } + ) + } + ) + } + } +} diff --git a/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketBlockingEventLoop.kt b/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketBlockingEventLoop.kt deleted file mode 100644 index 3ff92dc..0000000 --- a/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/SocketBlockingEventLoop.kt +++ /dev/null @@ -1,158 +0,0 @@ -package ru.landgrafhomyak.bgtu.networks0.low_level.sockets - -import kotlin.concurrent.AtomicReference -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine -import kotlinx.atomicfu.atomic -import kotlinx.cinterop.CValue -import kotlinx.cinterop.ExperimentalForeignApi -import kotlinx.cinterop.allocArray -import kotlinx.cinterop.cValue -import kotlinx.cinterop.get -import kotlinx.cinterop.memScoped -import kotlinx.cinterop.ptr -import platform.linux.EPOLLIN -import platform.linux.EPOLLOUT -import platform.linux.EPOLL_CTL_ADD -import platform.linux.epoll_create -import platform.linux.epoll_ctl -import platform.linux.epoll_event -import platform.linux.epoll_wait -import platform.linux.inet_pton -import platform.posix.AF_INET -import platform.posix.sockaddr_in -import platform.posix.sockaddr_in6 -import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities -import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.TMutex -import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock - -@OptIn(ExperimentalForeignApi::class) -@Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName") -actual class SocketBlockingEventLoop : SocketEventLoop { - private val _tsync = TMutex() - private val _id2cont = HashMap>() - private var _nextSockId: ULong = 0uL - private val _epollDescriptor: Int - - actual constructor() { - this._epollDescriptor = epoll_create(0) - if (this._epollDescriptor < 0) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") } - } - - - private val _state = AtomicReference(State.READY) - private val _openedSockets = atomic(0uL) - - private enum class State { - READY, RUNNING, CLOSED - } - - private fun _assertNotClosed() { - if (this._state.value == State.CLOSED) - throw IllegalStateException("Eventloop is closed") - } - - - private fun _parseIpv4(rawAddr: String, port: UShort): CValue = cValue out@{ - memScoped { - val err = inet_pton(AF_INET, rawAddr, this@out.sin_addr.ptr) - when { - err == 0 -> throw IllegalArgumentException("IPv4 address is ill-formatted") - err < 0 -> PosixUtilities.throwErrno { d -> throw RuntimeException("Failed to parse IPv4 address: $d") } - } - } - this@out.sin_family = AF_INET.toUShort() - this@out.sin_port = port - } - - - override fun icmp_IPv4(addr: String): IcmpSocket { - this._assertNotClosed() - return BoundIcmpSocket(this._parseIpv4(addr, 0u)) - } - - override fun icmp_IPv6(addr: String): IcmpSocket { - this._assertNotClosed() - TODO("Not yet implemented") - } - - private suspend fun _waitFor(fd: Int, action: Int) = suspendCoroutine { continuation -> - val contId: ULong - this._tsync.withLock { - contId = this._nextSockId++ - this._id2cont[contId] = continuation - } - - val ee = cValue { - this.events = action.toUInt() - this.data.fd = fd - this.data.u64 = contId - } - - if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, fd, ee)) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to add event to epoll: $d") } - } - - private suspend fun _waitForRead(fd: Int) = - this._waitFor(fd, EPOLLIN) - - - private suspend fun _waitForWrite(fd: Int) = - this._waitFor(fd, EPOLLOUT) - - private fun _resume(id: ULong) { - val cont = this._tsync.withLock { this._id2cont.remove(id) } - if (cont == null) { - // todo - return - } - cont.resume(Unit) - } - - private fun _mainloop() { - when (this._state.compareAndExchange(State.READY, State.RUNNING)) { - State.RUNNING -> throw IllegalStateException("Eventloop already running") - State.CLOSED -> throw IllegalStateException("Eventloop is closed") - State.READY -> {} - } - - val eventsBufferSize = 100 - memScoped { - val readyEvents = allocArray(eventsBufferSize) - while (true) { - val readyEventsCount = epoll_wait(this@SocketBlockingEventLoop._epollDescriptor, readyEvents, eventsBufferSize, -1) - if (readyEventsCount < 0) - PosixUtilities.throwErrno { d -> RuntimeException("Error while waiting epoll: $d") } - - for (i in 0..) : super(ipv4) - - constructor(ipv6: CValue) : super(ipv6) - - override suspend fun _waitForWrite() = - this@SocketBlockingEventLoop._waitForWrite(this._socketFd) - - override suspend fun _waitForRead() = - this@SocketBlockingEventLoop._waitForRead(this._socketFd) - - override fun close() { - TODO("Not yet implemented") - } - - } -} \ No newline at end of file diff --git a/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt b/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt index f684189..b020ad6 100644 --- a/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt +++ b/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt @@ -1,14 +1,18 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.sockets +import kotlinx.atomicfu.locks.MutexPool import kotlinx.cinterop.CValue import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.addressOf import kotlinx.cinterop.memScoped import kotlinx.cinterop.usePinned +import kotlinx.coroutines.sync.Mutex as CMutex +import kotlinx.coroutines.sync.withLock import platform.posix.AF_INET import platform.posix.AF_INET6 import platform.posix.EAGAIN import platform.posix.EMSGSIZE +import platform.posix.EPERM import platform.posix.EWOULDBLOCK import platform.posix.F_SETFL import platform.posix.IPPROTO_ICMP @@ -31,32 +35,37 @@ import platform.posix.recv as recv_lowlevel import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities -@Suppress("CanBeVal", "DUPLICATE_LABEL_IN_WHEN") +@Suppress("CanBeVal") @OptIn(ExperimentalForeignApi::class) internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { protected val _socketFd: Int + private val _rSync = CMutex() + private val _wSync = CMutex() protected constructor(ipv4: CValue) { - this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI()) + this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI(), "IPv4") } protected constructor(ipv6: CValue) { - this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI()) + this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI(), "IPv6") } @Suppress("FunctionName") - private fun __createSocket(addrFamily: Int, protocol: Int, addrValue: CValue<*>, addrSize: UInt): Int { + private fun __createSocket(addrFamily: Int, protocol: Int, addrValue: CValue<*>, addrSize: UInt, protoName: String): Int { val sock = socket(addrFamily, SOCK_RAW, protocol) - if (sock < 0) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over IPv4: $d") } - + if (sock < 0) { + if (errno == EPERM) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over ${protoName}: $d (maybe you need to run this program as superuser)") } + else + PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over ${protoName}: $d") } + } var err = memScoped { @Suppress("UNCHECKED_CAST") return@memScoped connect(sock, addrValue as CValue, addrSize) } if (err != 0) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to bind ICMP socket to provided IPv6 address: $d") } + PosixUtilities.throwErrno { d -> RuntimeException("Failed to bind ICMP socket to provided $protoName address: $d") } err = fcntl(sock, F_SETFL, O_NONBLOCK) if (err != 0) @@ -72,23 +81,25 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { @Suppress("FunctionName") protected abstract suspend fun _waitForRead() - override suspend fun send(data: UByteArray) { + override suspend fun sendRaw(data: UByteArray) = this._wSync.withLock { polling@ while (true) { this._waitForWrite() val sentCount = data.usePinned { pinnedData -> return@usePinned send_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL) } + val err = errno if (sentCount < 0) { - when (errno) { + when (err) { EAGAIN, EWOULDBLOCK -> continue@polling EMSGSIZE -> throw IllegalArgumentException("Data size bigger than can be sent by one packet") - else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } + else -> PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } } } + return } } - override suspend fun recv(): UByteArray { + override suspend fun recvRaw(): UByteArray = this._rSync.withLock { polling@ while (true) { this._waitForRead() val bytesAvailable: ssize_t = recv_lowlevel( @@ -99,7 +110,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { if (bytesAvailable < 0) { when (errno) { EAGAIN, EWOULDBLOCK -> continue@polling - else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } + else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") } } } @@ -110,9 +121,15 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { if (sentCount < 0) { when (errno) { EAGAIN, EWOULDBLOCK -> continue@polling - else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } + else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") } } } + return@withLock data } + return@withLock ubyteArrayOf() + } + + override fun close() { + } } \ No newline at end of file diff --git a/modules/utilities/build.gradle.kts b/modules/utilities/build.gradle.kts new file mode 100644 index 0000000..56571a6 --- /dev/null +++ b/modules/utilities/build.gradle.kts @@ -0,0 +1,28 @@ +import ru.landgrafhomyak.bgtu.networks0.build_script.Dependencies +import ru.landgrafhomyak.bgtu.networks0.build_script.configureWarnings +import ru.landgrafhomyak.bgtu.networks0.build_script.setupHierarchy +import ru.landgrafhomyak.kotlin.kmp_gradle_build_helper.defineAllMultiplatformTargets + +plugins { + kotlin("multiplatform") +} + +repositories { + mavenCentral() +} + + +kotlin { + configureWarnings() + + defineAllMultiplatformTargets() + sourceSets { + setupHierarchy() + + commonMain { + dependencies { + implementation(Dependencies.kotlin_atomicfu) + } + } + } +} \ No newline at end of file diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/CloseableRefCounter.kt similarity index 90% rename from modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt rename to modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/CloseableRefCounter.kt index c62a70d..216c408 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/_Refcnt.kt +++ b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/CloseableRefCounter.kt @@ -1,10 +1,10 @@ -package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading +package ru.landgrafhomyak.bgtu.networks0.utilities import kotlinx.atomicfu.AtomicLong import kotlinx.atomicfu.atomic import kotlinx.atomicfu.update -internal class _Refcnt(private val _errMessage: String) { +class CloseableRefCounter(private val _errMessage: String) { private val _value: AtomicLong = atomic(0L) fun throwClosed() { diff --git a/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/atomics.kt b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/atomics.kt new file mode 100644 index 0000000..54bf856 --- /dev/null +++ b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/atomics.kt @@ -0,0 +1,20 @@ +package ru.landgrafhomyak.bgtu.networks0.utilities + +import kotlinx.atomicfu.AtomicLong +import kotlinx.atomicfu.AtomicRef + +fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long { + while (true) { + val old = this.value + if (old != expected) return old + if (this.compareAndSet(old, newValue)) return old + } +} + +fun AtomicRef.compareAndExchange(expected: T, newValue: T): T { + while (true) { + val old = this.value + if (old === expected) return old + if (this.compareAndSet(old, newValue)) return old + } +} diff --git a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/auto_close.kt similarity index 74% rename from modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt rename to modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/auto_close.kt index 1b0a0e8..66e643f 100644 --- a/modules/low-level/multithreading/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/multithreading/utilities.kt +++ b/modules/utilities/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/utilities/auto_close.kt @@ -1,32 +1,13 @@ @file:OptIn(ExperimentalContracts::class) -package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading +package ru.landgrafhomyak.bgtu.networks0.utilities import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind import kotlin.contracts.contract -import kotlinx.atomicfu.AtomicLong -import kotlinx.atomicfu.AtomicRef - -internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long { - while (true) { - val old = this.value - if (old != expected) return old - if (this.compareAndSet(old, newValue)) return old - } -} - -internal fun AtomicRef.compareAndExchange(expected: T, newValue: T): T { - while (true) { - val old = this.value - if (old === expected) return old - if (this.compareAndSet(old, newValue)) return old - } -} @Suppress("WRONG_INVOCATION_KIND") -@PublishedApi -internal inline fun _safeAutoClose1( +inline fun _safeAutoClose1( finally: () -> Unit, action: () -> R ): R { @@ -38,8 +19,7 @@ internal inline fun _safeAutoClose1( } @Suppress("WRONG_INVOCATION_KIND") -@PublishedApi -internal inline fun _safeAutoClose2( +inline fun _safeAutoClose2( onAbort: () -> Unit = {}, onSuccess: () -> Unit = {}, action: () -> R @@ -53,8 +33,7 @@ internal inline fun _safeAutoClose2( } @Suppress("WRONG_INVOCATION_KIND") -@PublishedApi -internal inline fun _safeAutoClose2e( +inline fun _safeAutoClose2e( onAbort: (Throwable) -> Unit = { _ -> }, onSuccess: () -> Unit = {}, action: () -> R @@ -67,8 +46,7 @@ internal inline fun _safeAutoClose2e( return _safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) } -@PublishedApi -internal inline fun _safeAutoClose3( +inline fun _safeAutoClose3( onAbort: () -> Unit = {}, onSuccess: () -> Unit = {}, onCrossReturn: () -> Unit = {}, @@ -83,8 +61,7 @@ internal inline fun _safeAutoClose3( return _safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action) } -@PublishedApi -internal inline fun _safeAutoClose3e( +inline fun _safeAutoClose3e( onAbort: (Throwable) -> Unit = { _ -> }, onSuccess: () -> Unit = {}, onCrossReturn: () -> Unit = {}, diff --git a/programs/test/build.gradle.kts b/programs/test/build.gradle.kts index f29cdaa..645050a 100644 --- a/programs/test/build.gradle.kts +++ b/programs/test/build.gradle.kts @@ -19,6 +19,7 @@ kotlin { linuxX64Main { dependencies { implementation(project(":modules:low-level:multithreading")) + implementation(project(":modules:low-level:sockets")) } } } diff --git a/programs/test/src/linuxX64Main/kotlin/main.kt b/programs/test/src/linuxX64Main/kotlin/main.kt index 1316cfc..472e82f 100644 --- a/programs/test/src/linuxX64Main/kotlin/main.kt +++ b/programs/test/src/linuxX64Main/kotlin/main.kt @@ -1,41 +1,71 @@ -import platform.posix.sleep +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Thread +import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.EpollSocketEventLoop -class Printer(private val prefix: String) : Thread.Routine { +class EventLoopRoutine(private val loop: EpollSocketEventLoop) : Thread.Routine { override fun run() { - for (i in 0..3) { - print("${this.prefix}: ${i}\n") - sleep(1u) + while (true) { + try { + this.loop.runUntilError() + } catch (e: Throwable) { + e.printStackTrace() + break // todo remove + } } } } fun main() { try { - println("entry") + val loop = EpollSocketEventLoop() - val t1 = Thread(Printer("t1")) - val t2 = Thread(Printer("t2")) - val t3 = Thread(Printer("t3")) - val t4 = Thread(Printer("t4")) + val loopThread = Thread(EventLoopRoutine(loop)) + loopThread.start() - println("created") - sleep(2u) - println("starting...") + val googleSock = loop.icmp_IPv4("8.8.8.8") + val yandexSock = loop.icmp_IPv4("5.255.255.70") + val phoneSock = loop.icmp_IPv4("192.168.43.1") + val pingData = ubyteArrayOf( + 0x08u, 0x00u, 0x4Cu, 0xE4u, 0x00u, 0x01u, 0x00u, 0x77u, + 0x61u, 0x62u, 0x63u, 0x64u, 0x65u, 0x66u, 0x67u, 0x68u, + 0x69u, 0x6Au, 0x6Bu, 0x6Cu, 0x6Du, 0x6Eu, 0x6Fu, 0x70u, + 0x71u, 0x72u, 0x73u, 0x74u, 0x75u, 0x76u, 0x77u, 0x61u, + 0x62u, 0x63u, 0x64u, 0x65u, 0x66u, 0x67u, 0x68u, 0x69u + ) - t1.start() - t2.start() - t3.start() - t4.start() + runBlocking { + launch { + while (true) { + googleSock.sendRaw(pingData) + print("sent to google\n") + googleSock.recvRaw() + print("got from google\n") + delay(1000) + } + } + launch { + while (true) { + yandexSock.sendRaw(pingData) + print("sent to yandex\n") + yandexSock.recvRaw() + print("got from yandex\n") + delay(2500) + } + } + launch { + while (true) { + phoneSock.sendRaw(pingData) + print("sent to phone\n") + phoneSock.recvRaw() + print("got from phone\n") + delay(5500) + } + } + } - print("started\n") - t1.join()?.let(Throwable::printStackTrace) - t2.join()?.let(Throwable::printStackTrace) - t3.join()?.let(Throwable::printStackTrace) - t4.join()?.let(Throwable::printStackTrace) - - println("finished") } catch (e: Throwable) { e.printStackTrace() } diff --git a/settings.gradle.kts b/settings.gradle.kts index ba6cafe..3822174 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -21,6 +21,7 @@ pluginManagement { } } } +include(":modules:utilities") include(":modules:low-level:c-interop-utilities") include(":modules:low-level:multithreading") include(":modules:low-level:sockets")