diff --git a/modules/icmp/build.gradle.kts b/modules/icmp/build.gradle.kts index 4e7f291..9d0fcf6 100644 --- a/modules/icmp/build.gradle.kts +++ b/modules/icmp/build.gradle.kts @@ -17,9 +17,11 @@ kotlin { sourceSets { commonMain { dependencies { + implementation(Dependencies.kotlin_atomicfu) implementation(Dependencies.kotlin_coroutines_core) implementation(Dependencies.kotlin_datetime) implementation(Dependencies.int_serializers) + implementation(project(":modules:utilities")) implementation(project(":modules:low-level:sockets")) } } diff --git a/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/IcmpEchoHeader.kt b/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/IcmpEchoHeader.kt new file mode 100644 index 0000000..4347bf2 --- /dev/null +++ b/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/IcmpEchoHeader.kt @@ -0,0 +1,31 @@ +package ru.landgrafhomyak.bgtu.networks0.icmp + +import ru.landgrafhomyak.utility.IntSerializers + +@OptIn(ExperimentalUnsignedTypes::class) +class IcmpEchoHeader( + val identifier_be: UShort, + val seqNo_be: UShort, +) : IcmpHeader { + override val type: UByte get() = 0u + override val code: UByte get() = 0u + + override fun checksum(withData: UByteArray): UShort = Ipv4Checksum.calculate { c -> + c.addFirstByte(this.type) + c.addSecondByte(this.code) + c.addWord(this.identifier_be) + c.addWord(this.seqNo_be) + c.addTrailingData(withData) + } + + override fun serialize(withData: UByteArray): UByteArray { + val serialized = UByteArray(withData.size + 8) + IntSerializers.encode8Bu(serialized, 0, this.type) + IntSerializers.encode8Bu(serialized, 1, this.code) + IntSerializers.encode16beHu(serialized, 2, this.checksum(withData)) + IntSerializers.encode16beHu(serialized, 4, this.identifier_be) + IntSerializers.encode16beHu(serialized, 6, this.seqNo_be) + withData.copyInto(serialized, 8) + return serialized + } +} \ No newline at end of file diff --git a/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/PingSocket.kt b/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/PingSocket.kt deleted file mode 100644 index 30c1ceb..0000000 --- a/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/PingSocket.kt +++ /dev/null @@ -1,112 +0,0 @@ -package ru.landgrafhomyak.bgtu.networks0.icmp - -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withTimeout -import kotlinx.datetime.Clock -import kotlinx.datetime.Instant -import kotlinx.coroutines.sync.Mutex as CMutex -import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket -import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 -import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 -import ru.landgrafhomyak.utility.IntSerializers - - -@OptIn(ExperimentalUnsignedTypes::class) -class PingSocket : AutoCloseable { - private val _rawSocket: IcmpSocket - private val _mapSync: CMutex - private val _resumeMap: HashMap<ShortKeyPair, Continuation<Unit>> - private val _recvCoro: Job - - internal constructor(rawSocket: IcmpSocket) { - safeAutoClose2(onAbort = rawSocket::close) { - this._rawSocket = rawSocket - this._resumeMap = HashMap() - this._mapSync = CMutex() - this._recvCoro = CoroutineScope(Dispatchers.Default).launch { this@PingSocket._recvRoutine() } - } - } - - private class ShortKeyPair(val identifier: UShort, val seqNo: UShort) { - override fun equals(other: Any?): Boolean { - if (other !is ShortKeyPair) return false - return this.identifier == other.identifier && this.seqNo == other.seqNo - } - - override fun hashCode(): Int { - return ((this.identifier.toUInt() shl 16) or (this.seqNo).toUInt()).toInt() - } - - override fun toString(): String = "<icmp ping key pair identifier=${this.identifier} seq_no=${this.seqNo}>" - } - - private fun _generateKeyPair(): ShortKeyPair { - val timestamp = Clock.System.now() - return ShortKeyPair( - identifier = (timestamp.epochSeconds ushr 13).toULong().toUShort(), - seqNo = ((timestamp.epochSeconds shl 3) or ((((timestamp.nanosecondsOfSecond ushr 27) and 7).toLong())) and 0xFFFF).toULong().toUShort(), - ) - } - - suspend fun ping(timeoutMillis: UInt): UInt? { - var isLocked = true - val start: Instant - this._mapSync.lock() - safeAutoClose1(finally = { if (isLocked) this._mapSync.unlock() }) { - val key = this._generateKeyPair() - val header = IcmpPingHeader(identifier_be = key.identifier, seqNo_be = key.seqNo) - val serialized = header.serialize(PingSocket.randomData) - start = Clock.System.now() - this._rawSocket.sendRaw(serialized) - try { - withTimeout(timeoutMillis.toULong().toLong()) { - suspendCancellableCoroutine<Unit> { continuation -> - if (key in this@PingSocket._resumeMap) - throw RuntimeException("Key duplication (shouldn't happen)") - this@PingSocket._resumeMap[key] = continuation - this@PingSocket._mapSync.unlock() - isLocked = false - } - } - } catch (_: TimeoutCancellationException) { - return null - } - } - - val td = Clock.System.now() - start - return td.inWholeMilliseconds.toUInt() - } - - private suspend fun _recvRoutine() { - try { - // todo handle errors - while (true) { - val raw = this._rawSocket.recvRaw() - if (raw.size < 8) continue - val key = ShortKeyPair(identifier = IntSerializers.decode16beHu(raw, 4), seqNo = IntSerializers.decode16beHu(raw, 6)) - val continuation = this._mapSync.withLock { this._resumeMap.remove(key) } - if (continuation == null) continue - continuation.resume(Unit) - } - } catch (e: Throwable) { - e.printStackTrace() - } - } - - override fun close() { - TODO("Not yet implemented") - } - - companion object { - @Suppress("SpellCheckingInspection") - private val randomData = "abcdefghijklmnopqrstuvwabcdefghi".encodeToByteArray().asUByteArray() - } -} \ No newline at end of file diff --git a/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/Pinger1.kt b/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/Pinger1.kt new file mode 100644 index 0000000..89f6330 --- /dev/null +++ b/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/Pinger1.kt @@ -0,0 +1,107 @@ +package ru.landgrafhomyak.bgtu.networks0.icmp + +import kotlinx.atomicfu.AtomicRef +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.withTimeout +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket +import ru.landgrafhomyak.bgtu.networks0.utilities.compareAndExchange +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 +import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 +import ru.landgrafhomyak.utility.IntSerializers + + +@OptIn(ExperimentalUnsignedTypes::class) +class Pinger1 : AutoCloseable { + private enum class State { + READY, CONNECTED, CLOSED + } + + private val _rawSocket: IcmpSocket + private var _state: AtomicRef<State> + + + internal constructor(rawSocket: IcmpSocket) { + safeAutoClose2(onAbort = rawSocket::close) { + this._state = atomic(State.READY) + this._rawSocket = rawSocket + } + } + + private class ShortKeyPair(val identifier: UShort, val seqNo: UShort) { + override fun equals(other: Any?): Boolean { + if (other !is ShortKeyPair) return false + return this.identifier == other.identifier && this.seqNo == other.seqNo + } + + override fun hashCode(): Int { + return ((this.identifier.toUInt() shl 16) or (this.seqNo).toUInt()).toInt() + } + + override fun toString(): String = "<icmp ping key pair identifier=${this.identifier} seq_no=${this.seqNo}>" + } + + private fun _generateKeyPair(): ShortKeyPair { + val timestamp = Clock.System.now() + return ShortKeyPair( + identifier = (timestamp.epochSeconds ushr 13).toULong().toUShort(), + seqNo = ((timestamp.epochSeconds shl 3) or ((((timestamp.nanosecondsOfSecond ushr 27) and 7).toLong())) and 0xFFFF).toULong().toUShort(), + ) + } + + suspend fun ping(timeoutMillis: UInt): UInt? { + when (this._state.compareAndExchange(State.READY, State.CONNECTED)) { + State.CLOSED -> throw IllegalStateException("Pinger is closed") + State.CONNECTED -> throw IllegalStateException("Pinger is busy with another thread/coroutine") + State.READY -> {} + } + safeAutoClose1(finally = { this._state.value = State.READY }) { + val start: Instant + val key = this._generateKeyPair() + val header = IcmpPingHeader(identifier_be = key.identifier, seqNo_be = key.seqNo) + val serialized = header.serialize(Pinger1.payloadLikeInPingProgramm) + start = Clock.System.now() + this._rawSocket.sendRaw(serialized) + try { + withTimeout(timeoutMillis.toULong().toLong()) { + while (true) + if (this@Pinger1._dispatchResponse(key, this@Pinger1._rawSocket.recvRaw())) return@withTimeout + } + } catch (_: TimeoutCancellationException) { + return null + } + + val td = Clock.System.now() - start + return td.inWholeMilliseconds.toUInt() + } + } + + private fun _dispatchResponse(expected: ShortKeyPair, raw: UByteArray): Boolean { + if (raw.size < 8) return false + val actualKey = ShortKeyPair(identifier = IntSerializers.decode16beHu(raw, 4), seqNo = IntSerializers.decode16beHu(raw, 6)) + val echoHeader = IcmpEchoHeader(identifier_be = actualKey.identifier, seqNo_be = actualKey.seqNo) + if (actualKey != expected) return false + if (IntSerializers.decode8Bu(raw, 0) != echoHeader.type) return false + if (IntSerializers.decode8Bu(raw, 1) != echoHeader.code) return false + val data = raw.sliceArray(8..<(raw.size)) + if (!data.contentEquals(Pinger1.payloadLikeInPingProgramm)) return false + if (echoHeader.checksum(data) != IntSerializers.decode16beHu(raw, 2)) return false + return true + } + + override fun close() { + when (this._state.compareAndExchange(State.READY, State.CLOSED)) { + State.CLOSED -> throw IllegalStateException("Pinger is closed") + State.CONNECTED -> throw IllegalStateException("Can't close pinger while it measures time") + State.READY -> {} + } + this._rawSocket.close() + } + + companion object { + @Suppress("SpellCheckingInspection") + private val payloadLikeInPingProgramm = "abcdefghijklmnopqrstuvwabcdefghi".encodeToByteArray().asUByteArray() + } +} \ No newline at end of file diff --git a/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/entry_points.kt b/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/entry_points.kt index df5ad64..575b716 100644 --- a/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/entry_points.kt +++ b/modules/icmp/src/commonMain/kotlin/ru/landgrafhomyak/bgtu/networks0/icmp/entry_points.kt @@ -2,5 +2,5 @@ package ru.landgrafhomyak.bgtu.networks0.icmp import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoopScope -fun SocketEventLoopScope.pingSocket_IPv4(addr: String): PingSocket = PingSocket(this.icmp_IPv4(addr)) -fun SocketEventLoopScope.pingSocket_IPv6(addr: String): PingSocket = PingSocket(this.icmp_IPv6(addr)) \ No newline at end of file +fun SocketEventLoopScope.pinger_IPv4(addr: String): Pinger1 = Pinger1(this.icmp_IPv4(addr)) +fun SocketEventLoopScope.pinger_IPv6(addr: String): Pinger1 = Pinger1(this.icmp_IPv6(addr)) \ No newline at end of file diff --git a/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt b/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt index fde3b74..bbbb5fb 100644 --- a/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt +++ b/modules/low-level/sockets/src/linuxMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/EpollSocketEventLoop.kt @@ -2,7 +2,6 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.sockets import kotlin.coroutines.Continuation import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine import kotlinx.cinterop.CValue import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.StableRef @@ -13,8 +12,6 @@ import kotlinx.cinterop.get import kotlinx.cinterop.memScoped import kotlinx.cinterop.ptr import kotlinx.coroutines.suspendCancellableCoroutine -import platform.linux.EPOLLERR -import platform.linux.EPOLLHUP import platform.linux.EPOLLIN import platform.linux.EPOLLONESHOT import platform.linux.EPOLLOUT @@ -25,17 +22,17 @@ import platform.linux.epoll_create import platform.linux.epoll_ctl import platform.linux.epoll_event import platform.linux.epoll_wait +import platform.posix.close as close_lowlevel import platform.linux.inet_pton import platform.posix.AF_INET import platform.posix.sockaddr_in import platform.posix.sockaddr_in6 import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities -import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1 import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2 -import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2e +import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex @OptIn(ExperimentalForeignApi::class) @Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName") @@ -76,13 +73,21 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable { this.data.fd = metadata.fd this.data.ptr = metadata.stableRef.asCPointer() } - if (!metadata.isAddedToEpoll) { - if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, metadata.fd, ee)) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to add fd to epoll: $d") } - metadata.isAddedToEpoll = true + if (metadata.read != null || metadata.write != null) { + if (!metadata.isAddedToEpoll) { + if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, metadata.fd, ee)) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to add fd to epoll: $d") } + metadata.isAddedToEpoll = true + } else { + if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_MOD, metadata.fd, ee)) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to modify fd in epoll: $d") } + } } else { - if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_MOD, metadata.fd, ee)) - PosixUtilities.throwErrno { d -> RuntimeException("Failed to modify fd in epoll: $d") } + if (metadata.isAddedToEpoll) { + if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_DEL, metadata.fd, null)) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to remove fd from epoll: $d") } + metadata.isAddedToEpoll = false + } } } @@ -95,22 +100,38 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable { } } - private suspend fun _waitForRead(metadata: _SocketMetadata) = suspendCoroutine { continuation -> - metadata.sync.withLock { - if (metadata.read != null) - throw IllegalStateException("Socket already waiting for read") - metadata.read = continuation - this._schedule(metadata) + private suspend fun _waitForRead(metadata: _SocketMetadata) { + suspendCancellableCoroutine { continuation -> + metadata.sync.withLock { + if (metadata.read != null) + throw IllegalStateException("Socket already waiting for read") + metadata.read = continuation + this._schedule(metadata) + continuation.invokeOnCancellation { + metadata.sync.withLock { + metadata.read = null + this._schedule(metadata) + } + } + } } } - private suspend fun _waitForWrite(metadata: _SocketMetadata) = suspendCoroutine { continuation -> - metadata.sync.withLock { - if (metadata.write != null) - throw IllegalStateException("Socket already waiting for write") - metadata.write = continuation - this._schedule(metadata) + private suspend fun _waitForWrite(metadata: _SocketMetadata) { + suspendCancellableCoroutine { continuation -> + metadata.sync.withLock { + if (metadata.write != null) + throw IllegalStateException("Socket already waiting for write") + metadata.write = continuation + this._schedule(metadata) + continuation.invokeOnCancellation { + metadata.sync.withLock { + metadata.write = null + this._schedule(metadata) + } + } + } } } @@ -147,7 +168,9 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable { override fun close() { - TODO("Not yet implemented") + this._socketsCount.close("There are still opened sockets bound to this event-loop") + if (0 != close_lowlevel(this._epollDescriptor)) + PosixUtilities.throwErrno { d -> RuntimeException("Failed to close epoll descriptor: $d") } } class _SocketMetadata : AutoCloseable { diff --git a/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt b/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt index 0d65dc0..ffac8e8 100644 --- a/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt +++ b/modules/low-level/sockets/src/posixMain/kotlin/ru/landgrafhomyak/bgtu/networks0/low_level/sockets/NonblockingIcmpSocketImpl.kt @@ -44,10 +44,23 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { private val __rSync: CMutex private val __wSync: CMutex private val __refcnt: CloseableRefCounter + private val __parentProtocol: ParentProtocol + + private enum class ParentProtocol { + IPv4 { + override fun truncateHeader(orig: UByteArray): UByteArray = orig.sliceArray(((orig[0] and 0xFu).toUInt().toInt() + 15)..<orig.size) + }, + IPv6 { + override fun truncateHeader(orig: UByteArray): UByteArray = orig.sliceArray(40..orig.size) + }; + + abstract fun truncateHeader(orig: UByteArray): UByteArray + } protected constructor(ipv4: CValue<sockaddr_in>) { this.__rSync = CMutex() this.__wSync = CMutex() + this.__parentProtocol = ParentProtocol.IPv4 this.__refcnt = CloseableRefCounter("ICMP socket was closed") this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI<sockaddr_in>(), "IPv4") } @@ -55,6 +68,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { protected constructor(ipv6: CValue<sockaddr_in6>) { this.__rSync = CMutex() this.__wSync = CMutex() + this.__parentProtocol = ParentProtocol.IPv6 this.__refcnt = CloseableRefCounter("ICMP socket was closed") this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI<sockaddr_in6>(), "IPv6") } @@ -140,7 +154,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") } } } - return data.sliceArray(((data[0] and 0xFu).toUInt().toInt() + 15)..<data.size) + return this.__parentProtocol.truncateHeader(data) } @Suppress("KotlinUnreachableCode") throw RuntimeException("unreachable") diff --git a/programs/test/src/linuxX64Main/kotlin/main.kt b/programs/test/src/linuxX64Main/kotlin/main.kt index c7dfbea..2caa46e 100644 --- a/programs/test/src/linuxX64Main/kotlin/main.kt +++ b/programs/test/src/linuxX64Main/kotlin/main.kt @@ -1,7 +1,6 @@ import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import ru.landgrafhomyak.bgtu.networks0.icmp.pingSocket_IPv4 +import ru.landgrafhomyak.bgtu.networks0.icmp.pinger_IPv4 import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Thread import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.EpollSocketEventLoop @@ -25,7 +24,7 @@ fun main() { val loopThread = Thread(EventLoopRoutine(loop)) loopThread.start() - val googleSock = loop.pingSocket_IPv4("8.8.8.8") + val googleSock = loop.pinger_IPv4("8.8.8.8") runBlocking { while (true) {