Epoll event loop on linux

This commit is contained in:
Andrew Golovashevich 2025-03-17 22:01:46 +03:00
parent 5d17c5f7cf
commit 27e893632d
15 changed files with 598 additions and 12 deletions

View File

@ -10,12 +10,71 @@ repositories {
kotlin {
mingwX64()
linuxX64()
linuxArm64()
macosX64()
macosArm64()
sourceSets {
commonMain {
val commonMain by getting {
dependencies {
}
}
val commonTest by getting {
dependencies {
}
}
val nativeMain by creating {
dependsOn(commonMain)
}
val nativeTest by creating {
dependsOn(commonTest)
}
val posixMain by creating {
dependsOn(nativeMain)
}
val posixTest by creating {
dependsOn(nativeTest)
}
val linuxMain by creating {
dependsOn(posixMain)
}
val linuxTest by creating {
dependsOn(posixTest)
}
val linuxX64Main by getting {
dependsOn(linuxMain)
}
val linuxX64Test by getting {
dependsOn(linuxTest)
}
val linuxArm64Main by getting {
dependsOn(linuxMain)
}
val linuxArm64Test by getting {
dependsOn(linuxTest)
}
val macosMain by creating {
dependsOn(posixMain)
}
val macosTest by creating {
dependsOn(posixTest)
}
val macosX64Main by getting {
dependsOn(macosMain)
}
val macosX64Test by getting {
dependsOn(macosTest)
}
val macosArm64Main by getting {
dependsOn(macosMain)
}
val macosArm64Test by getting {
dependsOn(macosTest)
}
}
}

View File

@ -5,12 +5,29 @@ import kotlinx.cinterop.CPointer
import kotlinx.cinterop.CVariable
import kotlinx.cinterop.ExperimentalForeignApi
import platform.posix.malloc
import kotlinx.cinterop.sizeOf as sizeOf_signed
import kotlinx.cinterop.sizeOf
@OptIn(ExperimentalForeignApi::class)
object CUtilities {
inline fun <reified T : CVariable> sizeOf(): ULong = sizeOf_signed<T>().toULong()
/**
* Returns size of a C type in bytes, but as [unsigned long long][ULong].
* @see sizeOf
* @see sizeOfUI
*/
inline fun <reified T : CVariable> sizeOfUL(): ULong = sizeOf<T>().toULong()
/**
* Returns size of a C type in bytes, but as [unsigned long][UInt].
* @see sizeOf
* @see sizeOfUL
*/
inline fun <reified T : CVariable> sizeOfUI(): UInt = sizeOf<T>().toUInt()
/**
* Returns size of a C type in bytes, but as [unsigned long][UInt].
* @see sizeOf
* @see sizeOfUL
*/
@Suppress("UNCHECKED_CAST")
inline fun <reified T : CVariable> newOrOom() = (malloc(sizeOf<T>()) ?: throw OutOfMemoryError()) as CPointer<T>
inline fun <reified T : CVariable> newOrOom() = (malloc(sizeOfUL<T>()) ?: throw OutOfMemoryError()) as CPointer<T>
}

View File

@ -9,7 +9,7 @@ import platform.posix.errno
import platform.posix.strerror
object LinuxUtilities {
object PosixUtilities {
@OptIn(ExperimentalForeignApi::class, ExperimentalContracts::class)
inline fun throwErrno(
createError: (errnoDescription: String) -> Throwable,
@ -31,7 +31,7 @@ object LinuxUtilities {
}
val raw = strerror(origErrno)
if (raw != null) {
throw createError(raw.toKStringFromUtf8())
throw createError(raw.toKStringFromUtf8()) // todo handle error
} else {
val strerrErrno = errno
try {

View File

@ -10,13 +10,75 @@ repositories {
kotlin {
mingwX64()
linuxX64()
linuxArm64()
macosX64()
macosArm64()
sourceSets {
nativeMain {
val commonMain by getting {
dependencies {
}
}
val commonTest by getting {
dependencies {
}
}
val nativeMain by creating {
dependsOn(commonMain)
dependencies {
implementation(project(":modules:low-level:c-interop-utilities"))
}
}
val nativeTest by creating {
dependsOn(commonTest)
}
val posixMain by creating {
dependsOn(nativeMain)
}
val posixTest by creating {
dependsOn(nativeTest)
}
val linuxMain by creating {
dependsOn(posixMain)
}
val linuxTest by creating {
dependsOn(posixTest)
}
val linuxX64Main by getting {
dependsOn(linuxMain)
}
val linuxX64Test by getting {
dependsOn(linuxTest)
}
val linuxArm64Main by getting {
dependsOn(linuxMain)
}
val linuxArm64Test by getting {
dependsOn(linuxTest)
}
val macosMain by creating {
dependsOn(posixMain)
}
val macosTest by creating {
dependsOn(posixTest)
}
val macosX64Main by getting {
dependsOn(macosMain)
}
val macosX64Test by getting {
dependsOn(macosTest)
}
val macosArm64Main by getting {
dependsOn(macosMain)
}
val macosArm64Test by getting {
dependsOn(macosTest)
}
}
}

View File

@ -0,0 +1,29 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
@OptIn(ExperimentalContracts::class)
inline fun <R> TMutex.withLock(synchronizedBlock: () -> R): R {
contract {
callsInPlace(synchronizedBlock, InvocationKind.EXACTLY_ONCE)
}
this.lock()
var err1: Throwable? = null
try {
return synchronizedBlock()
} catch (t: Throwable) {
err1 = t
throw t
} finally {
try {
this.unlock()
} catch (err2: Throwable) {
if (err1 != null)
err1.addSuppressed(err2)
else
throw err2
}
}
}

View File

@ -9,7 +9,7 @@ import platform.posix.pthread_mutex_lock
import platform.posix.pthread_mutex_t
import platform.posix.pthread_mutex_unlock
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.LinuxUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
@Suppress("JoinDeclarationAndAssignment", "CanBeVal", "ConvertSecondaryConstructorToPrimary")
@OptIn(ExperimentalForeignApi::class)
@ -21,7 +21,7 @@ actual class TMutex : AutoCloseable {
this.descriptor = CUtilities.newOrOom<pthread_mutex_t>()
var err = pthread_mutex_init(this.descriptor, null)
if (err != 0)
LinuxUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads mutex: $d") }
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads mutex: $d") }
this.isClosed = false
}
@ -29,14 +29,14 @@ actual class TMutex : AutoCloseable {
if (this.isClosed) throw IllegalStateException("Mutex was destroyed")
var err = pthread_mutex_lock(this.descriptor)
if (err != 0)
LinuxUtilities.throwErrno(err) { d -> RuntimeException("Failed to lock pthreads mutex: $d") }
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to lock pthreads mutex: $d") }
}
actual fun unlock() {
if (this.isClosed) throw IllegalStateException("Mutex was destroyed")
var err = pthread_mutex_unlock(this.descriptor)
if (err != 0)
LinuxUtilities.throwErrno(err) { d -> RuntimeException("Failed to unlock pthreads mutex: $d") }
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to unlock pthreads mutex: $d") }
}
override fun close() {
@ -44,7 +44,7 @@ actual class TMutex : AutoCloseable {
this.isClosed = true
var err = pthread_mutex_destroy(this.descriptor)
if (err != 0)
LinuxUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") }
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads mutex: $d") }
free(this.descriptor)
}
}

View File

@ -0,0 +1,85 @@
plugins {
kotlin("multiplatform") version "2.1.0"
}
repositories {
mavenCentral()
}
kotlin {
mingwX64()
linuxX64()
linuxArm64()
macosX64()
macosArm64()
sourceSets {
val commonMain by getting {
dependencies {
implementation("org.jetbrains.kotlinx:atomicfu:0.27.0")
}
}
val commonTest by getting {
dependencies {
}
}
val nativeMain by creating {
dependsOn(commonMain)
dependencies {
implementation(project(":modules:low-level:c-interop-utilities"))
implementation(project(":modules:low-level:multithreading"))
}
}
val nativeTest by creating {
dependsOn(commonTest)
}
val posixMain by creating {
dependsOn(nativeMain)
}
val posixTest by creating {
dependsOn(nativeTest)
}
val linuxMain by creating {
dependsOn(posixMain)
}
val linuxTest by creating {
dependsOn(posixTest)
}
val linuxX64Main by getting {
dependsOn(linuxMain)
}
val linuxX64Test by getting {
dependsOn(linuxTest)
}
val linuxArm64Main by getting {
dependsOn(linuxMain)
}
val linuxArm64Test by getting {
dependsOn(linuxTest)
}
val macosMain by creating {
dependsOn(posixMain)
}
val macosTest by creating {
dependsOn(posixTest)
}
val macosX64Main by getting {
dependsOn(macosMain)
}
val macosX64Test by getting {
dependsOn(macosTest)
}
val macosArm64Main by getting {
dependsOn(macosMain)
}
val macosArm64Test by getting {
dependsOn(macosTest)
}
}
}

View File

@ -0,0 +1,6 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
interface IcmpSocket : AutoCloseable {
suspend fun send(data: UByteArray)
suspend fun recv(): UByteArray
}

View File

@ -0,0 +1,9 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
expect class SocketBlockingEventLoop : ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoop {
public constructor()
companion object {
fun runForever(el: ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketBlockingEventLoop)
}
}

View File

@ -0,0 +1,7 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
@Suppress("FunctionName")
interface SocketEventLoop {
fun icmp_IPv4(addr: String): ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket
fun icmp_IPv6(addr: String): ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket
}

View File

@ -0,0 +1,154 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
import kotlin.concurrent.AtomicReference
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.atomicfu.atomic
import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.allocArray
import kotlinx.cinterop.cValue
import kotlinx.cinterop.get
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.ptr
import platform.linux.EPOLLIN
import platform.linux.EPOLLOUT
import platform.linux.EPOLL_CTL_ADD
import platform.linux.epoll_create
import platform.linux.epoll_ctl
import platform.linux.epoll_event
import platform.linux.epoll_wait
import platform.linux.inet_pton
import platform.posix.AF_INET
import platform.posix.sockaddr_in
import platform.posix.sockaddr_in6
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.TMutex
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock
@OptIn(ExperimentalForeignApi::class)
@Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName")
actual class SocketBlockingEventLoop : SocketEventLoop {
private val _tsync = TMutex()
private val _id2cont = HashMap<ULong, Continuation<Unit>>()
private var _nextSockId: ULong = 0uL
private val _epollDescriptor: Int
actual constructor() {
this._epollDescriptor = epoll_create(0)
if (this._epollDescriptor < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") }
}
private val _state = AtomicReference(State.READY)
private val _openedSockets = atomic(0uL)
private enum class State {
READY, RUNNING, CLOSED
}
private fun _assertNotClosed() {
if (this._state.value == State.CLOSED)
throw IllegalStateException("Eventloop is closed")
}
private fun _parseIpv4(rawAddr: String, port: UShort): CValue<sockaddr_in> = cValue<sockaddr_in> out@{
memScoped<Unit> {
val err = inet_pton(AF_INET, rawAddr, this@out.sin_addr.ptr)
when {
err == 0 -> throw IllegalArgumentException("IPv4 address is ill-formatted")
err < 0 -> PosixUtilities.throwErrno { d -> throw RuntimeException("Failed to parse IPv4 address: $d") }
}
}
this@out.sin_family = AF_INET.toUShort()
this@out.sin_port = port
}
override fun icmp_IPv4(addr: String): IcmpSocket {
this._assertNotClosed()
return BoundIcmpSocket(this._parseIpv4(addr, 0u))
}
override fun icmp_IPv6(addr: String): IcmpSocket {
this._assertNotClosed()
TODO("Not yet implemented")
}
private suspend fun _waitFor(fd: Int, action: Int) = suspendCoroutine { continuation ->
val contId: ULong
this._tsync.withLock {
contId = this._nextSockId++
this._id2cont[contId] = continuation
}
val ee = cValue<epoll_event> {
this.events = action.toUInt()
this.data.fd = fd
this.data.u64 = contId
}
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, fd, ee))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to add event to epoll: $d") }
}
private suspend fun _waitForRead(fd: Int) =
this._waitFor(fd, EPOLLIN)
private suspend fun _waitForWrite(fd: Int) =
this._waitFor(fd, EPOLLOUT)
private fun _resume(id: ULong) {
val cont = this._tsync.withLock { this._id2cont.remove(id) }
if (cont == null) {
// todo
return
}
cont.resume(Unit)
}
private fun _mainloop() {
when (this._state.compareAndExchange(State.READY, State.RUNNING)) {
State.RUNNING -> throw IllegalStateException("Eventloop already running")
State.CLOSED -> throw IllegalStateException("Eventloop is closed")
State.READY -> {}
}
val eventsBufferSize = 100
memScoped {
val readyEvents = allocArray<epoll_event>(eventsBufferSize)
while (true) {
val readyEventsCount = epoll_wait(this@SocketBlockingEventLoop._epollDescriptor, readyEvents, eventsBufferSize, -1)
if (readyEventsCount < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Error while waiting epoll: $d") }
for (i in 0..<readyEventsCount) {
this@SocketBlockingEventLoop._resume(readyEvents[0].data.u64)
}
}
}
}
actual companion object {
actual fun runForever(el: SocketBlockingEventLoop) {
el._mainloop()
}
}
private inner class BoundIcmpSocket : NonblockingIcmpSocketImpl {
constructor(ipv4: CValue<sockaddr_in>) : super(ipv4)
constructor(ipv6: CValue<sockaddr_in6>) : super(ipv6)
override suspend fun _waitForWrite() =
this@SocketBlockingEventLoop._waitForWrite(this._socketFd)
override suspend fun _waitForRead() =
this@SocketBlockingEventLoop._waitForRead(this._socketFd)
}
}

View File

@ -0,0 +1,27 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
actual class SocketBlockingEventLoop : ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoop {
actual constructor() {
TODO("Not yet implemented")
}
override fun icmp_IPv4(addr: String): ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket {
TODO("Not yet implemented")
}
override fun icmp_IPv6(addr: String): ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket {
TODO("Not yet implemented")
}
private fun _mainloop() {
}
actual companion object {
actual fun runForever(el: ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketBlockingEventLoop) {
el._mainloop()
}
}
}

View File

@ -0,0 +1,6 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
interface IcmpSocket {
suspend fun send(data: UByteArray)
suspend fun recv(): UByteArray
}

View File

@ -0,0 +1,7 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
@Suppress("FunctionName")
interface SocketEventLoop {
fun icmp_IPv4(addr: String): IcmpSocket
fun icmp_IPv6(addr: String): IcmpSocket
}

View File

@ -0,0 +1,118 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.addressOf
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.usePinned
import platform.posix.AF_INET
import platform.posix.AF_INET6
import platform.posix.EAGAIN
import platform.posix.EMSGSIZE
import platform.posix.EWOULDBLOCK
import platform.posix.F_SETFL
import platform.posix.IPPROTO_ICMP
import platform.posix.IPPROTO_ICMPV6
import platform.posix.MSG_NOSIGNAL
import platform.posix.MSG_PEEK
import platform.posix.MSG_TRUNC
import platform.posix.O_NONBLOCK
import platform.posix.SOCK_RAW
import platform.posix.connect
import platform.posix.errno
import platform.posix.fcntl
import platform.posix.sockaddr
import platform.posix.sockaddr_in
import platform.posix.sockaddr_in6
import platform.posix.socket
import platform.posix.ssize_t
import platform.posix.send as send_lowlevel
import platform.posix.recv as recv_lowlevel
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
@Suppress("CanBeVal", "DUPLICATE_LABEL_IN_WHEN")
@OptIn(ExperimentalForeignApi::class)
internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
protected val _socketFd: Int
protected constructor(ipv4: CValue<sockaddr_in>) {
this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI<sockaddr_in>())
}
protected constructor(ipv6: CValue<sockaddr_in6>) {
this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI<sockaddr_in6>())
}
@Suppress("FunctionName")
private fun __createSocket(addrFamily: Int, protocol: Int, addrValue: CValue<*>, addrSize: UInt): Int {
val sock = socket(addrFamily, SOCK_RAW, protocol)
if (sock < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over IPv4: $d") }
var err = memScoped {
@Suppress("UNCHECKED_CAST")
return@memScoped connect(sock, addrValue as CValue<sockaddr>, addrSize)
}
if (err != 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to bind ICMP socket to provided IPv6 address: $d") }
err = fcntl(sock, F_SETFL, O_NONBLOCK)
if (err != 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to switch ICMP socket to non-blocking mode: $d") }
return sock
}
@Suppress("FunctionName")
protected abstract suspend fun _waitForWrite()
@Suppress("FunctionName")
protected abstract suspend fun _waitForRead()
override suspend fun send(data: UByteArray) {
polling@ while (true) {
this._waitForWrite()
val sentCount = data.usePinned { pinnedData ->
return@usePinned send_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL)
}
if (sentCount < 0) {
when (errno) {
EAGAIN, EWOULDBLOCK -> continue@polling
EMSGSIZE -> throw IllegalArgumentException("Data size bigger than can be sent by one packet")
else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") }
}
}
}
}
override suspend fun recv(): UByteArray {
polling@ while (true) {
this._waitForRead()
val bytesAvailable: ssize_t = recv_lowlevel(
this._socketFd, null, 0u,
MSG_NOSIGNAL or MSG_PEEK or MSG_TRUNC
)
if (bytesAvailable == 0L) continue@polling
if (bytesAvailable < 0) {
when (errno) {
EAGAIN, EWOULDBLOCK -> continue@polling
else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") }
}
}
val data = UByteArray(bytesAvailable.toInt())
var sentCount = data.usePinned { pinnedData ->
return@usePinned recv_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL)
}
if (sentCount < 0) {
when (errno) {
EAGAIN, EWOULDBLOCK -> continue@polling
else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") }
}
}
}
}
}