Minimal working parallel ping implementation

This commit is contained in:
Andrew Golovashevich 2025-03-19 19:52:36 +03:00
parent 18886705af
commit 7e2c6d0e43
22 changed files with 481 additions and 252 deletions

View File

@ -25,6 +25,7 @@ kotlin {
commonMain { commonMain {
dependencies { dependencies {
implementation(Dependencies.kotlin_atomicfu) implementation(Dependencies.kotlin_atomicfu)
implementation(project(":modules:utilities"))
} }
} }

View File

@ -1,13 +1,17 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
class CountDownLatch : AutoCloseable { class CountDownLatch : AutoCloseable {
private val _refcnt: _Refcnt private val _refcnt: CloseableRefCounter
private var _counter: ULong private var _counter: ULong
private val _mutex: Mutex private val _mutex: Mutex
private val _condition: Condition private val _condition: Condition
constructor(initialCounterValue: ULong) { constructor(initialCounterValue: ULong) {
this._refcnt = _Refcnt("Latch was destroyed") this._refcnt = CloseableRefCounter("Latch was destroyed")
this._counter = initialCounterValue this._counter = initialCounterValue
this._mutex = Mutex() this._mutex = Mutex()
_safeAutoClose2(onAbort = this._mutex::close) { _safeAutoClose2(onAbort = this._mutex::close) {

View File

@ -3,6 +3,7 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import kotlin.contracts.ExperimentalContracts import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind import kotlin.contracts.InvocationKind
import kotlin.contracts.contract import kotlin.contracts.contract
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
@OptIn(ExperimentalContracts::class) @OptIn(ExperimentalContracts::class)
inline fun <R> Mutex.withLock(synchronizedBlock: () -> R): R { inline fun <R> Mutex.withLock(synchronizedBlock: () -> R): R {

View File

@ -13,15 +13,18 @@ import platform.posix.pthread_cond_wait
import platform.posix.pthread_cond_broadcast import platform.posix.pthread_cond_broadcast
import platform.posix.pthread_cond_signal import platform.posix.pthread_cond_signal
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class)
actual class Condition : AutoCloseable { actual class Condition : AutoCloseable {
private val _refcnt: _Refcnt private val _refcnt: CloseableRefCounter
val _descriptor: CPointer<pthread_cond_t> val _descriptor: CPointer<pthread_cond_t>
actual constructor() { actual constructor() {
this._refcnt = _Refcnt("Pthreads condition was destroyed") this._refcnt = CloseableRefCounter("Pthreads condition was destroyed")
this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr
_safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) {
var err = pthread_cond_init(this._descriptor, null) var err = pthread_cond_init(this._descriptor, null)

View File

@ -12,6 +12,9 @@ import platform.posix.pthread_mutex_lock
import platform.posix.pthread_mutex_t import platform.posix.pthread_mutex_t
import platform.posix.pthread_mutex_unlock import platform.posix.pthread_mutex_unlock
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
@OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class) @OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class)
actual class Mutex : AutoCloseable { actual class Mutex : AutoCloseable {
@ -19,11 +22,11 @@ actual class Mutex : AutoCloseable {
internal annotation class RefcntAccess internal annotation class RefcntAccess
@RefcntAccess @RefcntAccess
internal val _refcnt: _Refcnt internal val _refcnt: CloseableRefCounter
internal val _descriptor: CPointer<pthread_mutex_t> internal val _descriptor: CPointer<pthread_mutex_t>
actual constructor() { actual constructor() {
this._refcnt = _Refcnt("Pthreads mutex was destroyed") this._refcnt = CloseableRefCounter("Pthreads mutex was destroyed")
this._descriptor = nativeHeap.alloc<pthread_mutex_t>().ptr this._descriptor = nativeHeap.alloc<pthread_mutex_t>().ptr
_safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) { _safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) {
var err = pthread_mutex_init(this._descriptor, null) var err = pthread_mutex_init(this._descriptor, null)

View File

@ -9,6 +9,10 @@ import kotlinx.cinterop.StableRef
import kotlinx.cinterop.asStableRef import kotlinx.cinterop.asStableRef
import kotlinx.cinterop.staticCFunction import kotlinx.cinterop.staticCFunction
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2e
import ru.landgrafhomyak.bgtu.networks0.utilities.compareAndExchange
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class)
actual class Thread : AutoCloseable { actual class Thread : AutoCloseable {

View File

@ -1,4 +1,5 @@
import ru.landgrafhomyak.bgtu.networks0.build_script.Dependencies import ru.landgrafhomyak.bgtu.networks0.build_script.Dependencies
import ru.landgrafhomyak.bgtu.networks0.build_script.configureWarnings
import ru.landgrafhomyak.bgtu.networks0.build_script.setupHierarchy import ru.landgrafhomyak.bgtu.networks0.build_script.setupHierarchy
plugins { plugins {
@ -11,6 +12,8 @@ repositories {
kotlin { kotlin {
configureWarnings()
mingwX64() mingwX64()
linuxX64() linuxX64()
linuxArm64() linuxArm64()
@ -22,13 +25,14 @@ kotlin {
commonMain { commonMain {
dependencies { dependencies {
implementation(Dependencies.kotlin_atomicfu) implementation(Dependencies.kotlin_atomicfu)
implementation(Dependencies.kotlin_coroutines_core)
implementation(project(":modules:low-level:multithreading"))
} }
} }
nativeMain { nativeMain {
dependencies { dependencies {
implementation(project(":modules:low-level:c-interop-utilities")) implementation(project(":modules:low-level:c-interop-utilities"))
implementation(project(":modules:low-level:multithreading"))
} }
} }
} }

View File

@ -1,6 +1,23 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
/**
* High-level interface for interacting with icmp socket.
* Destination host address is bound to instance and can't be changed.
*/
interface IcmpSocket : AutoCloseable { interface IcmpSocket : AutoCloseable {
suspend fun send(data: UByteArray) /**
suspend fun recv(): UByteArray * Sends [data] as is in single datagram to destination host.
*/
suspend fun sendRaw(data: UByteArray)
/**
* Waits for datagram from destination host and returns it as is.
*/
suspend fun recvRaw(): UByteArray
/**
* Closes socket and releases related resources.
* @throws IllegalStateException if called on already closed socket.
*/
override fun close()
} }

View File

@ -1,9 +0,0 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
expect class SocketBlockingEventLoop : ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoop {
public constructor()
companion object {
fun runForever(el: ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketBlockingEventLoop)
}
}

View File

@ -1,7 +0,0 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
@Suppress("FunctionName")
interface SocketEventLoop {
fun icmp_IPv4(addr: String): ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket
fun icmp_IPv6(addr: String): ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket
}

View File

@ -0,0 +1,27 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
/**
* User-side interface of socket event-loops.
*/
@Suppress("FunctionName")
interface SocketEventLoopScope {
/**
* Creates [ICMP socket][IcmpSocket] bound with specified [address][addr] to communicate over IPv4.
*/
fun icmp_IPv4(addr: String): IcmpSocket
/**
* Creates [ICMP socket][IcmpSocket] bound with specified [address][addr] to communicate over IPv6.
*/
fun icmp_IPv6(addr: String): IcmpSocket
interface Closeable : SocketEventLoopScope, AutoCloseable {
/**
* Closes event-loop and releases related resources.
*
* @throws IllegalStateException if there are unclosed sockets bound to this event-loop.
* @throws IllegalStateException if called on already closed socket.
*/
override fun close()
}
}

View File

@ -0,0 +1,67 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets.utility
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoopScope
@Suppress("FunctionName", "MemberVisibilityCanBePrivate", "PropertyName")
class CloseGuardEventLoop<T : SocketEventLoopScope.Closeable>(
val _unprotected: T,
) : SocketEventLoopScope.Closeable {
private var _isClosed = atomic(false)
private var _openedSockets = atomic(0uL)
private fun _assertNotClosed() {
if (this._isClosed.value) throw IllegalStateException("Event-loop is closed")
}
private fun _incrementOpenedSockets() = this._openedSockets.update { x -> x + 1u }
private fun _decrementOpenedSockets() = this._openedSockets.update { x -> x - 1u }
override fun icmp_IPv4(addr: String): IcmpSocket {
this._assertNotClosed()
this._incrementOpenedSockets()
return ProtectedIcmpSocket(this._unprotected.icmp_IPv4(addr))
}
override fun icmp_IPv6(addr: String): IcmpSocket {
this._assertNotClosed()
this._incrementOpenedSockets()
return ProtectedIcmpSocket(this._unprotected.icmp_IPv6(addr))
}
override fun close() {
this._assertNotClosed()
if (this._openedSockets.value > 0uL)
throw IllegalStateException("There are still opened sockets, close them manually first")
this._unprotected.close()
}
private inner class ProtectedIcmpSocket(private val _unprotected: IcmpSocket) : IcmpSocket {
private val _isClosed = atomic(false)
private fun _assertNotClosed() {
if (this._isClosed.value) throw IllegalStateException("Socket is closed")
}
override suspend fun sendRaw(data: UByteArray) {
this._assertNotClosed()
this._unprotected.sendRaw(data)
}
override suspend fun recvRaw(): UByteArray {
this._assertNotClosed()
return this._unprotected.recvRaw()
}
override fun close() {
if (this._isClosed.getAndSet(true))
throw IllegalStateException("Socket is closed")
this._unprotected.close()
this@CloseGuardEventLoop._decrementOpenedSockets()
}
}
}

View File

@ -0,0 +1,198 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.StableRef
import kotlinx.cinterop.allocArray
import kotlinx.cinterop.asStableRef
import kotlinx.cinterop.cValue
import kotlinx.cinterop.get
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.ptr
import platform.linux.EPOLLIN
import platform.linux.EPOLLONESHOT
import platform.linux.EPOLLOUT
import platform.linux.EPOLL_CTL_ADD
import platform.linux.EPOLL_CTL_MOD
import platform.linux.epoll_create
import platform.linux.epoll_ctl
import platform.linux.epoll_event
import platform.linux.epoll_wait
import platform.linux.inet_pton
import platform.posix.AF_INET
import platform.posix.sockaddr_in
import platform.posix.sockaddr_in6
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
@OptIn(ExperimentalForeignApi::class)
@Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName")
class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
private val _epollDescriptor: Int
constructor() {
this._epollDescriptor = epoll_create(1)
if (this._epollDescriptor < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") }
}
private fun _parseIpv4(rawAddr: String, port: UShort): CValue<sockaddr_in> = cValue<sockaddr_in> out@{
memScoped<Unit> {
val err = inet_pton(AF_INET, rawAddr, this@out.sin_addr.ptr)
when {
err == 0 -> throw IllegalArgumentException("IPv4 address is ill-formatted")
err < 0 -> PosixUtilities.throwErrno { d -> throw RuntimeException("Failed to parse IPv4 address: $d") }
}
}
this@out.sin_family = AF_INET.toUShort()
this@out.sin_port = port
}
override fun icmp_IPv4(addr: String): IcmpSocket {
return BoundIcmpSocket(this._parseIpv4(addr, 0u))
}
override fun icmp_IPv6(addr: String): IcmpSocket {
TODO("Not yet implemented")
}
private fun _waitFor(metadata: _SocketMetadata) {
val ee = cValue<epoll_event> {
this.events = ((if (metadata.read == null) 0 else EPOLLIN) or (if (metadata.write == null) 0 else EPOLLOUT) or EPOLLONESHOT).toUInt()
this.data.fd = metadata.fd
this.data.ptr = metadata.stableRef.asCPointer()
}
if (!metadata.isAddedToEpoll) {
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, metadata.fd, ee))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to add fd to epoll: $d") }
metadata.isAddedToEpoll = true
} else {
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_MOD, metadata.fd, ee))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to modify fd in epoll: $d") }
}
}
private suspend fun _waitForRead(metadata: _SocketMetadata) = suspendCoroutine { continuation ->
metadata.sync.withLock {
if (metadata.read != null)
throw IllegalStateException("Socket already waiting for read")
metadata.read = continuation
this._waitFor(metadata)
}
}
private suspend fun _waitForWrite(metadata: _SocketMetadata) = suspendCoroutine { continuation ->
metadata.sync.withLock {
if (metadata.write != null)
throw IllegalStateException("Socket already waiting for write")
metadata.write = continuation
this._waitFor(metadata)
}
}
private fun _resume(ev: epoll_event) {
val metadata = (ev.data.ptr ?: throw RuntimeException("Lost reference to socket metadata")).asStableRef<_SocketMetadata>().get()
metadata.sync.withLock {
if ((ev.events and EPOLLIN.toUInt()) != 0u) {
(metadata.read ?: throw RuntimeException("Socket ready for read, but nobody requests")).resume(Unit)
metadata.read = null
}
if ((ev.events and EPOLLOUT.toUInt()) != 0u) {
(metadata.write ?: throw RuntimeException("Socket ready for read, but nobody requests")).resume(Unit)
metadata.write = null
}
if (metadata.read != null || metadata.write != null)
this._waitFor(metadata)
}
}
fun runUntilError(eventsBufferSize: Int = 1000) {
memScoped {
val readyEvents = allocArray<epoll_event>(eventsBufferSize)
while (true) {
val readyEventsCount = epoll_wait(this@EpollSocketEventLoop._epollDescriptor, readyEvents, eventsBufferSize, -1)
if (readyEventsCount < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Error while waiting epoll: $d") }
for (i in 0..<readyEventsCount) {
this@EpollSocketEventLoop._resume(readyEvents[i])
}
}
}
}
override fun close() {
TODO("Not yet implemented")
}
class _SocketMetadata : AutoCloseable {
val fd: Int
var read: Continuation<Unit>?
var write: Continuation<Unit>?
val sync: TMutex
val stableRef: StableRef<_SocketMetadata>
var isAddedToEpoll: Boolean
constructor(fd: Int) {
this.isAddedToEpoll = false
this.fd = fd
this.read = null
this.write = null
this.stableRef = StableRef.create(this)
_safeAutoClose2(onAbort = this.stableRef::dispose) {
this.sync = TMutex()
}
}
override fun close() {
_safeAutoClose1(
action = this.sync::close,
finally = this.stableRef::dispose
)
}
}
private inner class BoundIcmpSocket : NonblockingIcmpSocketImpl {
private val __metadata: _SocketMetadata
constructor(ipv4: CValue<sockaddr_in>) : super(ipv4) {
_safeAutoClose2(onAbort = { super.close() }) {
this.__metadata = _SocketMetadata(this._socketFd)
}
}
constructor(ipv6: CValue<sockaddr_in6>) : super(ipv6) {
_safeAutoClose2(onAbort = { super.close() }) {
this.__metadata = _SocketMetadata(this._socketFd)
}
}
override suspend fun _waitForWrite() =
this@EpollSocketEventLoop._waitForWrite(this.__metadata)
override suspend fun _waitForRead() =
this@EpollSocketEventLoop._waitForRead(this.__metadata)
override fun close() {
_safeAutoClose1(
action = {},
finally = {
_safeAutoClose1(
action = this.__metadata::close,
finally = { super.close() }
)
}
)
}
}
}

View File

@ -1,158 +0,0 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
import kotlin.concurrent.AtomicReference
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.atomicfu.atomic
import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.allocArray
import kotlinx.cinterop.cValue
import kotlinx.cinterop.get
import kotlinx.cinterop.memScoped
import kotlinx.cinterop.ptr
import platform.linux.EPOLLIN
import platform.linux.EPOLLOUT
import platform.linux.EPOLL_CTL_ADD
import platform.linux.epoll_create
import platform.linux.epoll_ctl
import platform.linux.epoll_event
import platform.linux.epoll_wait
import platform.linux.inet_pton
import platform.posix.AF_INET
import platform.posix.sockaddr_in
import platform.posix.sockaddr_in6
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.TMutex
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock
@OptIn(ExperimentalForeignApi::class)
@Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName")
actual class SocketBlockingEventLoop : SocketEventLoop {
private val _tsync = TMutex()
private val _id2cont = HashMap<ULong, Continuation<Unit>>()
private var _nextSockId: ULong = 0uL
private val _epollDescriptor: Int
actual constructor() {
this._epollDescriptor = epoll_create(0)
if (this._epollDescriptor < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") }
}
private val _state = AtomicReference(State.READY)
private val _openedSockets = atomic(0uL)
private enum class State {
READY, RUNNING, CLOSED
}
private fun _assertNotClosed() {
if (this._state.value == State.CLOSED)
throw IllegalStateException("Eventloop is closed")
}
private fun _parseIpv4(rawAddr: String, port: UShort): CValue<sockaddr_in> = cValue<sockaddr_in> out@{
memScoped<Unit> {
val err = inet_pton(AF_INET, rawAddr, this@out.sin_addr.ptr)
when {
err == 0 -> throw IllegalArgumentException("IPv4 address is ill-formatted")
err < 0 -> PosixUtilities.throwErrno { d -> throw RuntimeException("Failed to parse IPv4 address: $d") }
}
}
this@out.sin_family = AF_INET.toUShort()
this@out.sin_port = port
}
override fun icmp_IPv4(addr: String): IcmpSocket {
this._assertNotClosed()
return BoundIcmpSocket(this._parseIpv4(addr, 0u))
}
override fun icmp_IPv6(addr: String): IcmpSocket {
this._assertNotClosed()
TODO("Not yet implemented")
}
private suspend fun _waitFor(fd: Int, action: Int) = suspendCoroutine { continuation ->
val contId: ULong
this._tsync.withLock {
contId = this._nextSockId++
this._id2cont[contId] = continuation
}
val ee = cValue<epoll_event> {
this.events = action.toUInt()
this.data.fd = fd
this.data.u64 = contId
}
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_ADD, fd, ee))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to add event to epoll: $d") }
}
private suspend fun _waitForRead(fd: Int) =
this._waitFor(fd, EPOLLIN)
private suspend fun _waitForWrite(fd: Int) =
this._waitFor(fd, EPOLLOUT)
private fun _resume(id: ULong) {
val cont = this._tsync.withLock { this._id2cont.remove(id) }
if (cont == null) {
// todo
return
}
cont.resume(Unit)
}
private fun _mainloop() {
when (this._state.compareAndExchange(State.READY, State.RUNNING)) {
State.RUNNING -> throw IllegalStateException("Eventloop already running")
State.CLOSED -> throw IllegalStateException("Eventloop is closed")
State.READY -> {}
}
val eventsBufferSize = 100
memScoped {
val readyEvents = allocArray<epoll_event>(eventsBufferSize)
while (true) {
val readyEventsCount = epoll_wait(this@SocketBlockingEventLoop._epollDescriptor, readyEvents, eventsBufferSize, -1)
if (readyEventsCount < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Error while waiting epoll: $d") }
for (i in 0..<readyEventsCount) {
this@SocketBlockingEventLoop._resume(readyEvents[0].data.u64)
}
}
}
}
actual companion object {
actual fun runForever(el: SocketBlockingEventLoop) {
el._mainloop()
}
}
private inner class BoundIcmpSocket : NonblockingIcmpSocketImpl {
constructor(ipv4: CValue<sockaddr_in>) : super(ipv4)
constructor(ipv6: CValue<sockaddr_in6>) : super(ipv6)
override suspend fun _waitForWrite() =
this@SocketBlockingEventLoop._waitForWrite(this._socketFd)
override suspend fun _waitForRead() =
this@SocketBlockingEventLoop._waitForRead(this._socketFd)
override fun close() {
TODO("Not yet implemented")
}
}
}

View File

@ -1,14 +1,18 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
import kotlinx.atomicfu.locks.MutexPool
import kotlinx.cinterop.CValue import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.addressOf import kotlinx.cinterop.addressOf
import kotlinx.cinterop.memScoped import kotlinx.cinterop.memScoped
import kotlinx.cinterop.usePinned import kotlinx.cinterop.usePinned
import kotlinx.coroutines.sync.Mutex as CMutex
import kotlinx.coroutines.sync.withLock
import platform.posix.AF_INET import platform.posix.AF_INET
import platform.posix.AF_INET6 import platform.posix.AF_INET6
import platform.posix.EAGAIN import platform.posix.EAGAIN
import platform.posix.EMSGSIZE import platform.posix.EMSGSIZE
import platform.posix.EPERM
import platform.posix.EWOULDBLOCK import platform.posix.EWOULDBLOCK
import platform.posix.F_SETFL import platform.posix.F_SETFL
import platform.posix.IPPROTO_ICMP import platform.posix.IPPROTO_ICMP
@ -31,32 +35,37 @@ import platform.posix.recv as recv_lowlevel
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
@Suppress("CanBeVal", "DUPLICATE_LABEL_IN_WHEN") @Suppress("CanBeVal")
@OptIn(ExperimentalForeignApi::class) @OptIn(ExperimentalForeignApi::class)
internal abstract class NonblockingIcmpSocketImpl : IcmpSocket { internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
protected val _socketFd: Int protected val _socketFd: Int
private val _rSync = CMutex()
private val _wSync = CMutex()
protected constructor(ipv4: CValue<sockaddr_in>) { protected constructor(ipv4: CValue<sockaddr_in>) {
this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI<sockaddr_in>()) this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI<sockaddr_in>(), "IPv4")
} }
protected constructor(ipv6: CValue<sockaddr_in6>) { protected constructor(ipv6: CValue<sockaddr_in6>) {
this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI<sockaddr_in6>()) this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI<sockaddr_in6>(), "IPv6")
} }
@Suppress("FunctionName") @Suppress("FunctionName")
private fun __createSocket(addrFamily: Int, protocol: Int, addrValue: CValue<*>, addrSize: UInt): Int { private fun __createSocket(addrFamily: Int, protocol: Int, addrValue: CValue<*>, addrSize: UInt, protoName: String): Int {
val sock = socket(addrFamily, SOCK_RAW, protocol) val sock = socket(addrFamily, SOCK_RAW, protocol)
if (sock < 0) if (sock < 0) {
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over IPv4: $d") } if (errno == EPERM)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over ${protoName}: $d (maybe you need to run this program as superuser)") }
else
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over ${protoName}: $d") }
}
var err = memScoped { var err = memScoped {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
return@memScoped connect(sock, addrValue as CValue<sockaddr>, addrSize) return@memScoped connect(sock, addrValue as CValue<sockaddr>, addrSize)
} }
if (err != 0) if (err != 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to bind ICMP socket to provided IPv6 address: $d") } PosixUtilities.throwErrno { d -> RuntimeException("Failed to bind ICMP socket to provided $protoName address: $d") }
err = fcntl(sock, F_SETFL, O_NONBLOCK) err = fcntl(sock, F_SETFL, O_NONBLOCK)
if (err != 0) if (err != 0)
@ -72,23 +81,25 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
@Suppress("FunctionName") @Suppress("FunctionName")
protected abstract suspend fun _waitForRead() protected abstract suspend fun _waitForRead()
override suspend fun send(data: UByteArray) { override suspend fun sendRaw(data: UByteArray) = this._wSync.withLock {
polling@ while (true) { polling@ while (true) {
this._waitForWrite() this._waitForWrite()
val sentCount = data.usePinned { pinnedData -> val sentCount = data.usePinned { pinnedData ->
return@usePinned send_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL) return@usePinned send_lowlevel(this._socketFd, pinnedData.addressOf(0), data.size.toULong(), MSG_NOSIGNAL)
} }
val err = errno
if (sentCount < 0) { if (sentCount < 0) {
when (errno) { when (err) {
EAGAIN, EWOULDBLOCK -> continue@polling EAGAIN, EWOULDBLOCK -> continue@polling
EMSGSIZE -> throw IllegalArgumentException("Data size bigger than can be sent by one packet") EMSGSIZE -> throw IllegalArgumentException("Data size bigger than can be sent by one packet")
else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } else -> PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to send message ove ICMP socket: $d") }
} }
} }
return
} }
} }
override suspend fun recv(): UByteArray { override suspend fun recvRaw(): UByteArray = this._rSync.withLock {
polling@ while (true) { polling@ while (true) {
this._waitForRead() this._waitForRead()
val bytesAvailable: ssize_t = recv_lowlevel( val bytesAvailable: ssize_t = recv_lowlevel(
@ -99,7 +110,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
if (bytesAvailable < 0) { if (bytesAvailable < 0) {
when (errno) { when (errno) {
EAGAIN, EWOULDBLOCK -> continue@polling EAGAIN, EWOULDBLOCK -> continue@polling
else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") }
} }
} }
@ -110,9 +121,15 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
if (sentCount < 0) { if (sentCount < 0) {
when (errno) { when (errno) {
EAGAIN, EWOULDBLOCK -> continue@polling EAGAIN, EWOULDBLOCK -> continue@polling
else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message ove ICMP socket: $d") } else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") }
} }
} }
return@withLock data
} }
return@withLock ubyteArrayOf()
}
override fun close() {
} }
} }

View File

@ -0,0 +1,28 @@
import ru.landgrafhomyak.bgtu.networks0.build_script.Dependencies
import ru.landgrafhomyak.bgtu.networks0.build_script.configureWarnings
import ru.landgrafhomyak.bgtu.networks0.build_script.setupHierarchy
import ru.landgrafhomyak.kotlin.kmp_gradle_build_helper.defineAllMultiplatformTargets
plugins {
kotlin("multiplatform")
}
repositories {
mavenCentral()
}
kotlin {
configureWarnings()
defineAllMultiplatformTargets()
sourceSets {
setupHierarchy()
commonMain {
dependencies {
implementation(Dependencies.kotlin_atomicfu)
}
}
}
}

View File

@ -1,10 +1,10 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.utilities
import kotlinx.atomicfu.AtomicLong import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update import kotlinx.atomicfu.update
internal class _Refcnt(private val _errMessage: String) { class CloseableRefCounter(private val _errMessage: String) {
private val _value: AtomicLong = atomic(0L) private val _value: AtomicLong = atomic(0L)
fun throwClosed() { fun throwClosed() {

View File

@ -0,0 +1,20 @@
package ru.landgrafhomyak.bgtu.networks0.utilities
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.AtomicRef
fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long {
while (true) {
val old = this.value
if (old != expected) return old
if (this.compareAndSet(old, newValue)) return old
}
}
fun <T> AtomicRef<T>.compareAndExchange(expected: T, newValue: T): T {
while (true) {
val old = this.value
if (old === expected) return old
if (this.compareAndSet(old, newValue)) return old
}
}

View File

@ -1,32 +1,13 @@
@file:OptIn(ExperimentalContracts::class) @file:OptIn(ExperimentalContracts::class)
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading package ru.landgrafhomyak.bgtu.networks0.utilities
import kotlin.contracts.ExperimentalContracts import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind import kotlin.contracts.InvocationKind
import kotlin.contracts.contract import kotlin.contracts.contract
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.AtomicRef
internal fun AtomicLong.compareAndExchange(expected: Long, newValue: Long): Long {
while (true) {
val old = this.value
if (old != expected) return old
if (this.compareAndSet(old, newValue)) return old
}
}
internal fun <T> AtomicRef<T>.compareAndExchange(expected: T, newValue: T): T {
while (true) {
val old = this.value
if (old === expected) return old
if (this.compareAndSet(old, newValue)) return old
}
}
@Suppress("WRONG_INVOCATION_KIND") @Suppress("WRONG_INVOCATION_KIND")
@PublishedApi inline fun <R> _safeAutoClose1(
internal inline fun <R> _safeAutoClose1(
finally: () -> Unit, finally: () -> Unit,
action: () -> R action: () -> R
): R { ): R {
@ -38,8 +19,7 @@ internal inline fun <R> _safeAutoClose1(
} }
@Suppress("WRONG_INVOCATION_KIND") @Suppress("WRONG_INVOCATION_KIND")
@PublishedApi inline fun <R> _safeAutoClose2(
internal inline fun <R> _safeAutoClose2(
onAbort: () -> Unit = {}, onAbort: () -> Unit = {},
onSuccess: () -> Unit = {}, onSuccess: () -> Unit = {},
action: () -> R action: () -> R
@ -53,8 +33,7 @@ internal inline fun <R> _safeAutoClose2(
} }
@Suppress("WRONG_INVOCATION_KIND") @Suppress("WRONG_INVOCATION_KIND")
@PublishedApi inline fun <R> _safeAutoClose2e(
internal inline fun <R> _safeAutoClose2e(
onAbort: (Throwable) -> Unit = { _ -> }, onAbort: (Throwable) -> Unit = { _ -> },
onSuccess: () -> Unit = {}, onSuccess: () -> Unit = {},
action: () -> R action: () -> R
@ -67,8 +46,7 @@ internal inline fun <R> _safeAutoClose2e(
return _safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action) return _safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action)
} }
@PublishedApi inline fun <R> _safeAutoClose3(
internal inline fun <R> _safeAutoClose3(
onAbort: () -> Unit = {}, onAbort: () -> Unit = {},
onSuccess: () -> Unit = {}, onSuccess: () -> Unit = {},
onCrossReturn: () -> Unit = {}, onCrossReturn: () -> Unit = {},
@ -83,8 +61,7 @@ internal inline fun <R> _safeAutoClose3(
return _safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action) return _safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action)
} }
@PublishedApi inline fun <R> _safeAutoClose3e(
internal inline fun <R> _safeAutoClose3e(
onAbort: (Throwable) -> Unit = { _ -> }, onAbort: (Throwable) -> Unit = { _ -> },
onSuccess: () -> Unit = {}, onSuccess: () -> Unit = {},
onCrossReturn: () -> Unit = {}, onCrossReturn: () -> Unit = {},

View File

@ -19,6 +19,7 @@ kotlin {
linuxX64Main { linuxX64Main {
dependencies { dependencies {
implementation(project(":modules:low-level:multithreading")) implementation(project(":modules:low-level:multithreading"))
implementation(project(":modules:low-level:sockets"))
} }
} }
} }

View File

@ -1,41 +1,71 @@
import platform.posix.sleep import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Thread import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Thread
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.EpollSocketEventLoop
class Printer(private val prefix: String) : Thread.Routine { class EventLoopRoutine(private val loop: EpollSocketEventLoop) : Thread.Routine {
override fun run() { override fun run() {
for (i in 0..3) { while (true) {
print("${this.prefix}: ${i}\n") try {
sleep(1u) this.loop.runUntilError()
} catch (e: Throwable) {
e.printStackTrace()
break // todo remove
}
} }
} }
} }
fun main() { fun main() {
try { try {
println("entry") val loop = EpollSocketEventLoop()
val t1 = Thread(Printer("t1")) val loopThread = Thread(EventLoopRoutine(loop))
val t2 = Thread(Printer("t2")) loopThread.start()
val t3 = Thread(Printer("t3"))
val t4 = Thread(Printer("t4"))
println("created") val googleSock = loop.icmp_IPv4("8.8.8.8")
sleep(2u) val yandexSock = loop.icmp_IPv4("5.255.255.70")
println("starting...") val phoneSock = loop.icmp_IPv4("192.168.43.1")
val pingData = ubyteArrayOf(
0x08u, 0x00u, 0x4Cu, 0xE4u, 0x00u, 0x01u, 0x00u, 0x77u,
0x61u, 0x62u, 0x63u, 0x64u, 0x65u, 0x66u, 0x67u, 0x68u,
0x69u, 0x6Au, 0x6Bu, 0x6Cu, 0x6Du, 0x6Eu, 0x6Fu, 0x70u,
0x71u, 0x72u, 0x73u, 0x74u, 0x75u, 0x76u, 0x77u, 0x61u,
0x62u, 0x63u, 0x64u, 0x65u, 0x66u, 0x67u, 0x68u, 0x69u
)
t1.start() runBlocking {
t2.start() launch {
t3.start() while (true) {
t4.start() googleSock.sendRaw(pingData)
print("sent to google\n")
googleSock.recvRaw()
print("got from google\n")
delay(1000)
}
}
launch {
while (true) {
yandexSock.sendRaw(pingData)
print("sent to yandex\n")
yandexSock.recvRaw()
print("got from yandex\n")
delay(2500)
}
}
launch {
while (true) {
phoneSock.sendRaw(pingData)
print("sent to phone\n")
phoneSock.recvRaw()
print("got from phone\n")
delay(5500)
}
}
}
print("started\n")
t1.join()?.let(Throwable::printStackTrace)
t2.join()?.let(Throwable::printStackTrace)
t3.join()?.let(Throwable::printStackTrace)
t4.join()?.let(Throwable::printStackTrace)
println("finished")
} catch (e: Throwable) { } catch (e: Throwable) {
e.printStackTrace() e.printStackTrace()
} }

View File

@ -21,6 +21,7 @@ pluginManagement {
} }
} }
} }
include(":modules:utilities")
include(":modules:low-level:c-interop-utilities") include(":modules:low-level:c-interop-utilities")
include(":modules:low-level:multithreading") include(":modules:low-level:multithreading")
include(":modules:low-level:sockets") include(":modules:low-level:sockets")