Cleaner eventloop implementation

This commit is contained in:
Andrew Golovashevich 2026-03-02 15:42:54 +03:00
parent b2dcd7819a
commit 049b2b1a49
4 changed files with 205 additions and 59 deletions

View File

@ -1,7 +1,9 @@
use super::Address;
pub trait ServersContext {
fn iter_servers(&self) -> impl Iterator<Item=Address>;
fn start_measuring(&mut self) -> (u64, impl Iterator<Item = Address>);
fn report_ping(&mut self, addr: Address, ping_ms: u64);
}
fn report_ping(&mut self, addr: Address, id: u64, ping_ms: u64);
fn report_error(&mut self, addr: Address, id: u64);
}

View File

@ -8,7 +8,7 @@ use windows::Win32::System::Diagnostics::Debug::{
FORMAT_MESSAGE_ALLOCATE_BUFFER, FORMAT_MESSAGE_FROM_SYSTEM, FORMAT_MESSAGE_IGNORE_INSERTS,
FormatMessageA,
};
use windows::core::PSTR;
use windows::core::{HRESULT, PSTR};
trait ErrCode {
fn to_dword(&self) -> u32;
@ -16,9 +16,7 @@ trait ErrCode {
impl ErrCode for WSA_ERROR {
fn to_dword(&self) -> u32 {
return 0u32
.checked_add_signed(self.0)
.unwrap_or_else(|| panic!("Cast i32 -> u32 overflow"));
return i32::to_dword(&self.0)
}
}
@ -27,6 +25,19 @@ impl ErrCode for u32 {
return *self;
}
}
impl ErrCode for i32 {
fn to_dword(&self) -> u32 {
return 0u32
.checked_add_signed(*self)
.unwrap_or_else(|| panic!("Cast i32 -> u32 overflow"));
}
}
impl ErrCode for HRESULT {
fn to_dword(&self) -> u32 {
return i32::to_dword(&self.0)
}
}
pub(super) fn throw_from_windows_err_code(code: impl ErrCode) -> ! {
panic!(

View File

@ -1,68 +1,48 @@
use crate::io::allocator::Allocator;
use crate::io::servers_context::ServersContext;
use crate::io::windows::task::IoTask;
use crate::io::windows::WindowsAddressKnown;
use crate::io::windows::errors::throw_from_windows_err_code;
use crate::io::windows::task::IoTask;
use std::mem::uninitialized;
use std::pin::Pin;
use std::ptr::NonNull;
use windows::Win32::Networking::WinSock::{
WSARecvFrom, WSASendTo, WSASocketW, AF_UNSPEC, IPPROTO_ICMP, SOCKADDR,
SOCKADDR_STORAGE, SOCK_RAW, WSA_FLAG_NO_HANDLE_INHERIT, WSA_FLAG_OVERLAPPED,
AF_UNSPEC, IPPROTO_ICMP, LPWSAOVERLAPPED_COMPLETION_ROUTINE, SOCK_RAW, SOCKADDR,
SOCKADDR_STORAGE, SOCKET, SOCKET_ERROR, WSA_FLAG_NO_HANDLE_INHERIT,
WSA_FLAG_OVERLAPPED, WSA_IO_PENDING, WSA_OPERATION_ABORTED, WSAEACCES, WSAEADDRNOTAVAIL,
WSAEAFNOSUPPORT, WSAECONNRESET, WSAEDESTADDRREQ, WSAEFAULT, WSAEHOSTUNREACH, WSAEINPROGRESS,
WSAEINTR, WSAEINVAL, WSAEMSGSIZE, WSAENETDOWN, WSAENETRESET, WSAENETUNREACH, WSAENOBUFS,
WSAENOTCONN, WSAENOTSOCK, WSAESHUTDOWN, WSAEWOULDBLOCK, WSAGetLastError, WSANOTINITIALISED,
WSARecvFrom, WSASendTo, WSASocketW,
};
use windows::Win32::System::IO::OVERLAPPED;
pub unsafe fn run_eventloop<Ctx: ServersContext>(ctx: *mut Ctx) {
let mut heap = Allocator::<IoTask<CallbackData<_>>>::new();
let socket = WSASocketW(
AF_UNSPEC.0 as i32,
SOCK_RAW.0,
IPPROTO_ICMP.0,
None,
0,
WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT,
)
.unwrap();
let mut socket = WindowsOverlappingIcmpSocket::new();
let mut recv_task = NonNull::from_mut(&mut heap).as_mut().alloc();
let mut in_addr_size = 0i32;
let mut in_flags = 0u32;
CallbackData::init_recv(
&mut recv_task,
NonNull::new_unchecked(ctx),
NonNull::from_mut(&mut heap),
);
socket.receive(&mut recv_task, Some(receiving_done::<Ctx>));
loop {
for mut address in (*ctx).iter_servers() {
let mut data = NonNull::from_mut(&mut heap).as_mut().alloc();
data.init(CallbackData {
ctx: NonNull::new_unchecked(ctx),
allocator: NonNull::from_mut(&mut heap),
let (id, it) = (*ctx).start_measuring();
for address in it {
let mut task = NonNull::from_mut(&mut heap).as_mut().alloc();
CallbackData::init_send(
&mut task,
NonNull::new_unchecked(ctx),
NonNull::from_mut(&mut heap),
address,
});
let overlapped = data.get_overlapped();
let address =
NonNull::from_ref(&data.get_extra().address.any.native).cast::<SOCKADDR>();
WSASendTo(
socket,
data.as_ref().get_buffer_descriptors(),
None,
0,
Some(address.as_ptr()),
size_of::<SOCKADDR_STORAGE>() as i32,
Some(overlapped),
Some(sending_done::<Ctx>),
);
let overlapped = recv_task.get_overlapped();
let address =
NonNull::from_mut(&mut recv_task.get_extra().address.any.native).cast::<SOCKADDR>();
WSARecvFrom(
socket,
data.as_ref().get_buffer_descriptors(),
None,
&mut in_flags,
Some(address.as_ptr()),
Some(&mut in_addr_size),
Some(overlapped),
Some(receiving_done::<Ctx>),
);
socket.send(&mut task, Some(sending_done::<Ctx>));
}
}
}
@ -71,6 +51,38 @@ struct CallbackData<Ctx: ServersContext> {
ctx: NonNull<Ctx>,
allocator: NonNull<Allocator<IoTask<Self>>>,
address: WindowsAddressKnown,
__flags: u32,
__addrs_size: i32,
}
impl<Ctx: ServersContext> CallbackData<Ctx> {
fn init_send(
task: &mut Pin<&mut IoTask<CallbackData<Ctx>>>,
ctx: NonNull<Ctx>,
allocator: NonNull<Allocator<IoTask<Self>>>,
address: WindowsAddressKnown,
) {
task.init(Self {
ctx,
allocator,
address,
__flags: unsafe { uninitialized() },
__addrs_size: unsafe { uninitialized() },
});
}
fn init_recv(
task: &mut Pin<&mut IoTask<CallbackData<Ctx>>>,
ctx: NonNull<Ctx>,
allocator: NonNull<Allocator<IoTask<Self>>>,
) {
task.init(Self {
ctx,
allocator,
address: unsafe { uninitialized() },
__flags: unsafe { uninitialized() },
__addrs_size: unsafe { uninitialized() },
});
}
}
unsafe extern "system" fn sending_done<Ctx: ServersContext>(
@ -102,3 +114,122 @@ unsafe extern "system" fn receiving_done<Ctx: ServersContext>(
);
task.get_extra().allocator.as_mut().free(task);
}
struct WindowsOverlappingIcmpSocket {
s: SOCKET,
}
impl WindowsOverlappingIcmpSocket {
fn new() -> Self {
let result = unsafe {
WSASocketW(
AF_UNSPEC.0 as i32,
SOCK_RAW.0,
IPPROTO_ICMP.0,
None,
0,
WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT,
)
};
match result {
Ok(s) => return Self { s },
Err(e) => throw_from_windows_err_code(e.code()),
}
}
unsafe fn send<Ctx: ServersContext>(
&mut self,
task: &mut Pin<&mut IoTask<CallbackData<Ctx>>>,
cb: LPWSAOVERLAPPED_COMPLETION_ROUTINE,
) {
let overlapped = task.get_overlapped();
let address = NonNull::from_ref(&task.get_extra().address.any.native)
.cast::<SOCKADDR>()
.as_ptr();
let result = WSASendTo(
self.s,
task.as_ref().get_buffer_descriptors(),
None,
0,
Some(address),
size_of::<SOCKADDR_STORAGE>() as i32,
Some(overlapped),
cb,
);
if result != SOCKET_ERROR {
return;
}
let result = WSAGetLastError();
match result {
WSA_IO_PENDING => {}
WSAENETUNREACH | WSAENETRESET | WSAECONNRESET => todo!("report"),
WSAEWOULDBLOCK => throw_from_windows_err_code(result),
WSAEACCES
| WSAEADDRNOTAVAIL
| WSAEAFNOSUPPORT
| WSAEDESTADDRREQ
| WSAEFAULT
| WSAEHOSTUNREACH
| WSAEINPROGRESS
| WSAEINTR
| WSAEINVAL
| WSAEMSGSIZE
| WSAENETDOWN
| WSAENOBUFS
| WSAENOTCONN
| WSAENOTSOCK
| WSAESHUTDOWN
| WSANOTINITIALISED
| WSA_OPERATION_ABORTED => throw_from_windows_err_code(result),
_ => throw_from_windows_err_code(result),
}
}
unsafe fn receive<Ctx: ServersContext>(
&mut self,
task: &mut Pin<&mut IoTask<CallbackData<Ctx>>>,
cb: LPWSAOVERLAPPED_COMPLETION_ROUTINE,
) {
let overlapped = task.get_overlapped();
let extra = NonNull::from_ref(task.get_extra()).as_mut();
extra.__addrs_size = size_of::<SOCKADDR_STORAGE>() as i32;
extra.__flags = 0;
let result = WSARecvFrom(
self.s,
task.as_ref().get_buffer_descriptors(),
None,
&mut extra.__flags,
Some(
NonNull::from_mut(&mut extra.address.any.native)
.cast::<SOCKADDR>()
.as_ptr(),
),
Some(&mut extra.__addrs_size),
Some(overlapped),
cb,
);
if result != SOCKET_ERROR {
return;
}
let result = WSAGetLastError();
match result {
WSAECONNRESET | WSAEMSGSIZE | WSAENETRESET | WSA_IO_PENDING => return,
WSAEINVAL => throw_from_windows_err_code(result),
WSAEWOULDBLOCK => throw_from_windows_err_code(result),
WSAEFAULT
| WSAEINPROGRESS
| WSAEINTR
| WSAENETDOWN
| WSAENOTCONN
| WSANOTINITIALISED
| WSA_OPERATION_ABORTED => throw_from_windows_err_code(result),
_ => throw_from_windows_err_code(result),
}
}
}

View File

@ -18,12 +18,14 @@ pub struct IoTask<E> {
}
impl<E> IoTask<E> {
pub unsafe fn init(self: &mut Pin<&mut Self>, extra: E) {
let self_unwrapped = Pin::into_inner_unchecked(NonNull::from_mut(self).read());
self_unwrapped.sys = zeroed();
self_unwrapped.buff_d[0].len = self_unwrapped.buff.len() as u32;
self_unwrapped.buff_d[0].buf = PSTR(self_unwrapped.buff.as_mut_ptr());
self_unwrapped.extra = extra
pub fn init(self: &mut Pin<&mut Self>, extra: E) {
unsafe {
let self_unwrapped = Pin::into_inner_unchecked(NonNull::from_mut(self).read());
self_unwrapped.sys = zeroed();
self_unwrapped.buff_d[0].len = self_unwrapped.buff.len() as u32;
self_unwrapped.buff_d[0].buf = PSTR(self_unwrapped.buff.as_mut_ptr());
self_unwrapped.extra = extra
}
}
pub const BUFFER_LEN: usize = _BUFF_LEN;