Lambda oriented async socket

This commit is contained in:
Andrew Golovashevich 2026-03-09 01:25:19 +03:00
parent 744ecde879
commit 7f8187a43e
5 changed files with 182 additions and 159 deletions

View File

@ -12,4 +12,4 @@ bgtu-networks-2-network-abstract = { path = "../abstract" }
[dependencies.windows] [dependencies.windows]
version = ">=0.41.0, <=0.62.2" version = ">=0.41.0, <=0.62.2"
registry = "crates-io" registry = "crates-io"
features = ["Win32_System_IO", "Win32_Networking", "Win32_Networking_WinSock", "Win32_System_Diagnostics", "Win32_System_Diagnostics_Debug", "Win32_System_Threading", "Win32_System_Memory"] features = ["Win32_System_IO", "Win32_Networking", "Win32_Networking_WinSock", "Win32_System_Diagnostics", "Win32_System_Diagnostics_Debug", "Win32_System_Threading", "Win32_System_Memory", "Win32_System_SystemInformation"]

View File

@ -84,7 +84,7 @@ impl Allocator {
drop(obj.cast::<T>().read()) drop(obj.cast::<T>().read())
} }
pub unsafe fn free<T: Drop>(&self, p: Pin<&mut T>) { pub unsafe fn free<T>(&self, p: Pin<&mut T>) {
let p = NonNull::from_mut(Pin::into_inner_unchecked(p)); let p = NonNull::from_mut(Pin::into_inner_unchecked(p));
let h = NonNull::new_unchecked( let h = NonNull::new_unchecked(
p.as_ptr() p.as_ptr()

View File

@ -1,124 +1,100 @@
mod _allocator_tests;
mod allocator;
mod socket; mod socket;
mod task; mod task;
mod allocator;
mod _allocator_tests;
use socket::WindowsOverlappingIcmpSocket; use socket::WindowsOverlappingIcmpSocket;
use std::ffi::c_void;
use crate::address::WindowsAddressKnown; use crate::address::WindowsAddressKnown;
use crate::errors::format_windows_err_code;
use allocator::Allocator; use allocator::Allocator;
use task::IoTask;
use bgtu_networks_2_network_abstract::ServersContext; use bgtu_networks_2_network_abstract::ServersContext;
use std::mem::uninitialized;
use std::pin::Pin;
use std::ptr::NonNull; use std::ptr::NonNull;
use windows::Win32::System::IO::OVERLAPPED; use windows::Win32::System::SystemInformation::GetTickCount;
use windows::Win32::System::Threading::SleepEx; use windows::Win32::System::Threading::SleepEx;
pub unsafe fn run_eventloop<Ctx: ServersContext<WindowsAddressKnown>>(ctx: *mut Ctx) { pub unsafe fn run_eventloop<Ctx: ServersContext<WindowsAddressKnown>>(ctx: *mut Ctx) {
let mut heap = Allocator::new(); let heap = Allocator::new();
let mut socket = WindowsOverlappingIcmpSocket::new();
let mut recv_task = NonNull::from_mut(&mut heap).as_mut().alloc(|mut p| CallbackData::init_recv(
&mut p,
NonNull::new_unchecked(ctx),
NonNull::from_mut(&mut heap),
));
let mut socket = WindowsOverlappingIcmpSocket::new(&heap);
let mut is_receive_enabled = false; let mut is_receive_enabled = false;
let recv_callback = RecvCallback {
sock: NonNull::from_mut(&mut socket),
ctx,
};
loop { loop {
let (id, it) = (*ctx).start_measuring(); let (id, it) = (*ctx).start_measuring();
let mut should_recv_being_enabled = false; let mut should_recv_being_enabled = false;
for address in it { for address in it {
let mut task = NonNull::from_mut(&mut heap).as_mut().alloc(|mut p| CallbackData::init_send( socket.send(
&mut p,
NonNull::new_unchecked(ctx),
NonNull::from_mut(&mut heap),
address, address,
)); |buff| {},
|err, _| println!("Otpravlena hujnya: {}", format_windows_err_code(err)),
);
socket.send(&mut task, Some(sending_done::<Ctx>));
should_recv_being_enabled = true; should_recv_being_enabled = true;
} }
if (!is_receive_enabled && should_recv_being_enabled) { if (!is_receive_enabled && should_recv_being_enabled) {
socket.receive(&mut recv_task, Some(receiving_done::<Ctx>)); recv_callback.bind_callback();
is_receive_enabled = true; is_receive_enabled = true;
} }
SleepEx(1000, true); let delayStart = GetTickCount();
let delayEnd = delayStart.wrapping_add(1000);
loop {
let current = GetTickCount();
if delayStart < delayEnd {
if current < delayEnd {
SleepEx(delayEnd - current, true);
} else {
break;
}
} else {
if current < delayEnd || current > delayEnd.wrapping_sub(1000) {
SleepEx(delayEnd.wrapping_sub(current), true);
} else {
break;
}
}
}
} }
} }
struct CallbackData<Ctx: ServersContext<WindowsAddressKnown>> { struct RecvCallback<'s, Ctx: ServersContext<WindowsAddressKnown>> {
ctx: NonNull<Ctx>, sock: NonNull<WindowsOverlappingIcmpSocket<'s>>,
allocator: NonNull<Allocator>, ctx: *mut Ctx,
address: WindowsAddressKnown,
__flags: u32,
__addrs_size: i32,
} }
impl<Ctx: ServersContext<WindowsAddressKnown>> CallbackData<Ctx> { impl<'s, Ctx: ServersContext<WindowsAddressKnown>> RecvCallback<'_, Ctx> {
fn init_send( fn _callback_body(
task: &mut Pin<&mut IoTask<CallbackData<Ctx>>>, this_ptr: NonNull<c_void>,
ctx: NonNull<Ctx>, errcode: u32,
allocator: NonNull<Allocator>, addr: WindowsAddressKnown,
address: WindowsAddressKnown, buffer: &[u8],
) { ) {
task.init(Self { unsafe {
ctx, let this = this_ptr.cast::<Self>().as_ref();
allocator, this.bind_callback();
address,
__flags: unsafe { uninitialized() },
__addrs_size: unsafe { uninitialized() },
});
} }
fn init_recv( }
task: &mut Pin<&mut IoTask<CallbackData<Ctx>>>,
ctx: NonNull<Ctx>, // I <3 borrow checker
allocator: NonNull<Allocator>, unsafe fn _bind_callback(
this: NonNull<c_void>,
sock: NonNull<WindowsOverlappingIcmpSocket<'s>>,
f: fn(NonNull<c_void>, u32, WindowsAddressKnown, &[u8]),
) { ) {
task.init(Self { sock.as_ref().receive(move |err, addr, buffer| {
ctx, (f)(this, err, addr, buffer);
allocator, })
address: unsafe { uninitialized() }, }
__flags: unsafe { uninitialized() },
__addrs_size: unsafe { uninitialized() }, unsafe fn bind_callback(&self) {
}); let self_ptr = NonNull::from_ref(self);
Self::_bind_callback(self_ptr.cast(), self.sock, Self::_callback_body);
} }
} }
unsafe extern "system" fn sending_done<Ctx: ServersContext<WindowsAddressKnown>>(
dwerror: u32,
cbtransferred: u32,
lpoverlapped: *mut OVERLAPPED,
dwflags: u32,
) {
let mut task = Pin::new_unchecked(
lpoverlapped
.cast::<IoTask<CallbackData<Ctx>>>()
.as_mut()
.unwrap(),
);
task.get_extra().allocator.as_mut().free(task);
}
unsafe extern "system" fn receiving_done<Ctx: ServersContext<WindowsAddressKnown>>(
dwerror: u32,
cbtransferred: u32,
lpoverlapped: *mut OVERLAPPED,
dwflags: u32,
) {
let mut task = Pin::new_unchecked(
lpoverlapped
.cast::<IoTask<CallbackData<Ctx>>>()
.as_mut()
.unwrap(),
);
task.get_extra().allocator.as_mut().free(task);
}

View File

@ -1,25 +1,30 @@
use crate::address::WindowsAddressKnown; use crate::address::WindowsAddressKnown;
use crate::errors::throw_from_windows_err_code; use crate::errors::throw_from_windows_err_code;
use crate::eventloop::CallbackData; use crate::eventloop::allocator::Allocator;
use crate::eventloop::task::IoTask; use crate::eventloop::task::IoTask;
use bgtu_networks_2_network_abstract::ServersContext; use bgtu_networks_2_network_abstract::ServersContext;
use std::mem::uninitialized;
use std::pin::Pin; use std::pin::Pin;
use std::ptr::NonNull; use std::ptr::NonNull;
use std::slice;
use windows::Win32::Networking::WinSock::{ use windows::Win32::Networking::WinSock::{
AF_UNSPEC, IPPROTO_ICMP, LPWSAOVERLAPPED_COMPLETION_ROUTINE, SOCK_RAW, SOCKADDR, WSAGetLastError, WSARecvFrom, WSASendTo, WSASocketW, AF_UNSPEC,
SOCKADDR_STORAGE, SOCKET, SOCKET_ERROR, WSA_FLAG_NO_HANDLE_INHERIT, WSA_FLAG_OVERLAPPED, IPPROTO_ICMP, LPWSAOVERLAPPED_COMPLETION_ROUTINE, SOCKADDR, SOCKADDR_STORAGE, SOCKET,
WSA_IO_PENDING, WSA_OPERATION_ABORTED, WSAEACCES, WSAEADDRNOTAVAIL, WSAEAFNOSUPPORT, SOCKET_ERROR, SOCK_RAW, WSAEACCES, WSAEADDRNOTAVAIL, WSAEAFNOSUPPORT,
WSAECONNRESET, WSAEDESTADDRREQ, WSAEFAULT, WSAEHOSTUNREACH, WSAEINPROGRESS, WSAEINTR, WSAECONNRESET, WSAEDESTADDRREQ, WSAEFAULT, WSAEHOSTUNREACH, WSAEINPROGRESS, WSAEINTR,
WSAEINVAL, WSAEMSGSIZE, WSAENETDOWN, WSAENETRESET, WSAENETUNREACH, WSAENOBUFS, WSAENOTCONN, WSAEINVAL, WSAEMSGSIZE, WSAENETDOWN, WSAENETRESET, WSAENETUNREACH, WSAENOBUFS, WSAENOTCONN,
WSAENOTSOCK, WSAESHUTDOWN, WSAEWOULDBLOCK, WSAGetLastError, WSANOTINITIALISED, WSARecvFrom, WSAENOTSOCK, WSAESHUTDOWN, WSAEWOULDBLOCK, WSANOTINITIALISED, WSA_FLAG_NO_HANDLE_INHERIT, WSA_FLAG_OVERLAPPED,
WSASendTo, WSASocketW, WSA_IO_PENDING, WSA_OPERATION_ABORTED,
}; };
use windows::Win32::System::IO::OVERLAPPED;
pub(super) struct WindowsOverlappingIcmpSocket { pub(super) struct WindowsOverlappingIcmpSocket<'a> {
s: SOCKET, socket: SOCKET,
heap: &'a Allocator,
} }
impl WindowsOverlappingIcmpSocket {
pub fn new() -> Self { impl<'a> WindowsOverlappingIcmpSocket<'a> {
pub fn new(heap: &'a Allocator) -> Self {
let result = unsafe { let result = unsafe {
WSASocketW( WSASocketW(
AF_UNSPEC.0 as i32, AF_UNSPEC.0 as i32,
@ -32,30 +37,71 @@ impl WindowsOverlappingIcmpSocket {
}; };
match result { match result {
Ok(s) => return Self { s }, Ok(s) => return Self { socket: s, heap },
Err(e) => throw_from_windows_err_code(e.code()), Err(e) => throw_from_windows_err_code(e.code()),
} }
} }
unsafe extern "system" fn _native_callback<
pub unsafe fn send<Ctx: ServersContext<WindowsAddressKnown>>( F: for<'xx> FnOnce(u32, &'xx [u8], WindowsAddressKnown) + 'static,
&mut self, >(
task: &mut Pin<&mut IoTask<CallbackData<Ctx>>>, dwerror: u32,
cb: LPWSAOVERLAPPED_COMPLETION_ROUTINE, cbtransferred: u32,
lpoverlapped: *mut OVERLAPPED,
dwflags: u32,
) { ) {
let overlapped = task.get_overlapped(); let task =
let address = NonNull::from_ref(&task.get_extra().address.any.native) NonNull::from_mut(IoTask::<F>::from_overlapped(lpoverlapped).get_unchecked_mut())
.cast::<SOCKADDR>()
.as_ptr(); .as_ptr();
(task.read().callback)(
dwerror,
slice::from_raw_parts(
NonNull::from_ref(&(*task).buff).as_ptr().cast(),
cbtransferred as usize,
),
task.read().addr,
);
NonNull::new_unchecked(task)
.as_mut()
.heap
.as_mut()
.free(Pin::new_unchecked(NonNull::new_unchecked(task).as_mut()));
}
fn _gen_native_callback<F: for<'xx> FnOnce(u32, &'xx [u8], WindowsAddressKnown) + 'static>(
_: &IoTask<F>,
) -> LPWSAOVERLAPPED_COMPLETION_ROUTINE {
return Some(Self::_native_callback::<F>);
}
pub unsafe fn send(
&mut self,
addr: WindowsAddressKnown,
init_data: impl FnOnce(&mut [u8]),
on_sent: impl FnOnce(u32, WindowsAddressKnown) + 'static,
) {
let task = self.heap.alloc(|p| {
IoTask::init(
p,
NonNull::from_ref(self.heap),
addr,
|errcode, _: &[u8], addr| on_sent(errcode, addr),
)
});
let task_p = task.get_unchecked_mut();
init_data(&mut task_p.buff);
let result = WSASendTo( let result = WSASendTo(
self.s, self.socket,
task.as_ref().get_buffer_descriptors(), &task_p.buff_d,
None, None,
0, 0,
Some(address), Some(NonNull::from_ref(&task_p.addr).cast().as_ptr()),
size_of::<SOCKADDR_STORAGE>() as i32, size_of::<SOCKADDR_STORAGE>() as i32,
Some(overlapped), Some(NonNull::from_mut(&mut task_p.sys).as_ptr()),
cb, Self::_gen_native_callback(task_p),
); );
if result != SOCKET_ERROR { if result != SOCKET_ERROR {
@ -88,28 +134,37 @@ impl WindowsOverlappingIcmpSocket {
} }
} }
pub unsafe fn receive<Ctx: ServersContext<WindowsAddressKnown>>( pub unsafe fn receive(
&mut self, &self,
task: &mut Pin<&mut IoTask<CallbackData<Ctx>>>, on_received: impl for<'xx> FnOnce(u32, WindowsAddressKnown, &'xx [u8]) + 'static,
cb: LPWSAOVERLAPPED_COMPLETION_ROUTINE,
) { ) {
let overlapped = task.get_overlapped();
let extra = NonNull::from_ref(task.get_extra()).as_mut(); let task = self.heap.alloc(|p| {
extra.__addrs_size = size_of::<SOCKADDR_STORAGE>() as i32; IoTask::init(
extra.__flags = 0; p,
NonNull::from_ref(self.heap),
uninitialized(),
|errcode, buffer: &[u8], addr| on_received(errcode, addr, buffer),
)
});
let task_p = task.get_unchecked_mut();
task_p._addr_size = size_of::<SOCKADDR_STORAGE>() as i32;
task_p._flags = 0;
let result = WSARecvFrom( let result = WSARecvFrom(
self.s, self.socket,
task.as_ref().get_buffer_descriptors(), &task_p.buff_d,
None, None,
&mut extra.__flags, &mut task_p._flags,
Some( Some(
NonNull::from_mut(&mut extra.address.any.native) NonNull::from_mut(&mut task_p.addr.any.native)
.cast::<SOCKADDR>() .cast::<SOCKADDR>()
.as_ptr(), .as_ptr(),
), ),
Some(&mut extra.__addrs_size), Some(&mut task_p._addr_size),
Some(overlapped), Some(&mut task_p.sys),
cb, Self::_gen_native_callback(task_p),
); );
if result != SOCKET_ERROR { if result != SOCKET_ERROR {

View File

@ -1,54 +1,46 @@
use crate::eventloop::allocator::Allocator;
use std::marker::PhantomPinned; use std::marker::PhantomPinned;
use std::mem::zeroed; use std::mem::{offset_of, zeroed};
use std::pin::Pin; use std::pin::Pin;
use std::ptr::NonNull; use std::ptr::NonNull;
use windows::Win32::Networking::WinSock::WSABUF; use windows::Win32::Networking::WinSock::WSABUF;
use windows::Win32::System::IO::OVERLAPPED; use windows::Win32::System::IO::OVERLAPPED;
use windows::core::PSTR; use windows::core::PSTR;
use crate::address::WindowsAddressKnown;
const _BUFF_LEN: usize = 100; const _BUFF_LEN: usize = 100;
#[repr(C)] #[repr(C)]
pub struct IoTask<E> { pub(super) struct IoTask<F: 'static> {
sys: OVERLAPPED, pub sys: OVERLAPPED,
__pinned: PhantomPinned, __pinned: PhantomPinned,
buff_d: [WSABUF; 1], pub buff_d: [WSABUF; 1],
buff: [u8; _BUFF_LEN], pub buff: [u8; _BUFF_LEN],
extra: E, pub addr: WindowsAddressKnown,
pub heap: NonNull<Allocator>,
pub callback: F,
pub _flags: u32,
pub _addr_size: i32,
} }
impl<E> IoTask<E> { impl<F: 'static> IoTask<F> {
pub fn init(self: &mut Pin<&mut Self>, extra: E) { pub unsafe fn init(self: &mut Pin<&mut Self>, heap: NonNull<Allocator>, addr: WindowsAddressKnown, callback: F) {
unsafe { unsafe {
let self_unwrapped = Pin::into_inner_unchecked(NonNull::from_mut(self).read()); let self_unwrapped = Pin::into_inner_unchecked(NonNull::from_mut(self).read());
self_unwrapped.sys = zeroed(); self_unwrapped.sys = zeroed();
self_unwrapped.buff_d[0].len = self_unwrapped.buff.len() as u32; self_unwrapped.buff_d[0].len = self_unwrapped.buff.len() as u32;
self_unwrapped.buff_d[0].buf = PSTR(self_unwrapped.buff.as_mut_ptr()); self_unwrapped.buff_d[0].buf = PSTR(self_unwrapped.buff.as_mut_ptr());
self_unwrapped.extra = extra self_unwrapped.addr = addr;
self_unwrapped.heap = heap;
self_unwrapped.callback = callback;
} }
} }
pub const BUFFER_LEN: usize = _BUFF_LEN; pub unsafe fn from_overlapped(raw: *mut OVERLAPPED) -> Pin<&'static mut Self> {
return unsafe {
pub fn get_buffer<'s>(self: &'s mut Pin<&mut Self>) -> &'s mut [u8; _BUFF_LEN] { Pin::new_unchecked(
return &mut unsafe { Pin::into_inner_unchecked(NonNull::from_mut(self).read()) }.buff; NonNull::new_unchecked(raw.map_addr(|a| a - offset_of!(Self, sys)).cast()).as_mut(),
} )
};
pub fn get_buffer_descriptors<'s>(self: &'s Pin<&Self>) -> &'s [WSABUF] {
return &unsafe { Pin::into_inner_unchecked(NonNull::from_ref(self).read()) }.buff_d;
}
pub fn get_extra<'s>(self: &'s mut Pin<&mut Self>) -> &'s mut E {
return &mut unsafe { Pin::into_inner_unchecked(NonNull::from_mut(self).read()) }.extra;
}
pub fn get_overlapped(self: &mut Pin<&mut Self>) -> *mut OVERLAPPED {
return &mut unsafe { Pin::into_inner_unchecked(NonNull::from_mut(self).read()) }.sys;
}
}
impl <E> Drop for IoTask<E> {
fn drop(&mut self) {
todo!()
} }
} }