Fixes in epoll event-loop and pinger

This commit is contained in:
Andrew Golovashevich 2025-03-20 03:42:56 +03:00
parent eb19b3e36a
commit 834d12c17b
8 changed files with 206 additions and 142 deletions

View File

@ -17,9 +17,11 @@ kotlin {
sourceSets {
commonMain {
dependencies {
implementation(Dependencies.kotlin_atomicfu)
implementation(Dependencies.kotlin_coroutines_core)
implementation(Dependencies.kotlin_datetime)
implementation(Dependencies.int_serializers)
implementation(project(":modules:utilities"))
implementation(project(":modules:low-level:sockets"))
}
}

View File

@ -0,0 +1,31 @@
package ru.landgrafhomyak.bgtu.networks0.icmp
import ru.landgrafhomyak.utility.IntSerializers
@OptIn(ExperimentalUnsignedTypes::class)
class IcmpEchoHeader(
val identifier_be: UShort,
val seqNo_be: UShort,
) : IcmpHeader {
override val type: UByte get() = 0u
override val code: UByte get() = 0u
override fun checksum(withData: UByteArray): UShort = Ipv4Checksum.calculate { c ->
c.addFirstByte(this.type)
c.addSecondByte(this.code)
c.addWord(this.identifier_be)
c.addWord(this.seqNo_be)
c.addTrailingData(withData)
}
override fun serialize(withData: UByteArray): UByteArray {
val serialized = UByteArray(withData.size + 8)
IntSerializers.encode8Bu(serialized, 0, this.type)
IntSerializers.encode8Bu(serialized, 1, this.code)
IntSerializers.encode16beHu(serialized, 2, this.checksum(withData))
IntSerializers.encode16beHu(serialized, 4, this.identifier_be)
IntSerializers.encode16beHu(serialized, 6, this.seqNo_be)
withData.copyInto(serialized, 8)
return serialized
}
}

View File

@ -1,112 +0,0 @@
package ru.landgrafhomyak.bgtu.networks0.icmp
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeout
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.coroutines.sync.Mutex as CMutex
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
import ru.landgrafhomyak.utility.IntSerializers
@OptIn(ExperimentalUnsignedTypes::class)
class PingSocket : AutoCloseable {
private val _rawSocket: IcmpSocket
private val _mapSync: CMutex
private val _resumeMap: HashMap<ShortKeyPair, Continuation<Unit>>
private val _recvCoro: Job
internal constructor(rawSocket: IcmpSocket) {
safeAutoClose2(onAbort = rawSocket::close) {
this._rawSocket = rawSocket
this._resumeMap = HashMap()
this._mapSync = CMutex()
this._recvCoro = CoroutineScope(Dispatchers.Default).launch { this@PingSocket._recvRoutine() }
}
}
private class ShortKeyPair(val identifier: UShort, val seqNo: UShort) {
override fun equals(other: Any?): Boolean {
if (other !is ShortKeyPair) return false
return this.identifier == other.identifier && this.seqNo == other.seqNo
}
override fun hashCode(): Int {
return ((this.identifier.toUInt() shl 16) or (this.seqNo).toUInt()).toInt()
}
override fun toString(): String = "<icmp ping key pair identifier=${this.identifier} seq_no=${this.seqNo}>"
}
private fun _generateKeyPair(): ShortKeyPair {
val timestamp = Clock.System.now()
return ShortKeyPair(
identifier = (timestamp.epochSeconds ushr 13).toULong().toUShort(),
seqNo = ((timestamp.epochSeconds shl 3) or ((((timestamp.nanosecondsOfSecond ushr 27) and 7).toLong())) and 0xFFFF).toULong().toUShort(),
)
}
suspend fun ping(timeoutMillis: UInt): UInt? {
var isLocked = true
val start: Instant
this._mapSync.lock()
safeAutoClose1(finally = { if (isLocked) this._mapSync.unlock() }) {
val key = this._generateKeyPair()
val header = IcmpPingHeader(identifier_be = key.identifier, seqNo_be = key.seqNo)
val serialized = header.serialize(PingSocket.randomData)
start = Clock.System.now()
this._rawSocket.sendRaw(serialized)
try {
withTimeout(timeoutMillis.toULong().toLong()) {
suspendCancellableCoroutine<Unit> { continuation ->
if (key in this@PingSocket._resumeMap)
throw RuntimeException("Key duplication (shouldn't happen)")
this@PingSocket._resumeMap[key] = continuation
this@PingSocket._mapSync.unlock()
isLocked = false
}
}
} catch (_: TimeoutCancellationException) {
return null
}
}
val td = Clock.System.now() - start
return td.inWholeMilliseconds.toUInt()
}
private suspend fun _recvRoutine() {
try {
// todo handle errors
while (true) {
val raw = this._rawSocket.recvRaw()
if (raw.size < 8) continue
val key = ShortKeyPair(identifier = IntSerializers.decode16beHu(raw, 4), seqNo = IntSerializers.decode16beHu(raw, 6))
val continuation = this._mapSync.withLock { this._resumeMap.remove(key) }
if (continuation == null) continue
continuation.resume(Unit)
}
} catch (e: Throwable) {
e.printStackTrace()
}
}
override fun close() {
TODO("Not yet implemented")
}
companion object {
@Suppress("SpellCheckingInspection")
private val randomData = "abcdefghijklmnopqrstuvwabcdefghi".encodeToByteArray().asUByteArray()
}
}

View File

@ -0,0 +1,107 @@
package ru.landgrafhomyak.bgtu.networks0.icmp
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket
import ru.landgrafhomyak.bgtu.networks0.utilities.compareAndExchange
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
import ru.landgrafhomyak.utility.IntSerializers
@OptIn(ExperimentalUnsignedTypes::class)
class Pinger1 : AutoCloseable {
private enum class State {
READY, CONNECTED, CLOSED
}
private val _rawSocket: IcmpSocket
private var _state: AtomicRef<State>
internal constructor(rawSocket: IcmpSocket) {
safeAutoClose2(onAbort = rawSocket::close) {
this._state = atomic(State.READY)
this._rawSocket = rawSocket
}
}
private class ShortKeyPair(val identifier: UShort, val seqNo: UShort) {
override fun equals(other: Any?): Boolean {
if (other !is ShortKeyPair) return false
return this.identifier == other.identifier && this.seqNo == other.seqNo
}
override fun hashCode(): Int {
return ((this.identifier.toUInt() shl 16) or (this.seqNo).toUInt()).toInt()
}
override fun toString(): String = "<icmp ping key pair identifier=${this.identifier} seq_no=${this.seqNo}>"
}
private fun _generateKeyPair(): ShortKeyPair {
val timestamp = Clock.System.now()
return ShortKeyPair(
identifier = (timestamp.epochSeconds ushr 13).toULong().toUShort(),
seqNo = ((timestamp.epochSeconds shl 3) or ((((timestamp.nanosecondsOfSecond ushr 27) and 7).toLong())) and 0xFFFF).toULong().toUShort(),
)
}
suspend fun ping(timeoutMillis: UInt): UInt? {
when (this._state.compareAndExchange(State.READY, State.CONNECTED)) {
State.CLOSED -> throw IllegalStateException("Pinger is closed")
State.CONNECTED -> throw IllegalStateException("Pinger is busy with another thread/coroutine")
State.READY -> {}
}
safeAutoClose1(finally = { this._state.value = State.READY }) {
val start: Instant
val key = this._generateKeyPair()
val header = IcmpPingHeader(identifier_be = key.identifier, seqNo_be = key.seqNo)
val serialized = header.serialize(Pinger1.payloadLikeInPingProgramm)
start = Clock.System.now()
this._rawSocket.sendRaw(serialized)
try {
withTimeout(timeoutMillis.toULong().toLong()) {
while (true)
if (this@Pinger1._dispatchResponse(key, this@Pinger1._rawSocket.recvRaw())) return@withTimeout
}
} catch (_: TimeoutCancellationException) {
return null
}
val td = Clock.System.now() - start
return td.inWholeMilliseconds.toUInt()
}
}
private fun _dispatchResponse(expected: ShortKeyPair, raw: UByteArray): Boolean {
if (raw.size < 8) return false
val actualKey = ShortKeyPair(identifier = IntSerializers.decode16beHu(raw, 4), seqNo = IntSerializers.decode16beHu(raw, 6))
val echoHeader = IcmpEchoHeader(identifier_be = actualKey.identifier, seqNo_be = actualKey.seqNo)
if (actualKey != expected) return false
if (IntSerializers.decode8Bu(raw, 0) != echoHeader.type) return false
if (IntSerializers.decode8Bu(raw, 1) != echoHeader.code) return false
val data = raw.sliceArray(8..<(raw.size))
if (!data.contentEquals(Pinger1.payloadLikeInPingProgramm)) return false
if (echoHeader.checksum(data) != IntSerializers.decode16beHu(raw, 2)) return false
return true
}
override fun close() {
when (this._state.compareAndExchange(State.READY, State.CLOSED)) {
State.CLOSED -> throw IllegalStateException("Pinger is closed")
State.CONNECTED -> throw IllegalStateException("Can't close pinger while it measures time")
State.READY -> {}
}
this._rawSocket.close()
}
companion object {
@Suppress("SpellCheckingInspection")
private val payloadLikeInPingProgramm = "abcdefghijklmnopqrstuvwabcdefghi".encodeToByteArray().asUByteArray()
}
}

View File

@ -2,5 +2,5 @@ package ru.landgrafhomyak.bgtu.networks0.icmp
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoopScope
fun SocketEventLoopScope.pingSocket_IPv4(addr: String): PingSocket = PingSocket(this.icmp_IPv4(addr))
fun SocketEventLoopScope.pingSocket_IPv6(addr: String): PingSocket = PingSocket(this.icmp_IPv6(addr))
fun SocketEventLoopScope.pinger_IPv4(addr: String): Pinger1 = Pinger1(this.icmp_IPv4(addr))
fun SocketEventLoopScope.pinger_IPv6(addr: String): Pinger1 = Pinger1(this.icmp_IPv6(addr))

View File

@ -2,7 +2,6 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.StableRef
@ -13,8 +12,6 @@ import kotlinx.cinterop.get
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.ptr
import kotlinx.coroutines.suspendCancellableCoroutine
import platform.linux.EPOLLERR
import platform.linux.EPOLLHUP
import platform.linux.EPOLLIN
import platform.linux.EPOLLONESHOT
import platform.linux.EPOLLOUT
@ -25,17 +22,17 @@ import platform.linux.epoll_create
import platform.linux.epoll_ctl
import platform.linux.epoll_event
import platform.linux.epoll_wait
import platform.posix.close as close_lowlevel
import platform.linux.inet_pton
import platform.posix.AF_INET
import platform.posix.sockaddr_in
import platform.posix.sockaddr_in6
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2e
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex
@OptIn(ExperimentalForeignApi::class)
@Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName")
@ -76,13 +73,21 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
this.data.fd = metadata.fd
this.data.ptr = metadata.stableRef.asCPointer()
}
if (!metadata.isAddedToEpoll) {
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, metadata.fd, ee))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to add fd to epoll: $d") }
metadata.isAddedToEpoll = true
if (metadata.read != null || metadata.write != null) {
if (!metadata.isAddedToEpoll) {
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, metadata.fd, ee))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to add fd to epoll: $d") }
metadata.isAddedToEpoll = true
} else {
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_MOD, metadata.fd, ee))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to modify fd in epoll: $d") }
}
} else {
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_MOD, metadata.fd, ee))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to modify fd in epoll: $d") }
if (metadata.isAddedToEpoll) {
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_DEL, metadata.fd, null))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to remove fd from epoll: $d") }
metadata.isAddedToEpoll = false
}
}
}
@ -95,22 +100,38 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
}
}
private suspend fun _waitForRead(metadata: _SocketMetadata) = suspendCoroutine { continuation ->
metadata.sync.withLock {
if (metadata.read != null)
throw IllegalStateException("Socket already waiting for read")
metadata.read = continuation
this._schedule(metadata)
private suspend fun _waitForRead(metadata: _SocketMetadata) {
suspendCancellableCoroutine { continuation ->
metadata.sync.withLock {
if (metadata.read != null)
throw IllegalStateException("Socket already waiting for read")
metadata.read = continuation
this._schedule(metadata)
continuation.invokeOnCancellation {
metadata.sync.withLock {
metadata.read = null
this._schedule(metadata)
}
}
}
}
}
private suspend fun _waitForWrite(metadata: _SocketMetadata) = suspendCoroutine { continuation ->
metadata.sync.withLock {
if (metadata.write != null)
throw IllegalStateException("Socket already waiting for write")
metadata.write = continuation
this._schedule(metadata)
private suspend fun _waitForWrite(metadata: _SocketMetadata) {
suspendCancellableCoroutine { continuation ->
metadata.sync.withLock {
if (metadata.write != null)
throw IllegalStateException("Socket already waiting for write")
metadata.write = continuation
this._schedule(metadata)
continuation.invokeOnCancellation {
metadata.sync.withLock {
metadata.write = null
this._schedule(metadata)
}
}
}
}
}
@ -147,7 +168,9 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
override fun close() {
TODO("Not yet implemented")
this._socketsCount.close("There are still opened sockets bound to this event-loop")
if (0 != close_lowlevel(this._epollDescriptor))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to close epoll descriptor: $d") }
}
class _SocketMetadata : AutoCloseable {

View File

@ -44,10 +44,23 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
private val __rSync: CMutex
private val __wSync: CMutex
private val __refcnt: CloseableRefCounter
private val __parentProtocol: ParentProtocol
private enum class ParentProtocol {
IPv4 {
override fun truncateHeader(orig: UByteArray): UByteArray = orig.sliceArray(((orig[0] and 0xFu).toUInt().toInt() + 15)..<orig.size)
},
IPv6 {
override fun truncateHeader(orig: UByteArray): UByteArray = orig.sliceArray(40..orig.size)
};
abstract fun truncateHeader(orig: UByteArray): UByteArray
}
protected constructor(ipv4: CValue<sockaddr_in>) {
this.__rSync = CMutex()
this.__wSync = CMutex()
this.__parentProtocol = ParentProtocol.IPv4
this.__refcnt = CloseableRefCounter("ICMP socket was closed")
this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI<sockaddr_in>(), "IPv4")
}
@ -55,6 +68,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
protected constructor(ipv6: CValue<sockaddr_in6>) {
this.__rSync = CMutex()
this.__wSync = CMutex()
this.__parentProtocol = ParentProtocol.IPv6
this.__refcnt = CloseableRefCounter("ICMP socket was closed")
this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI<sockaddr_in6>(), "IPv6")
}
@ -140,7 +154,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") }
}
}
return data.sliceArray(((data[0] and 0xFu).toUInt().toInt() + 15)..<data.size)
return this.__parentProtocol.truncateHeader(data)
}
@Suppress("KotlinUnreachableCode")
throw RuntimeException("unreachable")

View File

@ -1,7 +1,6 @@
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import ru.landgrafhomyak.bgtu.networks0.icmp.pingSocket_IPv4
import ru.landgrafhomyak.bgtu.networks0.icmp.pinger_IPv4
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Thread
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.EpollSocketEventLoop
@ -25,7 +24,7 @@ fun main() {
val loopThread = Thread(EventLoopRoutine(loop))
loopThread.start()
val googleSock = loop.pingSocket_IPv4("8.8.8.8")
val googleSock = loop.pinger_IPv4("8.8.8.8")
runBlocking {
while (true) {