Protecting epoll event-loop

This commit is contained in:
Andrew Golovashevich 2025-03-19 21:59:49 +03:00
parent 7e2c6d0e43
commit bc9ba0ae54
11 changed files with 158 additions and 167 deletions

View File

@ -14,6 +14,8 @@ repositories {
kotlin {
configureWarnings()
jvm()
mingwX64()
linuxX64()
linuxArm64()

View File

@ -1,8 +1,8 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
class CountDownLatch : AutoCloseable {
private val _refcnt: CloseableRefCounter
@ -14,7 +14,7 @@ class CountDownLatch : AutoCloseable {
this._refcnt = CloseableRefCounter("Latch was destroyed")
this._counter = initialCounterValue
this._mutex = Mutex()
_safeAutoClose2(onAbort = this._mutex::close) {
safeAutoClose2(onAbort = this._mutex::close) {
this._condition = Condition()
}
}
@ -37,7 +37,7 @@ class CountDownLatch : AutoCloseable {
override fun close() {
this._refcnt.close("Latch is still in use")
_safeAutoClose1(
safeAutoClose1(
action = this._condition::close,
finally = this._mutex::close
)

View File

@ -3,7 +3,7 @@ package ru.landgrafhomyak.bgtu.networks0.low_level.multithreading
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
@OptIn(ExperimentalContracts::class)
inline fun <R> Mutex.withLock(synchronizedBlock: () -> R): R {
@ -11,5 +11,5 @@ inline fun <R> Mutex.withLock(synchronizedBlock: () -> R): R {
callsInPlace(synchronizedBlock, InvocationKind.EXACTLY_ONCE)
}
this.lock()
return _safeAutoClose1(finally = this::unlock, synchronizedBlock)
return safeAutoClose1(finally = this::unlock, synchronizedBlock)
}

View File

@ -14,8 +14,8 @@ import platform.posix.pthread_cond_broadcast
import platform.posix.pthread_cond_signal
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
@OptIn(ExperimentalForeignApi::class)
@ -26,7 +26,7 @@ actual class Condition : AutoCloseable {
actual constructor() {
this._refcnt = CloseableRefCounter("Pthreads condition was destroyed")
this._descriptor = nativeHeap.alloc<pthread_cond_t>().ptr
_safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) {
safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) {
var err = pthread_cond_init(this._descriptor, null)
if (err != 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to initialize pthreads condition: $d") }
@ -56,7 +56,7 @@ actual class Condition : AutoCloseable {
override fun close() {
this._refcnt.close("There are waiting threads on this pthreads condition")
_safeAutoClose1(
safeAutoClose1(
action = {
var err = pthread_cond_destroy(this._descriptor)
if (err != 0)

View File

@ -13,8 +13,8 @@ import platform.posix.pthread_mutex_t
import platform.posix.pthread_mutex_unlock
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
@OptIn(ExperimentalForeignApi::class, Mutex.RefcntAccess::class)
actual class Mutex : AutoCloseable {
@ -28,7 +28,7 @@ actual class Mutex : AutoCloseable {
actual constructor() {
this._refcnt = CloseableRefCounter("Pthreads mutex was destroyed")
this._descriptor = nativeHeap.alloc<pthread_mutex_t>().ptr
_safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) {
safeAutoClose2(onAbort = { nativeHeap.free(this._descriptor) }) {
var err = pthread_mutex_init(this._descriptor, null)
if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to create pthreads mutex: $d") }
@ -53,7 +53,7 @@ actual class Mutex : AutoCloseable {
override fun close() {
this._refcnt.close("There are waiting threads on this pthreads mutex")
_safeAutoClose1(
safeAutoClose1(
action = {
var err = pthread_mutex_destroy(this._descriptor)
if (err != 0)

View File

@ -9,9 +9,9 @@ import kotlinx.cinterop.StableRef
import kotlinx.cinterop.asStableRef
import kotlinx.cinterop.staticCFunction
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2e
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2e
import ru.landgrafhomyak.bgtu.networks0.utilities.compareAndExchange
@OptIn(ExperimentalForeignApi::class)
@ -38,9 +38,9 @@ actual class Thread : AutoCloseable {
actual constructor(routine: Routine) {
this._state = atomic(State.PENDING)
this._bootstrapArgRef = StableRef.create(ThreadBootstrapContext(routine))
_safeAutoClose2(onAbort = this._bootstrapArgRef::dispose) {
safeAutoClose2(onAbort = this._bootstrapArgRef::dispose) {
this._threadBindings = _PthreadsThreadBindings()
_safeAutoClose2(onAbort = this._threadBindings::free) {
safeAutoClose2(onAbort = this._threadBindings::free) {
var err = this._threadBindings.create(
null,
staticCFunction({ arg -> return@staticCFunction Thread._threadBootstrap(arg) }),
@ -60,7 +60,7 @@ actual class Thread : AutoCloseable {
State.STARTING, State.RUNNING -> throw IllegalStateException("Pthreads thread already running")
State.PENDING -> {}
}
_safeAutoClose2(onAbort = { this._state.value = State.PENDING }) {
safeAutoClose2(onAbort = { this._state.value = State.PENDING }) {
this._bootstrapArgRef.get().startSignal.decrement()
}
}
@ -87,14 +87,14 @@ actual class Thread : AutoCloseable {
return@update State.CLOSED
}
_safeAutoClose1(
safeAutoClose1(
action = {
var err = this._threadBindings.join(null)
if (err != 0)
PosixUtilities.throwErrno(err) { d -> RuntimeException("Failed to destroy pthreads thread: $d") }
},
finally = {
_safeAutoClose1(
safeAutoClose1(
action = { this._threadBindings.free() },
finally = { this._bootstrapArgRef.dispose() }
)
@ -112,7 +112,7 @@ actual class Thread : AutoCloseable {
return null
}
try {
_safeAutoClose2e(
safeAutoClose2e(
onAbort = { e ->
context.exitedWithError = e
context.threadState.value = State.FINISHED

View File

@ -1,67 +0,0 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets.utility
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.IcmpSocket
import ru.landgrafhomyak.bgtu.networks0.low_level.sockets.SocketEventLoopScope
@Suppress("FunctionName", "MemberVisibilityCanBePrivate", "PropertyName")
class CloseGuardEventLoop<T : SocketEventLoopScope.Closeable>(
val _unprotected: T,
) : SocketEventLoopScope.Closeable {
private var _isClosed = atomic(false)
private var _openedSockets = atomic(0uL)
private fun _assertNotClosed() {
if (this._isClosed.value) throw IllegalStateException("Event-loop is closed")
}
private fun _incrementOpenedSockets() = this._openedSockets.update { x -> x + 1u }
private fun _decrementOpenedSockets() = this._openedSockets.update { x -> x - 1u }
override fun icmp_IPv4(addr: String): IcmpSocket {
this._assertNotClosed()
this._incrementOpenedSockets()
return ProtectedIcmpSocket(this._unprotected.icmp_IPv4(addr))
}
override fun icmp_IPv6(addr: String): IcmpSocket {
this._assertNotClosed()
this._incrementOpenedSockets()
return ProtectedIcmpSocket(this._unprotected.icmp_IPv6(addr))
}
override fun close() {
this._assertNotClosed()
if (this._openedSockets.value > 0uL)
throw IllegalStateException("There are still opened sockets, close them manually first")
this._unprotected.close()
}
private inner class ProtectedIcmpSocket(private val _unprotected: IcmpSocket) : IcmpSocket {
private val _isClosed = atomic(false)
private fun _assertNotClosed() {
if (this._isClosed.value) throw IllegalStateException("Socket is closed")
}
override suspend fun sendRaw(data: UByteArray) {
this._assertNotClosed()
this._unprotected.sendRaw(data)
}
override suspend fun recvRaw(): UByteArray {
this._assertNotClosed()
return this._unprotected.recvRaw()
}
override fun close() {
if (this._isClosed.getAndSet(true))
throw IllegalStateException("Socket is closed")
this._unprotected.close()
this@CloseGuardEventLoop._decrementOpenedSockets()
}
}
}

View File

@ -16,6 +16,7 @@ import platform.linux.EPOLLIN
import platform.linux.EPOLLONESHOT
import platform.linux.EPOLLOUT
import platform.linux.EPOLL_CTL_ADD
import platform.linux.EPOLL_CTL_DEL
import platform.linux.EPOLL_CTL_MOD
import platform.linux.epoll_create
import platform.linux.epoll_ctl
@ -28,22 +29,25 @@ import platform.posix.sockaddr_in6
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.Mutex as TMutex
import ru.landgrafhomyak.bgtu.networks0.low_level.multithreading.withLock
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities._safeAutoClose2
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose1
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
@OptIn(ExperimentalForeignApi::class)
@Suppress("JoinDeclarationAndAssignment", "ConvertSecondaryConstructorToPrimary", "FunctionName")
class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
private val _epollDescriptor: Int
private val _socketsCount: CloseableRefCounter
constructor() {
this._socketsCount = CloseableRefCounter("Epoll event-loop is closed")
this._epollDescriptor = epoll_create(1)
if (this._epollDescriptor < 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create epoll descriptor: $d") }
}
private fun _parseIpv4(rawAddr: String, port: UShort): CValue<sockaddr_in> = cValue<sockaddr_in> out@{
memScoped<Unit> {
memScoped {
val err = inet_pton(AF_INET, rawAddr, this@out.sin_addr.ptr)
when {
err == 0 -> throw IllegalArgumentException("IPv4 address is ill-formatted")
@ -78,6 +82,15 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
}
}
private fun _remove(metadata: _SocketMetadata) {
metadata.sync.withLock {
if (metadata.isAddedToEpoll) {
if (0 != epoll_ctl(this._epollDescriptor, EPOLL_CTL_DEL, metadata.fd, null))
PosixUtilities.throwErrno { d -> RuntimeException("Failed to remove fd from epoll: $d") }
}
}
}
private suspend fun _waitForRead(metadata: _SocketMetadata) = suspendCoroutine { continuation ->
metadata.sync.withLock {
if (metadata.read != null)
@ -147,13 +160,13 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
this.read = null
this.write = null
this.stableRef = StableRef.create(this)
_safeAutoClose2(onAbort = this.stableRef::dispose) {
safeAutoClose2(onAbort = this.stableRef::dispose) {
this.sync = TMutex()
}
}
override fun close() {
_safeAutoClose1(
safeAutoClose1(
action = this.sync::close,
finally = this.stableRef::dispose
)
@ -165,16 +178,18 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
private val __metadata: _SocketMetadata
constructor(ipv4: CValue<sockaddr_in>) : super(ipv4) {
_safeAutoClose2(onAbort = { super.close() }) {
safeAutoClose2(onAbort = { super.close() }) {
this.__metadata = _SocketMetadata(this._socketFd)
}
}
constructor(ipv6: CValue<sockaddr_in6>) : super(ipv6) {
_safeAutoClose2(onAbort = { super.close() }) {
safeAutoClose2(onAbort = { super.close() }) {
this@EpollSocketEventLoop._socketsCount.tryIncref {
this.__metadata = _SocketMetadata(this._socketFd)
}
}
}
override suspend fun _waitForWrite() =
@ -183,16 +198,21 @@ class EpollSocketEventLoop : SocketEventLoopScope.Closeable {
override suspend fun _waitForRead() =
this@EpollSocketEventLoop._waitForRead(this.__metadata)
override fun close() {
_safeAutoClose1(
action = {},
override fun _free() {
safeAutoClose1(
action = { this@EpollSocketEventLoop._remove(this.__metadata) },
finally = {
_safeAutoClose1(
safeAutoClose1(
action = this@EpollSocketEventLoop._socketsCount::decref,
finally = {
safeAutoClose1(
action = this.__metadata::close,
finally = { super.close() }
)
}
)
}
)
}
}
}

View File

@ -1,6 +1,5 @@
package ru.landgrafhomyak.bgtu.networks0.low_level.sockets
import kotlinx.atomicfu.locks.MutexPool
import kotlinx.cinterop.CValue
import kotlinx.cinterop.ExperimentalForeignApi
import kotlinx.cinterop.addressOf
@ -32,21 +31,31 @@ import platform.posix.socket
import platform.posix.ssize_t
import platform.posix.send as send_lowlevel
import platform.posix.recv as recv_lowlevel
import platform.posix.close as close_lowlevel
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.CUtilities
import ru.landgrafhomyak.bgtu.networks0.low_level.c_interop_utilities.PosixUtilities
import ru.landgrafhomyak.bgtu.networks0.utilities.CloseableRefCounter
import ru.landgrafhomyak.bgtu.networks0.utilities.safeAutoClose2
@Suppress("CanBeVal")
@OptIn(ExperimentalForeignApi::class)
internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
protected val _socketFd: Int
private val _rSync = CMutex()
private val _wSync = CMutex()
private val __rSync: CMutex
private val __wSync: CMutex
private val __refcnt: CloseableRefCounter
protected constructor(ipv4: CValue<sockaddr_in>) {
this.__rSync = CMutex()
this.__wSync = CMutex()
this.__refcnt = CloseableRefCounter("ICMP socket was closed")
this._socketFd = this.__createSocket(AF_INET, IPPROTO_ICMP, ipv4, CUtilities.sizeOfUI<sockaddr_in>(), "IPv4")
}
protected constructor(ipv6: CValue<sockaddr_in6>) {
this.__rSync = CMutex()
this.__wSync = CMutex()
this.__refcnt = CloseableRefCounter("ICMP socket was closed")
this._socketFd = this.__createSocket(AF_INET6, IPPROTO_ICMPV6, ipv6, CUtilities.sizeOfUI<sockaddr_in6>(), "IPv6")
}
@ -59,7 +68,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
else
PosixUtilities.throwErrno { d -> RuntimeException("Failed to create ICMP socket over ${protoName}: $d") }
}
safeAutoClose2(onAbort = { if (0 != close_lowlevel(this._socketFd)) PosixUtilities.throwErrno { d -> RuntimeException("Failed to close ICMP socket: $d") } }) {
var err = memScoped {
@Suppress("UNCHECKED_CAST")
return@memScoped connect(sock, addrValue as CValue<sockaddr>, addrSize)
@ -70,7 +79,7 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
err = fcntl(sock, F_SETFL, O_NONBLOCK)
if (err != 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to switch ICMP socket to non-blocking mode: $d") }
}
return sock
}
@ -81,7 +90,9 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
@Suppress("FunctionName")
protected abstract suspend fun _waitForRead()
override suspend fun sendRaw(data: UByteArray) = this._wSync.withLock {
override suspend fun sendRaw(data: UByteArray) {
this.__refcnt.withRef {
this.__wSync.withLock {
polling@ while (true) {
this._waitForWrite()
val sentCount = data.usePinned { pinnedData ->
@ -98,8 +109,12 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
return
}
}
}
}
override suspend fun recvRaw(): UByteArray = this._rSync.withLock {
override suspend fun recvRaw(): UByteArray {
this.__refcnt.withRef {
this.__rSync.withLock {
polling@ while (true) {
this._waitForRead()
val bytesAvailable: ssize_t = recv_lowlevel(
@ -124,12 +139,22 @@ internal abstract class NonblockingIcmpSocketImpl : IcmpSocket {
else -> PosixUtilities.throwErrno { d -> RuntimeException("Failed to send message over ICMP socket: $d") }
}
}
return@withLock data
return data
}
return@withLock ubyteArrayOf()
@Suppress("KotlinUnreachableCode")
throw RuntimeException("unreachable")
}
}
}
protected open fun _free() {
var err = close_lowlevel(this._socketFd)
if (err != 0)
PosixUtilities.throwErrno { d -> RuntimeException("Failed to close ICMP socket: $d") }
}
override fun close() {
this.__refcnt.close("This socket still in use")
this._free()
}
}

View File

@ -1,5 +1,10 @@
@file:OptIn(ExperimentalContracts::class)
package ru.landgrafhomyak.bgtu.networks0.utilities
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
@ -19,8 +24,11 @@ class CloseableRefCounter(private val _errMessage: String) {
}
inline fun <R> tryIncref(block: () -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
this.incref()
return _safeAutoClose2(onAbort = this::decref, action = block)
return safeAutoClose2(onAbort = this::decref, action = block)
}
fun checkNotClosed() {
@ -32,8 +40,11 @@ class CloseableRefCounter(private val _errMessage: String) {
}
inline fun <R> tryDecref(block: () -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
this.checkNotClosed()
return _safeAutoClose2(onSuccess = this::decref, action = block)
return safeAutoClose2(onSuccess = this::decref, action = block)
}
fun close(errExistRefs: String) {
@ -46,7 +57,7 @@ class CloseableRefCounter(private val _errMessage: String) {
inline fun <R> withRef(protected: () -> R): R {
this.incref()
return _safeAutoClose1(finally = this::decref, action = protected)
return safeAutoClose1(finally = this::decref, action = protected)
}
override fun toString(): String {

View File

@ -7,7 +7,7 @@ import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
@Suppress("WRONG_INVOCATION_KIND")
inline fun <R> _safeAutoClose1(
inline fun <R> safeAutoClose1(
finally: () -> Unit,
action: () -> R
): R {
@ -15,11 +15,11 @@ inline fun <R> _safeAutoClose1(
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
callsInPlace(finally, InvocationKind.EXACTLY_ONCE)
}
return _safeAutoClose3(onAbort = finally, onSuccess = finally, onCrossReturn = finally, action = action)
return safeAutoClose3(onAbort = finally, onSuccess = finally, onCrossReturn = finally, action = action)
}
@Suppress("WRONG_INVOCATION_KIND")
inline fun <R> _safeAutoClose2(
inline fun <R> safeAutoClose2(
onAbort: () -> Unit = {},
onSuccess: () -> Unit = {},
action: () -> R
@ -29,11 +29,11 @@ inline fun <R> _safeAutoClose2(
callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE)
callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE)
}
return _safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action)
return safeAutoClose3(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action)
}
@Suppress("WRONG_INVOCATION_KIND")
inline fun <R> _safeAutoClose2e(
inline fun <R> safeAutoClose2e(
onAbort: (Throwable) -> Unit = { _ -> },
onSuccess: () -> Unit = {},
action: () -> R
@ -43,10 +43,10 @@ inline fun <R> _safeAutoClose2e(
callsInPlace(onAbort, InvocationKind.AT_MOST_ONCE)
callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE)
}
return _safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action)
return safeAutoClose3e(onAbort = onAbort, onSuccess = onSuccess, onCrossReturn = onSuccess, action = action)
}
inline fun <R> _safeAutoClose3(
inline fun <R> safeAutoClose3(
onAbort: () -> Unit = {},
onSuccess: () -> Unit = {},
onCrossReturn: () -> Unit = {},
@ -58,10 +58,10 @@ inline fun <R> _safeAutoClose3(
callsInPlace(onSuccess, InvocationKind.AT_MOST_ONCE)
callsInPlace(onCrossReturn, InvocationKind.AT_MOST_ONCE)
}
return _safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action)
return safeAutoClose3e(onAbort = { t -> onAbort() }, onSuccess = onSuccess, onCrossReturn = onCrossReturn, action = action)
}
inline fun <R> _safeAutoClose3e(
inline fun <R> safeAutoClose3e(
onAbort: (Throwable) -> Unit = { _ -> },
onSuccess: () -> Unit = {},
onCrossReturn: () -> Unit = {},