Socket state improvements

This commit is contained in:
Andrew Golovashevich 2025-11-04 21:26:51 +03:00
parent 11405ebff5
commit e5f8a08238
6 changed files with 135 additions and 88 deletions

View File

@ -6,7 +6,6 @@ target_sources(
PUBLIC
FILE_SET cxx_modules TYPE CXX_MODULES FILES
src/common.cppm
src/windows_binds.cppm
src/berkeley/context.cppm
src/berkeley/internals.cppm

View File

@ -9,7 +9,7 @@ import :berkeley.internals;
import :berkeley.address;
namespace LdH::Sockets::Berkeley {
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _abstract_datagram_socket {
private:
using _refcnt_t = signed long long;
@ -20,41 +20,76 @@ template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
ctx_t::socket_t _value;
private:
static constexpr _refcnt_t _refcnt_UNINITIALIZED = -0x0800'0000'0000'0000ll;
static constexpr _refcnt_t _refcnt_MOVED = -0x0800'0000'0000'0001ll;
static constexpr _refcnt_t _refcnt_CLOSED = -0x07FFF'FFFF'FFFF'FFFFll;
static constexpr _refcnt_t _refcnt_UNINITIALIZED = -0x4000'0000'0000'0000ll + 0;
static constexpr _refcnt_t _refcnt_INITIALIZING = -0x4000'0000'0000'0000ll + 1;
static constexpr _refcnt_t _refcnt_MOVING = -0x4000'0000'0000'0000ll + 2;
static constexpr _refcnt_t _refcnt_MOVED = -0x4000'0000'0000'0000ll + 3;
static constexpr _refcnt_t _refcnt_CLOSING = -0x4000'0000'0000'0000ll + 4;
static constexpr _refcnt_t _refcnt_CLOSED = -0x4000'0000'0000'0000ll + 5;
protected:
explicit _abstract_datagram_socket(ctx_t::socket_t &&value) : _refcnt{0}, _value{std::move(value)} {
explicit _abstract_datagram_socket(ctx_t::socket_t &&value) : _refcnt{_refcnt_INITIALIZING}, _value{std::move(value)} {
this->_refcnt.store(0);
}
public:
_abstract_datagram_socket() : _refcnt{_refcnt_UNINITIALIZED}, _value{} {
};
_abstract_datagram_socket(_abstract_datagram_socket &&other) noexcept : _refcnt{other._refcnt.load()}, _value{std::move(other._value)} {
_abstract_datagram_socket(_abstract_datagram_socket &&other) noexcept : _abstract_datagram_socket{} {
this->operator=(std::move(other));
};
_abstract_datagram_socket &operator=(_abstract_datagram_socket &&other) noexcept {
_refcnt_t current = other->_refcnt.load();
_refcnt_t other_state = other->_refcnt.load();
while (true) {
if (current > 0)
LdH::abort("Can't move socket while it is in use");
if (this->_refcnt.compare_exchange_weak(current, _refcnt_MOVED))
switch (other_state) {
case _refcnt_UNINITIALIZED:
case _refcnt_MOVED:
case _refcnt_CLOSED:
case 0:
break;
case _refcnt_INITIALIZING:
case _refcnt_MOVING:
case _refcnt_CLOSING:
LdH::abort("Can't move socket while it is in use");
default:
if (other_state > 0)
LdH::abort("Can't move socket while it is in use");
else if (other_state < 0)
LdH::abort("Source socket wrapper corrupted");
}
if (this->_refcnt.compare_exchange_weak(other_state, _refcnt_MOVING))
break;
}
current = this->_refcnt.load();
_refcnt_t this_state = this->_refcnt.load();
while (true) {
if (current >= 0) {
other._refcnt.store(0); // rollback
LdH::abort("Variable already initialized");
switch (this_state) {
case _refcnt_UNINITIALIZED:
case _refcnt_MOVED:
case _refcnt_CLOSED:
case 0:
break;
case _refcnt_INITIALIZING:
case _refcnt_MOVING:
case _refcnt_CLOSING:
goto ALREADY_INITIALIZED;
default:
if (this_state > 0) {
ALREADY_INITIALIZED:
other._refcnt.store(other_state); // rollback
LdH::abort("Variable already initialized");
} else if (this_state < 0)
LdH::abort("Destination socket wrapper corrupted");
}
if (this->_refcnt.compare_exchange_weak(current, 0))
if (this->_refcnt.compare_exchange_weak(this_state, 0))
break;
}
this->_value = std::move(other._value);
this->_refcnt.store(0);
other._refcnt.store(_refcnt_MOVED);
return *this;
}
@ -66,9 +101,12 @@ template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
if (current < 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
case _refcnt_INITIALIZING:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
case _refcnt_MOVING:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSING:
case _refcnt_CLOSED:
LdH::abort("Socket was closed");
default:
@ -86,10 +124,13 @@ template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
if (current < 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
case _refcnt_INITIALIZING:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
case _refcnt_MOVING:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSED:
case _refcnt_CLOSING:
LdH::abort("Socket was closed");
default:
LdH::abort("Socket wrapper corrupted");
@ -107,10 +148,13 @@ template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
if (current != 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
case _refcnt_INITIALIZING:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
case _refcnt_MOVING:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSED:
case _refcnt_CLOSING:
LdH::abort("Socket was already closed");
default:
if (current > 0)

View File

@ -9,66 +9,93 @@ import :berkeley.internals;
import :berkeley.address;
namespace LdH::Sockets::Berkeley {
enum _socket_state_t {
_socket_state_UNINITIALIZED,
_socket_state_MOVED,
_socket_state_AVAILABLE,
_socket_state_READING,
_socket_state_WRITING,
_socket_state_BOTH,
_socket_state_CLOSED,
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _berkeley_socket_commons {
class _stream_socket_commons {
private:
std::atomic<_socket_state_t> _state;
enum _state_t {
_socket_state_UNINITIALIZED,
_socket_state_INITIALIZING,
_socket_state_MOVING,
_socket_state_MOVED,
_socket_state_AVAILABLE,
_socket_state_READING,
_socket_state_WRITING,
_socket_state_BOTH,
_socket_state_CLOSING,
_socket_state_CLOSED,
};
std::atomic<_state_t> _state;
protected:
ctx_t::socket_t _value;
explicit _berkeley_socket_commons(ctx_t::socket_t &&value) : _state{_socket_state_AVAILABLE}, _value{std::move(value)} {
explicit _stream_socket_commons(ctx_t::socket_t &&value) : _state{_socket_state_INITIALIZING}, _value{std::move(value)} {
this->_state.store(_socket_state_AVAILABLE);
}
public:
_berkeley_socket_commons() : _state{_socket_state_UNINITIALIZED}, _value{} {
_stream_socket_commons() : _state{_socket_state_UNINITIALIZED}, _value{} {
};
explicit _berkeley_socket_commons(_berkeley_socket_commons &&other) noexcept : _state{other._state.load()}, _value{std::move(other._value)} {
other._state.store(_socket_state_MOVED);
explicit _stream_socket_commons(_stream_socket_commons &&other) noexcept : _stream_socket_commons{} {
this->operator=(std::move(other));
}
_berkeley_socket_commons &operator=(_berkeley_socket_commons &&other) noexcept {
_socket_state_t current = this->_state.load();
_stream_socket_commons &operator=(_stream_socket_commons &&other) noexcept {
_state_t other_state = other._state.load();
while (true) {
switch (current) {
switch (other_state) {
case _socket_state_READING:
case _socket_state_WRITING:
case _socket_state_BOTH:
case _socket_state_INITIALIZING:
case _socket_state_MOVING:
case _socket_state_CLOSING:
LdH::abort("Can't move socket while it is in use");
default:
if (this->_state.compare_exchange_weak(other_state, _socket_state_MOVING))
goto CHECK_THIS_STATE;
}
}
CHECK_THIS_STATE:
_state_t this_state = this->_state.load();
while (true) {
switch (this_state) {
case _socket_state_UNINITIALIZED:
case _socket_state_MOVED:
case _socket_state_CLOSED:
if (this->_state.compare_exchange_weak(current, _socket_state_AVAILABLE))
goto DONE;
case _socket_state_INITIALIZING:
case _socket_state_MOVING:
case _socket_state_CLOSING:
if (this->_state.compare_exchange_weak(this_state, _socket_state_INITIALIZING))
goto DO_MOVE;
continue;
default:
other._state.store(other_state); // rollback
LdH::abort("Variable already initialized");
}
}
DONE:
this->_state.store(other._state.exchange(_socket_state_MOVED));
DO_MOVE:
this->_value = std::move(other._value);
this->_state.store(_socket_state_AVAILABLE);
other._state.store(_socket_state_MOVED);
return *this;
}
private:
_socket_state_t _cas_state(_socket_state_t expected, _socket_state_t next) {
_state_t _cas_state(_state_t expected, _state_t next) {
this->_state.compare_exchange_strong(expected, next);
return expected;
}
template<_socket_state_t exp1, _socket_state_t next1, _socket_state_t exp2, _socket_state_t next2>
_socket_state_t _cas_state_2() {
_socket_state_t current = this->_state.load();
template<_state_t exp1, _state_t next1, _state_t exp2, _state_t next2>
_state_t _cas_state_2() {
_state_t current = this->_state.load();
while (true) {
switch (current) {
case exp1:
@ -89,7 +116,9 @@ namespace LdH::Sockets::Berkeley {
void _start_reading(char const *busy_msg) {
switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_READING, _socket_state_WRITING, _socket_state_BOTH>()) {
case _socket_state_UNINITIALIZED:
case _socket_state_INITIALIZING:
LdH::abort("Socket not initialized");
case _socket_state_MOVING:
case _socket_state_MOVED:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
@ -99,6 +128,7 @@ namespace LdH::Sockets::Berkeley {
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSED:
case _socket_state_CLOSING:
LdH::abort("Socket was closed");
}
}
@ -110,8 +140,10 @@ namespace LdH::Sockets::Berkeley {
void _start_writing(char const *busy_msg) {
switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_WRITING, _socket_state_READING, _socket_state_BOTH>()) {
case _socket_state_UNINITIALIZED:
case _socket_state_INITIALIZING:
LdH::abort("Socket not initialized");
case _socket_state_MOVED:
case _socket_state_MOVING:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
case _socket_state_READING:
@ -120,6 +152,7 @@ namespace LdH::Sockets::Berkeley {
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSED:
case _socket_state_CLOSING:
LdH::abort("Socket was closed");
}
}
@ -129,10 +162,12 @@ namespace LdH::Sockets::Berkeley {
}
void _close(char const *busy_msg) {
switch (this->_cas_state(_socket_state_AVAILABLE, _socket_state_CLOSED)) {
switch (this->_cas_state(_socket_state_AVAILABLE, _socket_state_CLOSING)) {
case _socket_state_UNINITIALIZED:
case _socket_state_INITIALIZING:
LdH::abort("Socket not initialized");
case _socket_state_MOVED:
case _socket_state_MOVING:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
break;
@ -140,11 +175,13 @@ namespace LdH::Sockets::Berkeley {
case _socket_state_WRITING:
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSING:
case _socket_state_CLOSED:
LdH::abort("Socket already was closed");
}
this->_value.close();
this->_state.store(_socket_state_CLOSED);
}
};
@ -155,17 +192,11 @@ namespace LdH::Sockets::Berkeley {
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class StreamSocket : public _berkeley_socket_commons<ctx_t>, public virtual LdH::Streams::InputStream, public virtual LdH::Streams::OutputStream {
class StreamSocket : public _stream_socket_commons<ctx_t>, public virtual LdH::Streams::InputStream, public virtual LdH::Streams::OutputStream {
private:
explicit StreamSocket(ctx_t::socket_t &&value) : _berkeley_socket_commons<ctx_t>{std::move(value)} {
explicit StreamSocket(ctx_t::socket_t &&value) : _stream_socket_commons<ctx_t>{std::move(value)} {
}
template<class ctx_t2> requires BerkeleySocketsContext<ctx_t2>
friend
class StreamSocketsServer;
template<class>
friend
class _socket_internals;
@ -205,22 +236,10 @@ namespace LdH::Sockets::Berkeley {
}
};
export
template<class ctx_t, class sock_t>
class SocketWithAddress {
public:
Address<ctx_t> addr;
sock_t sock;
SocketWithAddress(Address<ctx_t> addr, sock_t &&sock) : addr{addr}, sock{std::move(sock)} {
}
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class StreamSocketsServer : public _berkeley_socket_commons<ctx_t> {
class StreamSocketsServer : public _stream_socket_commons<ctx_t> {
private:
explicit StreamSocketsServer(ctx_t::socket_t &&value) : _berkeley_socket_commons<ctx_t>{std::move(value)} {
explicit StreamSocketsServer(ctx_t::socket_t &&value) : _stream_socket_commons<ctx_t>{std::move(value)} {
}
template<class>
@ -235,13 +254,15 @@ namespace LdH::Sockets::Berkeley {
StreamSocketsServer &operator=(StreamSocketsServer &&other) noexcept = default;
[[nodiscard]]
SocketWithAddress<ctx_t, StreamSocket<ctx_t> > wait_for_connection() {
StreamSocket<ctx_t> wait_for_connection(Address<ctx_t> *addr) {
this->_start_reading("Socket already waiting for connections");
typename ctx_t::address_t addr{};
typename ctx_t::socket_t raw = this->_value.accept(&addr);
typename ctx_t::address_t addr_raw{};
typename ctx_t::socket_t raw = this->_value.accept(&addr_raw);
this->_finish_reading();
return SocketWithAddress<ctx_t, StreamSocket<ctx_t> >{_addr_internals<ctx_t>::wrap(addr), _socket_internals<StreamSocket<ctx_t> >::wrap(std::move(raw))};
*addr = std::move(_addr_internals<ctx_t>::wrap(addr_raw));
return _socket_internals<StreamSocket<ctx_t> >::wrap(std::move(raw));
}
void close() {

View File

@ -197,10 +197,6 @@ namespace LdH::Sockets {
export using Address = Berkeley::Address<_WinsockContext>;
export
template<class sock_t>
using SocketWithAddress = Berkeley::SocketWithAddress<_WinsockContext, sock_t>;
export using StreamSocket = Berkeley::StreamSocket<_WinsockContext>;
export using StreamSocketsServer = Berkeley::StreamSocketsServer<_WinsockContext>;

View File

@ -1,13 +0,0 @@
module;
#include <Winsock2.h>
#include <ws2tcpip.h>
#include <Windows.h>
export module ru.landgrafhomyak.BGTU.networks_1.sockets:binds;
namespace LdH::Sockets::_Binds {
export using ::sockaddr_in;
export using ::sockaddr_in6;
export using ::SOCKET;
}

View File

@ -4,5 +4,5 @@ import std;
namespace LdH {
export
template<class lambda_t>
concept ThreadRoutine = std::invocable<lambda_t> && std::same_as<std::invoke_result_t<lambda_t>, void> /*&& std::movable<lambda_t>*/ && std::destructible<lambda_t>;
concept ThreadRoutine = std::invocable<lambda_t> && std::same_as<std::invoke_result_t<lambda_t>, void> && std::move_constructible<lambda_t> && std::destructible<lambda_t>;
}