From e5f8a08238f7a7ad9fb8894ab82bb1627c3fda09 Mon Sep 17 00:00:00 2001 From: Andrew Golovashevich Date: Tue, 4 Nov 2025 21:26:51 +0300 Subject: [PATCH] Socket state improvements --- modules/sockets/CMakeLists.txt | 1 - modules/sockets/src/berkeley/datagram.cppm | 74 ++++++++--- modules/sockets/src/berkeley/stream.cppm | 129 +++++++++++-------- modules/sockets/src/platform/windows.cpp.inc | 4 - modules/sockets/src/windows_binds.cppm | 13 -- modules/threads/src/thread_routine.cppm | 2 +- 6 files changed, 135 insertions(+), 88 deletions(-) delete mode 100644 modules/sockets/src/windows_binds.cppm diff --git a/modules/sockets/CMakeLists.txt b/modules/sockets/CMakeLists.txt index 289ad4c..9ff391c 100644 --- a/modules/sockets/CMakeLists.txt +++ b/modules/sockets/CMakeLists.txt @@ -6,7 +6,6 @@ target_sources( PUBLIC FILE_SET cxx_modules TYPE CXX_MODULES FILES src/common.cppm - src/windows_binds.cppm src/berkeley/context.cppm src/berkeley/internals.cppm diff --git a/modules/sockets/src/berkeley/datagram.cppm b/modules/sockets/src/berkeley/datagram.cppm index 9430cb6..994dc39 100644 --- a/modules/sockets/src/berkeley/datagram.cppm +++ b/modules/sockets/src/berkeley/datagram.cppm @@ -9,7 +9,7 @@ import :berkeley.internals; import :berkeley.address; namespace LdH::Sockets::Berkeley { -template requires BerkeleySocketsContext + template requires BerkeleySocketsContext class _abstract_datagram_socket { private: using _refcnt_t = signed long long; @@ -20,41 +20,76 @@ template requires BerkeleySocketsContext ctx_t::socket_t _value; private: - static constexpr _refcnt_t _refcnt_UNINITIALIZED = -0x0800'0000'0000'0000ll; - static constexpr _refcnt_t _refcnt_MOVED = -0x0800'0000'0000'0001ll; - static constexpr _refcnt_t _refcnt_CLOSED = -0x07FFF'FFFF'FFFF'FFFFll; + static constexpr _refcnt_t _refcnt_UNINITIALIZED = -0x4000'0000'0000'0000ll + 0; + static constexpr _refcnt_t _refcnt_INITIALIZING = -0x4000'0000'0000'0000ll + 1; + static constexpr _refcnt_t _refcnt_MOVING = -0x4000'0000'0000'0000ll + 2; + static constexpr _refcnt_t _refcnt_MOVED = -0x4000'0000'0000'0000ll + 3; + static constexpr _refcnt_t _refcnt_CLOSING = -0x4000'0000'0000'0000ll + 4; + static constexpr _refcnt_t _refcnt_CLOSED = -0x4000'0000'0000'0000ll + 5; protected: - explicit _abstract_datagram_socket(ctx_t::socket_t &&value) : _refcnt{0}, _value{std::move(value)} { + explicit _abstract_datagram_socket(ctx_t::socket_t &&value) : _refcnt{_refcnt_INITIALIZING}, _value{std::move(value)} { + this->_refcnt.store(0); } public: _abstract_datagram_socket() : _refcnt{_refcnt_UNINITIALIZED}, _value{} { }; - _abstract_datagram_socket(_abstract_datagram_socket &&other) noexcept : _refcnt{other._refcnt.load()}, _value{std::move(other._value)} { + _abstract_datagram_socket(_abstract_datagram_socket &&other) noexcept : _abstract_datagram_socket{} { + this->operator=(std::move(other)); }; _abstract_datagram_socket &operator=(_abstract_datagram_socket &&other) noexcept { - _refcnt_t current = other->_refcnt.load(); + _refcnt_t other_state = other->_refcnt.load(); while (true) { - if (current > 0) - LdH::abort("Can't move socket while it is in use"); - if (this->_refcnt.compare_exchange_weak(current, _refcnt_MOVED)) + switch (other_state) { + case _refcnt_UNINITIALIZED: + case _refcnt_MOVED: + case _refcnt_CLOSED: + case 0: + break; + case _refcnt_INITIALIZING: + case _refcnt_MOVING: + case _refcnt_CLOSING: + LdH::abort("Can't move socket while it is in use"); + default: + if (other_state > 0) + LdH::abort("Can't move socket while it is in use"); + else if (other_state < 0) + LdH::abort("Source socket wrapper corrupted"); + } + if (this->_refcnt.compare_exchange_weak(other_state, _refcnt_MOVING)) break; } - current = this->_refcnt.load(); + _refcnt_t this_state = this->_refcnt.load(); while (true) { - if (current >= 0) { - other._refcnt.store(0); // rollback - LdH::abort("Variable already initialized"); + switch (this_state) { + case _refcnt_UNINITIALIZED: + case _refcnt_MOVED: + case _refcnt_CLOSED: + case 0: + break; + case _refcnt_INITIALIZING: + case _refcnt_MOVING: + case _refcnt_CLOSING: + goto ALREADY_INITIALIZED; + default: + if (this_state > 0) { + ALREADY_INITIALIZED: + other._refcnt.store(other_state); // rollback + LdH::abort("Variable already initialized"); + } else if (this_state < 0) + LdH::abort("Destination socket wrapper corrupted"); } - if (this->_refcnt.compare_exchange_weak(current, 0)) + if (this->_refcnt.compare_exchange_weak(this_state, 0)) break; } this->_value = std::move(other._value); + this->_refcnt.store(0); + other._refcnt.store(_refcnt_MOVED); return *this; } @@ -66,9 +101,12 @@ template requires BerkeleySocketsContext if (current < 0) { switch (current) { case _refcnt_UNINITIALIZED: + case _refcnt_INITIALIZING: LdH::abort("Socket not initialized"); case _refcnt_MOVED: + case _refcnt_MOVING: LdH::abort("Socket was moved to another location"); + case _refcnt_CLOSING: case _refcnt_CLOSED: LdH::abort("Socket was closed"); default: @@ -86,10 +124,13 @@ template requires BerkeleySocketsContext if (current < 0) { switch (current) { case _refcnt_UNINITIALIZED: + case _refcnt_INITIALIZING: LdH::abort("Socket not initialized"); case _refcnt_MOVED: + case _refcnt_MOVING: LdH::abort("Socket was moved to another location"); case _refcnt_CLOSED: + case _refcnt_CLOSING: LdH::abort("Socket was closed"); default: LdH::abort("Socket wrapper corrupted"); @@ -107,10 +148,13 @@ template requires BerkeleySocketsContext if (current != 0) { switch (current) { case _refcnt_UNINITIALIZED: + case _refcnt_INITIALIZING: LdH::abort("Socket not initialized"); case _refcnt_MOVED: + case _refcnt_MOVING: LdH::abort("Socket was moved to another location"); case _refcnt_CLOSED: + case _refcnt_CLOSING: LdH::abort("Socket was already closed"); default: if (current > 0) diff --git a/modules/sockets/src/berkeley/stream.cppm b/modules/sockets/src/berkeley/stream.cppm index 9707808..d1847aa 100644 --- a/modules/sockets/src/berkeley/stream.cppm +++ b/modules/sockets/src/berkeley/stream.cppm @@ -9,66 +9,93 @@ import :berkeley.internals; import :berkeley.address; namespace LdH::Sockets::Berkeley { - enum _socket_state_t { - _socket_state_UNINITIALIZED, - _socket_state_MOVED, - _socket_state_AVAILABLE, - _socket_state_READING, - _socket_state_WRITING, - _socket_state_BOTH, - _socket_state_CLOSED, - }; - template requires BerkeleySocketsContext - class _berkeley_socket_commons { + class _stream_socket_commons { private: - std::atomic<_socket_state_t> _state; + enum _state_t { + _socket_state_UNINITIALIZED, + _socket_state_INITIALIZING, + _socket_state_MOVING, + _socket_state_MOVED, + _socket_state_AVAILABLE, + _socket_state_READING, + _socket_state_WRITING, + _socket_state_BOTH, + _socket_state_CLOSING, + _socket_state_CLOSED, + }; + + + std::atomic<_state_t> _state; protected: ctx_t::socket_t _value; - explicit _berkeley_socket_commons(ctx_t::socket_t &&value) : _state{_socket_state_AVAILABLE}, _value{std::move(value)} { + explicit _stream_socket_commons(ctx_t::socket_t &&value) : _state{_socket_state_INITIALIZING}, _value{std::move(value)} { + this->_state.store(_socket_state_AVAILABLE); } public: - _berkeley_socket_commons() : _state{_socket_state_UNINITIALIZED}, _value{} { + _stream_socket_commons() : _state{_socket_state_UNINITIALIZED}, _value{} { }; - explicit _berkeley_socket_commons(_berkeley_socket_commons &&other) noexcept : _state{other._state.load()}, _value{std::move(other._value)} { - other._state.store(_socket_state_MOVED); + explicit _stream_socket_commons(_stream_socket_commons &&other) noexcept : _stream_socket_commons{} { + this->operator=(std::move(other)); } - _berkeley_socket_commons &operator=(_berkeley_socket_commons &&other) noexcept { - _socket_state_t current = this->_state.load(); + _stream_socket_commons &operator=(_stream_socket_commons &&other) noexcept { + _state_t other_state = other._state.load(); while (true) { - switch (current) { + switch (other_state) { + case _socket_state_READING: + case _socket_state_WRITING: + case _socket_state_BOTH: + case _socket_state_INITIALIZING: + case _socket_state_MOVING: + case _socket_state_CLOSING: + LdH::abort("Can't move socket while it is in use"); + default: + if (this->_state.compare_exchange_weak(other_state, _socket_state_MOVING)) + goto CHECK_THIS_STATE; + } + } + + CHECK_THIS_STATE: + _state_t this_state = this->_state.load(); + while (true) { + switch (this_state) { case _socket_state_UNINITIALIZED: case _socket_state_MOVED: case _socket_state_CLOSED: - if (this->_state.compare_exchange_weak(current, _socket_state_AVAILABLE)) - goto DONE; + case _socket_state_INITIALIZING: + case _socket_state_MOVING: + case _socket_state_CLOSING: + if (this->_state.compare_exchange_weak(this_state, _socket_state_INITIALIZING)) + goto DO_MOVE; continue; default: + other._state.store(other_state); // rollback LdH::abort("Variable already initialized"); } } - DONE: - this->_state.store(other._state.exchange(_socket_state_MOVED)); + DO_MOVE: this->_value = std::move(other._value); + this->_state.store(_socket_state_AVAILABLE); + other._state.store(_socket_state_MOVED); return *this; } private: - _socket_state_t _cas_state(_socket_state_t expected, _socket_state_t next) { + _state_t _cas_state(_state_t expected, _state_t next) { this->_state.compare_exchange_strong(expected, next); return expected; } - template<_socket_state_t exp1, _socket_state_t next1, _socket_state_t exp2, _socket_state_t next2> - _socket_state_t _cas_state_2() { - _socket_state_t current = this->_state.load(); + template<_state_t exp1, _state_t next1, _state_t exp2, _state_t next2> + _state_t _cas_state_2() { + _state_t current = this->_state.load(); while (true) { switch (current) { case exp1: @@ -89,7 +116,9 @@ namespace LdH::Sockets::Berkeley { void _start_reading(char const *busy_msg) { switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_READING, _socket_state_WRITING, _socket_state_BOTH>()) { case _socket_state_UNINITIALIZED: + case _socket_state_INITIALIZING: LdH::abort("Socket not initialized"); + case _socket_state_MOVING: case _socket_state_MOVED: LdH::abort("Socket was moved to another location"); case _socket_state_AVAILABLE: @@ -99,6 +128,7 @@ namespace LdH::Sockets::Berkeley { case _socket_state_BOTH: LdH::abort(busy_msg); case _socket_state_CLOSED: + case _socket_state_CLOSING: LdH::abort("Socket was closed"); } } @@ -110,8 +140,10 @@ namespace LdH::Sockets::Berkeley { void _start_writing(char const *busy_msg) { switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_WRITING, _socket_state_READING, _socket_state_BOTH>()) { case _socket_state_UNINITIALIZED: + case _socket_state_INITIALIZING: LdH::abort("Socket not initialized"); case _socket_state_MOVED: + case _socket_state_MOVING: LdH::abort("Socket was moved to another location"); case _socket_state_AVAILABLE: case _socket_state_READING: @@ -120,6 +152,7 @@ namespace LdH::Sockets::Berkeley { case _socket_state_BOTH: LdH::abort(busy_msg); case _socket_state_CLOSED: + case _socket_state_CLOSING: LdH::abort("Socket was closed"); } } @@ -129,10 +162,12 @@ namespace LdH::Sockets::Berkeley { } void _close(char const *busy_msg) { - switch (this->_cas_state(_socket_state_AVAILABLE, _socket_state_CLOSED)) { + switch (this->_cas_state(_socket_state_AVAILABLE, _socket_state_CLOSING)) { case _socket_state_UNINITIALIZED: + case _socket_state_INITIALIZING: LdH::abort("Socket not initialized"); case _socket_state_MOVED: + case _socket_state_MOVING: LdH::abort("Socket was moved to another location"); case _socket_state_AVAILABLE: break; @@ -140,11 +175,13 @@ namespace LdH::Sockets::Berkeley { case _socket_state_WRITING: case _socket_state_BOTH: LdH::abort(busy_msg); + case _socket_state_CLOSING: case _socket_state_CLOSED: LdH::abort("Socket already was closed"); } this->_value.close(); + this->_state.store(_socket_state_CLOSED); } }; @@ -155,17 +192,11 @@ namespace LdH::Sockets::Berkeley { export template requires BerkeleySocketsContext - class StreamSocket : public _berkeley_socket_commons, public virtual LdH::Streams::InputStream, public virtual LdH::Streams::OutputStream { + class StreamSocket : public _stream_socket_commons, public virtual LdH::Streams::InputStream, public virtual LdH::Streams::OutputStream { private: - explicit StreamSocket(ctx_t::socket_t &&value) : _berkeley_socket_commons{std::move(value)} { + explicit StreamSocket(ctx_t::socket_t &&value) : _stream_socket_commons{std::move(value)} { } - - template requires BerkeleySocketsContext - friend - class StreamSocketsServer; - - template friend class _socket_internals; @@ -205,22 +236,10 @@ namespace LdH::Sockets::Berkeley { } }; - - export - template - class SocketWithAddress { - public: - Address addr; - sock_t sock; - - SocketWithAddress(Address addr, sock_t &&sock) : addr{addr}, sock{std::move(sock)} { - } - }; - template requires BerkeleySocketsContext - class StreamSocketsServer : public _berkeley_socket_commons { + class StreamSocketsServer : public _stream_socket_commons { private: - explicit StreamSocketsServer(ctx_t::socket_t &&value) : _berkeley_socket_commons{std::move(value)} { + explicit StreamSocketsServer(ctx_t::socket_t &&value) : _stream_socket_commons{std::move(value)} { } template @@ -235,13 +254,15 @@ namespace LdH::Sockets::Berkeley { StreamSocketsServer &operator=(StreamSocketsServer &&other) noexcept = default; [[nodiscard]] - SocketWithAddress > wait_for_connection() { + StreamSocket wait_for_connection(Address *addr) { this->_start_reading("Socket already waiting for connections"); - typename ctx_t::address_t addr{}; - typename ctx_t::socket_t raw = this->_value.accept(&addr); + typename ctx_t::address_t addr_raw{}; + typename ctx_t::socket_t raw = this->_value.accept(&addr_raw); this->_finish_reading(); - return SocketWithAddress >{_addr_internals::wrap(addr), _socket_internals >::wrap(std::move(raw))}; + + *addr = std::move(_addr_internals::wrap(addr_raw)); + return _socket_internals >::wrap(std::move(raw)); } void close() { diff --git a/modules/sockets/src/platform/windows.cpp.inc b/modules/sockets/src/platform/windows.cpp.inc index b844ce4..d27c01c 100644 --- a/modules/sockets/src/platform/windows.cpp.inc +++ b/modules/sockets/src/platform/windows.cpp.inc @@ -197,10 +197,6 @@ namespace LdH::Sockets { export using Address = Berkeley::Address<_WinsockContext>; - export - template - using SocketWithAddress = Berkeley::SocketWithAddress<_WinsockContext, sock_t>; - export using StreamSocket = Berkeley::StreamSocket<_WinsockContext>; export using StreamSocketsServer = Berkeley::StreamSocketsServer<_WinsockContext>; diff --git a/modules/sockets/src/windows_binds.cppm b/modules/sockets/src/windows_binds.cppm deleted file mode 100644 index 9a6147f..0000000 --- a/modules/sockets/src/windows_binds.cppm +++ /dev/null @@ -1,13 +0,0 @@ -module; - -#include -#include -#include - -export module ru.landgrafhomyak.BGTU.networks_1.sockets:binds; - -namespace LdH::Sockets::_Binds { - export using ::sockaddr_in; - export using ::sockaddr_in6; - export using ::SOCKET; -} diff --git a/modules/threads/src/thread_routine.cppm b/modules/threads/src/thread_routine.cppm index f6dfc9d..ccefcdc 100644 --- a/modules/threads/src/thread_routine.cppm +++ b/modules/threads/src/thread_routine.cppm @@ -4,5 +4,5 @@ import std; namespace LdH { export template - concept ThreadRoutine = std::invocable && std::same_as, void> /*&& std::movable*/ && std::destructible; + concept ThreadRoutine = std::invocable && std::same_as, void> && std::move_constructible && std::destructible; }