From 11405ebff5a4fd828138150db59ff9cacefa906b Mon Sep 17 00:00:00 2001 From: Andrew Golovashevich Date: Tue, 4 Nov 2025 16:49:13 +0300 Subject: [PATCH] Splitted berkeley sockets abstraction to several files --- modules/sockets/CMakeLists.txt | 9 +- modules/sockets/src/berkeley/address.cppm | 61 ++ modules/sockets/src/berkeley/context.cppm | 33 + modules/sockets/src/berkeley/datagram.cppm | 224 +++++++ .../sockets/src/berkeley/entry_points.cppm | 59 ++ modules/sockets/src/berkeley/internals.cppm | 16 + modules/sockets/src/berkeley/main.cppm | 8 + modules/sockets/src/berkeley/stream.cppm | 262 ++++++++ modules/sockets/src/berkeley_sockets.cppm | 618 ------------------ modules/sockets/src/platform/windows.cpp.inc | 2 +- 10 files changed, 672 insertions(+), 620 deletions(-) create mode 100644 modules/sockets/src/berkeley/address.cppm create mode 100644 modules/sockets/src/berkeley/context.cppm create mode 100644 modules/sockets/src/berkeley/datagram.cppm create mode 100644 modules/sockets/src/berkeley/entry_points.cppm create mode 100644 modules/sockets/src/berkeley/internals.cppm create mode 100644 modules/sockets/src/berkeley/main.cppm create mode 100644 modules/sockets/src/berkeley/stream.cppm delete mode 100644 modules/sockets/src/berkeley_sockets.cppm diff --git a/modules/sockets/CMakeLists.txt b/modules/sockets/CMakeLists.txt index 862c36d..289ad4c 100644 --- a/modules/sockets/CMakeLists.txt +++ b/modules/sockets/CMakeLists.txt @@ -6,8 +6,15 @@ target_sources( PUBLIC FILE_SET cxx_modules TYPE CXX_MODULES FILES src/common.cppm - src/berkeley_sockets.cppm src/windows_binds.cppm + + src/berkeley/context.cppm + src/berkeley/internals.cppm + src/berkeley/address.cppm + src/berkeley/stream.cppm + src/berkeley/datagram.cppm + src/berkeley/entry_points.cppm + src/berkeley/main.cppm ) target_link_libraries(sockets PRIVATE wsock32 ws2_32 exceptions PUBLIC streams) \ No newline at end of file diff --git a/modules/sockets/src/berkeley/address.cppm b/modules/sockets/src/berkeley/address.cppm new file mode 100644 index 0000000..4e75d7a --- /dev/null +++ b/modules/sockets/src/berkeley/address.cppm @@ -0,0 +1,61 @@ +export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.address; + +import std; + +import :berkeley.context; +import :berkeley.internals; + +namespace LdH::Sockets::Berkeley { + export + template requires BerkeleySocketsContext + class Address { + private: + bool _has_value; + ctx_t::address_t _value; + + explicit Address(bool has_value, ctx_t::address_t value) : _has_value{has_value}, _value{value} { + } + + template requires BerkeleySocketsContext + friend class _addr_internals; + + public: + Address() : _has_value{false}, _value{} { + } + + Address(Address const &other) noexcept = default; + + Address(Address &&other) noexcept : _has_value{other._has_value}, _value{other._value} { + other._has_value = false; + } + + Address &operator=(Address const &other) = default; + + Address &operator=(Address &&other) noexcept { + this->_has_value = other._has_value; + this->_value = std::move(other._value); + other._has_value = false; + return *this; + } + + ~Address() { + this->_has_value = false; + } + + static Address parse(char const *host, char const *service) { + return Address{true, ctx_t::address_t::parse(host, service)}; + } + + std::string to_string() { + return this->_value.to_string(); + } + }; + + template requires BerkeleySocketsContext + class _addr_internals { + public: + static bool has_value(Address const &v) { return v._has_value; } + static ctx_t::address_t unwrap(Address const &v) { return v._value; } + static Address wrap(ctx_t::address_t const &raw) { return Address{true, raw}; } + }; +} diff --git a/modules/sockets/src/berkeley/context.cppm b/modules/sockets/src/berkeley/context.cppm new file mode 100644 index 0000000..00d1212 --- /dev/null +++ b/modules/sockets/src/berkeley/context.cppm @@ -0,0 +1,33 @@ +export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.context; + +import std; + +namespace LdH::Sockets::Berkeley { + export + template + concept BerkeleySocketsContext = requires { typename ctx_t::address_t; } && + std::default_initializable && std::copyable && std::destructible && + requires(char const *h, char const *s) { { ctx_t::address_t::parse(h, s) } -> std::same_as; } && + requires(typename ctx_t::address_t s) { { s.to_string() } -> std::same_as; } && + requires { typename ctx_t::sock_type_t; } && + requires { typename ctx_t::sock_type; } && + requires { { ctx_t::sock_type::stream() } -> std::same_as; } && + requires { { ctx_t::sock_type::dgram() } -> std::same_as; } && + requires { typename ctx_t::proto_t; } && + requires { typename ctx_t::proto; } && + requires { { ctx_t::proto::tcp() } -> std::same_as; } && + requires { { ctx_t::proto::udp() } -> std::same_as; } && + requires { { ctx_t::proto::icmp() } -> std::same_as; } && + requires { typename ctx_t::socket_t; } && + std::default_initializable && std::movable && std::destructible && + requires(typename ctx_t::address_t a, typename ctx_t::sock_type_t t, typename ctx_t::proto_t p) { { ctx_t::socket_t::create(a, t, p) } -> std::same_as; } && + requires(typename ctx_t::socket_t s, std::size_t q) { { s.listen(q) } -> std::same_as; } && + requires(typename ctx_t::socket_t s, typename ctx_t::address_t e) { { s.connect(e) } -> std::same_as; } && + requires(typename ctx_t::socket_t s, typename ctx_t::address_t e) { { s.bind(e) } -> std::same_as; } && + requires(typename ctx_t::socket_t s) { { s.close() } -> std::same_as; } && + requires(typename ctx_t::socket_t s, std::size_t c, char const *d) { { s.send_stream(c, d) } -> std::same_as; } && + requires(typename ctx_t::socket_t s, std::size_t c, char *d) { { s.recv_stream(c, d) } -> std::same_as; } && + requires(typename ctx_t::socket_t s, std::size_t c, char *d) { { s.recv_datagram(c, d) } -> std::same_as; } && + requires(typename ctx_t::socket_t s, typename ctx_t::address_t e, std::size_t c, char const *d) { { s.send_datagram(e, c, d) } -> std::same_as; } && + requires(typename ctx_t::socket_t s, typename ctx_t::address_t *a) { { s.accept(a) } -> std::same_as; }; +} diff --git a/modules/sockets/src/berkeley/datagram.cppm b/modules/sockets/src/berkeley/datagram.cppm new file mode 100644 index 0000000..9430cb6 --- /dev/null +++ b/modules/sockets/src/berkeley/datagram.cppm @@ -0,0 +1,224 @@ +export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.stream; + +import std; + +import ru.landgrafhomyak.BGTU.networks_1.exceptions; +import ru.landgrafhomyak.BGTU.networks_1.streams; +import :berkeley.context; +import :berkeley.internals; +import :berkeley.address; + +namespace LdH::Sockets::Berkeley { +template requires BerkeleySocketsContext + class _abstract_datagram_socket { + private: + using _refcnt_t = signed long long; + + std::atomic<_refcnt_t> _refcnt; + + protected: + ctx_t::socket_t _value; + + private: + static constexpr _refcnt_t _refcnt_UNINITIALIZED = -0x0800'0000'0000'0000ll; + static constexpr _refcnt_t _refcnt_MOVED = -0x0800'0000'0000'0001ll; + static constexpr _refcnt_t _refcnt_CLOSED = -0x07FFF'FFFF'FFFF'FFFFll; + + protected: + explicit _abstract_datagram_socket(ctx_t::socket_t &&value) : _refcnt{0}, _value{std::move(value)} { + } + + public: + _abstract_datagram_socket() : _refcnt{_refcnt_UNINITIALIZED}, _value{} { + }; + + _abstract_datagram_socket(_abstract_datagram_socket &&other) noexcept : _refcnt{other._refcnt.load()}, _value{std::move(other._value)} { + }; + + _abstract_datagram_socket &operator=(_abstract_datagram_socket &&other) noexcept { + _refcnt_t current = other->_refcnt.load(); + while (true) { + if (current > 0) + LdH::abort("Can't move socket while it is in use"); + if (this->_refcnt.compare_exchange_weak(current, _refcnt_MOVED)) + break; + } + + current = this->_refcnt.load(); + while (true) { + if (current >= 0) { + other._refcnt.store(0); // rollback + LdH::abort("Variable already initialized"); + } + if (this->_refcnt.compare_exchange_weak(current, 0)) + break; + } + + this->_value = std::move(other._value); + + return *this; + } + + protected: + void _start_usage() { + _refcnt_t current = this->_refcnt.load(); + while (true) { + if (current < 0) { + switch (current) { + case _refcnt_UNINITIALIZED: + LdH::abort("Socket not initialized"); + case _refcnt_MOVED: + LdH::abort("Socket was moved to another location"); + case _refcnt_CLOSED: + LdH::abort("Socket was closed"); + default: + LdH::abort("Socket wrapper corrupted"); + } + } + if (this->_refcnt.compare_exchange_weak(current, current + 1)) + break; + } + } + + void _finish_usage() { + _refcnt_t current = this->_refcnt.load(); + while (true) { + if (current < 0) { + switch (current) { + case _refcnt_UNINITIALIZED: + LdH::abort("Socket not initialized"); + case _refcnt_MOVED: + LdH::abort("Socket was moved to another location"); + case _refcnt_CLOSED: + LdH::abort("Socket was closed"); + default: + LdH::abort("Socket wrapper corrupted"); + } + } + if (this->_refcnt.compare_exchange_weak(current, current - 1)) + break; + } + } + + protected: + void _close() { + _refcnt_t current = this->_refcnt.load(); + while (true) { + if (current != 0) { + switch (current) { + case _refcnt_UNINITIALIZED: + LdH::abort("Socket not initialized"); + case _refcnt_MOVED: + LdH::abort("Socket was moved to another location"); + case _refcnt_CLOSED: + LdH::abort("Socket was already closed"); + default: + if (current > 0) + LdH::abort("Can't close socket while it is in use"); + else + LdH::abort("Socket wrapper corrupted"); + } + } + if (this->_refcnt.compare_exchange_weak(current, _refcnt_CLOSED)) + break; + } + } + + ~_abstract_datagram_socket() noexcept = default; + }; + + export + template requires BerkeleySocketsContext + class ServerDatagramSocket : public _abstract_datagram_socket { + private: + explicit ServerDatagramSocket(ctx_t::socket_t &&value) : _abstract_datagram_socket{std::move(value)} { + } + + template + friend + class _socket_internals; + + public: + ServerDatagramSocket() = default; + + ServerDatagramSocket(ServerDatagramSocket &&other) noexcept = default; + + ServerDatagramSocket &operator=(ServerDatagramSocket &&other) noexcept = default; + + public: + Address recvOneTruncating(std::size_t size, char *data) { + this->_start_usage(); + auto addr = this->_value.recv_datagram(size, data); + this->_finish_usage(); + return _addr_internals::wrap(std::move(addr)); + } + + void sendOne(Address destination, std::size_t size, char const *data) { + this->_start_usage(); + this->_value.send_datagram(destination, size, data); + this->_finish_usage(); + } + + void close() { + this->_close(); + } + + ~ServerDatagramSocket() noexcept = default; + }; + + + template requires BerkeleySocketsContext + class _socket_internals > { + public: + static ServerDatagramSocket wrap(ctx_t::socket_t &&raw) { + return ServerDatagramSocket{std::move(raw)}; + } + }; + + export + template requires BerkeleySocketsContext + class ClientDatagramSocket : public _abstract_datagram_socket, public LdH::Streams::InputMessanger, public LdH::Streams::OutputMessanger { + private: + explicit ClientDatagramSocket(ctx_t::socket_t &&value) : _abstract_datagram_socket{std::move(value)} { + } + + template + friend + class _socket_internals; + + public: + ClientDatagramSocket() = default; + + ClientDatagramSocket(ClientDatagramSocket &&other) noexcept = default; + + ClientDatagramSocket &operator=(ClientDatagramSocket &&other) noexcept = default; + + public: + void recvOneTruncating(std::size_t size, char *data) override { + this->_start_usage(); + this->_value.recv_stream(size, data); + this->_finish_usage(); + } + + void sendOne(std::size_t size, char const *data) override { + this->_start_usage(); + this->_value.send_stream(size, data); + this->_finish_usage(); + } + + void close() override { + this->_close(); + } + + ~ClientDatagramSocket() noexcept override = default; + }; + + + template requires BerkeleySocketsContext + class _socket_internals > { + public: + static ClientDatagramSocket wrap(ctx_t::socket_t &&raw) { + return ClientDatagramSocket{std::move(raw)}; + } + }; +} diff --git a/modules/sockets/src/berkeley/entry_points.cppm b/modules/sockets/src/berkeley/entry_points.cppm new file mode 100644 index 0000000..cb001de --- /dev/null +++ b/modules/sockets/src/berkeley/entry_points.cppm @@ -0,0 +1,59 @@ +export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.entry_points; + +import std; + +import ru.landgrafhomyak.BGTU.networks_1.exceptions; + +import :berkeley.context; +import :berkeley.internals; +import :berkeley.address; +import :berkeley.stream; +import :berkeley.datagram; + +namespace LdH::Sockets::Berkeley { + + export + template requires BerkeleySocketsContext + [[nodiscard]] + StreamSocket connect_tcp(Address addr) { + if (!_addr_internals::has_value(addr)) + LdH::abort("Address not initialized"); + typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals::unwrap(addr), ctx_t::sock_type::stream(), ctx_t::proto::tcp()); + sock.connect(_addr_internals::unwrap(addr)); + return _socket_internals >::wrap(std::move(sock)); + } + + export + template requires BerkeleySocketsContext + [[nodiscard]] + StreamSocketsServer listen_tcp(Address addr, std::size_t queue_size) { + if (!_addr_internals::has_value(addr)) + LdH::abort("Address not initialized"); + typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals::unwrap(addr), ctx_t::sock_type::stream(), ctx_t::proto::tcp()); + sock.bind(_addr_internals::unwrap(addr)); + sock.listen(queue_size); + return _socket_internals >::wrap(std::move(sock)); + } + + export + template requires BerkeleySocketsContext + [[nodiscard]] + ClientDatagramSocket connect_udp(Address addr) { + if (!_addr_internals::has_value(addr)) + LdH::abort("Address not initialized"); + typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals::unwrap(addr), ctx_t::sock_type::dgram(), ctx_t::proto::udp()); + sock.connect(_addr_internals::unwrap(addr)); + return _socket_internals >::wrap(std::move(sock)); + } + + export + template requires BerkeleySocketsContext + [[nodiscard]] + ServerDatagramSocket listen_udp(Address addr) { + if (!_addr_internals::has_value(addr)) + LdH::abort("Address not initialized"); + typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals::unwrap(addr), ctx_t::sock_type::dgram(), ctx_t::proto::udp()); + sock.bind(_addr_internals::unwrap(addr)); + return _socket_internals >::wrap(std::move(sock)); + } +} diff --git a/modules/sockets/src/berkeley/internals.cppm b/modules/sockets/src/berkeley/internals.cppm new file mode 100644 index 0000000..2d5e5f5 --- /dev/null +++ b/modules/sockets/src/berkeley/internals.cppm @@ -0,0 +1,16 @@ +export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.internals; + +import std; + +import :berkeley.context; + +namespace LdH::Sockets::Berkeley { + export + template requires BerkeleySocketsContext + class _addr_internals; + + export + template + class _socket_internals { + }; +} diff --git a/modules/sockets/src/berkeley/main.cppm b/modules/sockets/src/berkeley/main.cppm new file mode 100644 index 0000000..d421893 --- /dev/null +++ b/modules/sockets/src/berkeley/main.cppm @@ -0,0 +1,8 @@ +export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley; + + +export import :berkeley.context; +export import :berkeley.address; +export import :berkeley.stream; +export import :berkeley.datagram; +export import :berkeley.entry_points; diff --git a/modules/sockets/src/berkeley/stream.cppm b/modules/sockets/src/berkeley/stream.cppm new file mode 100644 index 0000000..9707808 --- /dev/null +++ b/modules/sockets/src/berkeley/stream.cppm @@ -0,0 +1,262 @@ +export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.datagram; + +import std; + +import ru.landgrafhomyak.BGTU.networks_1.exceptions; +import ru.landgrafhomyak.BGTU.networks_1.streams; +import :berkeley.context; +import :berkeley.internals; +import :berkeley.address; + +namespace LdH::Sockets::Berkeley { + enum _socket_state_t { + _socket_state_UNINITIALIZED, + _socket_state_MOVED, + _socket_state_AVAILABLE, + _socket_state_READING, + _socket_state_WRITING, + _socket_state_BOTH, + _socket_state_CLOSED, + }; + + template requires BerkeleySocketsContext + class _berkeley_socket_commons { + private: + std::atomic<_socket_state_t> _state; + + protected: + ctx_t::socket_t _value; + + explicit _berkeley_socket_commons(ctx_t::socket_t &&value) : _state{_socket_state_AVAILABLE}, _value{std::move(value)} { + } + + public: + _berkeley_socket_commons() : _state{_socket_state_UNINITIALIZED}, _value{} { + }; + + explicit _berkeley_socket_commons(_berkeley_socket_commons &&other) noexcept : _state{other._state.load()}, _value{std::move(other._value)} { + other._state.store(_socket_state_MOVED); + } + + _berkeley_socket_commons &operator=(_berkeley_socket_commons &&other) noexcept { + _socket_state_t current = this->_state.load(); + while (true) { + switch (current) { + case _socket_state_UNINITIALIZED: + case _socket_state_MOVED: + case _socket_state_CLOSED: + if (this->_state.compare_exchange_weak(current, _socket_state_AVAILABLE)) + goto DONE; + continue; + default: + LdH::abort("Variable already initialized"); + } + } + + DONE: + this->_state.store(other._state.exchange(_socket_state_MOVED)); + this->_value = std::move(other._value); + + return *this; + } + + private: + _socket_state_t _cas_state(_socket_state_t expected, _socket_state_t next) { + this->_state.compare_exchange_strong(expected, next); + return expected; + } + + template<_socket_state_t exp1, _socket_state_t next1, _socket_state_t exp2, _socket_state_t next2> + _socket_state_t _cas_state_2() { + _socket_state_t current = this->_state.load(); + while (true) { + switch (current) { + case exp1: + if (this->_state.compare_exchange_weak(current, next1)) + return current; + continue; + case exp2: + if (this->_state.compare_exchange_weak(current, next2)) + return current; + continue; + default: + return current; + } + } + } + + protected: + void _start_reading(char const *busy_msg) { + switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_READING, _socket_state_WRITING, _socket_state_BOTH>()) { + case _socket_state_UNINITIALIZED: + LdH::abort("Socket not initialized"); + case _socket_state_MOVED: + LdH::abort("Socket was moved to another location"); + case _socket_state_AVAILABLE: + case _socket_state_WRITING: + break; + case _socket_state_READING: + case _socket_state_BOTH: + LdH::abort(busy_msg); + case _socket_state_CLOSED: + LdH::abort("Socket was closed"); + } + } + + void _finish_reading() { + this->_cas_state_2<_socket_state_READING, _socket_state_AVAILABLE, _socket_state_BOTH, _socket_state_WRITING>(); + } + + void _start_writing(char const *busy_msg) { + switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_WRITING, _socket_state_READING, _socket_state_BOTH>()) { + case _socket_state_UNINITIALIZED: + LdH::abort("Socket not initialized"); + case _socket_state_MOVED: + LdH::abort("Socket was moved to another location"); + case _socket_state_AVAILABLE: + case _socket_state_READING: + break; + case _socket_state_WRITING: + case _socket_state_BOTH: + LdH::abort(busy_msg); + case _socket_state_CLOSED: + LdH::abort("Socket was closed"); + } + } + + void _finish_writing() { + this->_cas_state_2<_socket_state_WRITING, _socket_state_AVAILABLE, _socket_state_BOTH, _socket_state_READING>(); + } + + void _close(char const *busy_msg) { + switch (this->_cas_state(_socket_state_AVAILABLE, _socket_state_CLOSED)) { + case _socket_state_UNINITIALIZED: + LdH::abort("Socket not initialized"); + case _socket_state_MOVED: + LdH::abort("Socket was moved to another location"); + case _socket_state_AVAILABLE: + break; + case _socket_state_READING: + case _socket_state_WRITING: + case _socket_state_BOTH: + LdH::abort(busy_msg); + case _socket_state_CLOSED: + LdH::abort("Socket already was closed"); + } + + this->_value.close(); + } + }; + + export + template requires BerkeleySocketsContext + class StreamSocketsServer; + + + export + template requires BerkeleySocketsContext + class StreamSocket : public _berkeley_socket_commons, public virtual LdH::Streams::InputStream, public virtual LdH::Streams::OutputStream { + private: + explicit StreamSocket(ctx_t::socket_t &&value) : _berkeley_socket_commons{std::move(value)} { + } + + + template requires BerkeleySocketsContext + friend + class StreamSocketsServer; + + + template + friend + class _socket_internals; + + public: + StreamSocket() = default; + + StreamSocket(StreamSocket &&other) noexcept = default; + + StreamSocket &operator=(StreamSocket &&other) noexcept = default; + + void read(std::size_t size, char *data) override { + this->_start_reading("Socket already read by another thread"); + this->_value.recv_stream(size, data); + this->_finish_reading(); + } + + void write(std::size_t size, char const *data) override { + this->_start_writing("Socket already written by another thread"); + this->_value.send_stream(size, data); + this->_finish_writing(); + } + + void close() override { + this->_close("Socket is busy with reading or writing"); + } + + ~StreamSocket() noexcept override = default; + }; + + + template requires BerkeleySocketsContext + class _socket_internals > { + public: + static StreamSocket wrap(ctx_t::socket_t &&raw) { + return StreamSocket{std::move(raw)}; + } + }; + + + export + template + class SocketWithAddress { + public: + Address addr; + sock_t sock; + + SocketWithAddress(Address addr, sock_t &&sock) : addr{addr}, sock{std::move(sock)} { + } + }; + + template requires BerkeleySocketsContext + class StreamSocketsServer : public _berkeley_socket_commons { + private: + explicit StreamSocketsServer(ctx_t::socket_t &&value) : _berkeley_socket_commons{std::move(value)} { + } + + template + friend + class _socket_internals; + + public: + StreamSocketsServer() = default; + + StreamSocketsServer(StreamSocketsServer &&other) noexcept = default; + + StreamSocketsServer &operator=(StreamSocketsServer &&other) noexcept = default; + + [[nodiscard]] + SocketWithAddress > wait_for_connection() { + this->_start_reading("Socket already waiting for connections"); + + typename ctx_t::address_t addr{}; + typename ctx_t::socket_t raw = this->_value.accept(&addr); + this->_finish_reading(); + return SocketWithAddress >{_addr_internals::wrap(addr), _socket_internals >::wrap(std::move(raw))}; + } + + void close() { + this->_close("Socket busy with waiting for connection"); + } + + ~StreamSocketsServer() noexcept = default; + }; + + + template requires BerkeleySocketsContext + class _socket_internals > { + public: + static StreamSocketsServer wrap(ctx_t::socket_t &&raw) { + return StreamSocketsServer{std::move(raw)}; + } + }; +} diff --git a/modules/sockets/src/berkeley_sockets.cppm b/modules/sockets/src/berkeley_sockets.cppm deleted file mode 100644 index bbb984f..0000000 --- a/modules/sockets/src/berkeley_sockets.cppm +++ /dev/null @@ -1,618 +0,0 @@ -export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley_sockets; - -import std; -import ru.landgrafhomyak.BGTU.networks_1.exceptions; -import ru.landgrafhomyak.BGTU.networks_1.streams; - -namespace LdH::Sockets::Berkeley { -#pragma region context concept - export - template - concept BerkeleySocketsContext = requires { typename ctx_t::address_t; } && - std::default_initializable && std::copyable && std::destructible && - requires(char const *h, char const *s) { { ctx_t::address_t::parse(h, s) } -> std::same_as; } && - requires(typename ctx_t::address_t s) { { s.to_string() } -> std::same_as; } && - requires { typename ctx_t::sock_type_t; } && - requires { typename ctx_t::sock_type; } && - requires { { ctx_t::sock_type::stream() } -> std::same_as; } && - requires { { ctx_t::sock_type::dgram() } -> std::same_as; } && - requires { typename ctx_t::proto_t; } && - requires { typename ctx_t::proto; } && - requires { { ctx_t::proto::tcp() } -> std::same_as; } && - requires { { ctx_t::proto::udp() } -> std::same_as; } && - requires { { ctx_t::proto::icmp() } -> std::same_as; } && - requires { typename ctx_t::socket_t; } && - std::default_initializable && std::movable && std::destructible && - requires(typename ctx_t::address_t a, typename ctx_t::sock_type_t t, typename ctx_t::proto_t p) { { ctx_t::socket_t::create(a, t, p) } -> std::same_as; } && - requires(typename ctx_t::socket_t s, std::size_t q) { { s.listen(q) } -> std::same_as; } && - requires(typename ctx_t::socket_t s, typename ctx_t::address_t e) { { s.connect(e) } -> std::same_as; } && - requires(typename ctx_t::socket_t s, typename ctx_t::address_t e) { { s.bind(e) } -> std::same_as; } && - requires(typename ctx_t::socket_t s) { { s.close() } -> std::same_as; } && - requires(typename ctx_t::socket_t s, std::size_t c, char const *d) { { s.send_stream(c, d) } -> std::same_as; } && - requires(typename ctx_t::socket_t s, std::size_t c, char *d) { { s.recv_stream(c, d) } -> std::same_as; } && - requires(typename ctx_t::socket_t s, std::size_t c, char *d) { { s.recv_datagram(c, d) } -> std::same_as; } && - requires(typename ctx_t::socket_t s, typename ctx_t::address_t e, std::size_t c, char const *d) { { s.send_datagram(e, c, d) } -> std::same_as; } && - requires(typename ctx_t::socket_t s, typename ctx_t::address_t *a) { { s.accept(a) } -> std::same_as; }; - -#pragma endregion - -#pragma region address - - template requires BerkeleySocketsContext - class _addr_internals; - - export - template requires BerkeleySocketsContext - class Address { - private: - bool _has_value; - ctx_t::address_t _value; - - explicit Address(bool has_value, ctx_t::address_t value) : _has_value{has_value}, _value{value} { - } - - template requires BerkeleySocketsContext - friend class _addr_internals; - - public: - Address() : _has_value{false}, _value{} { - } - - Address(Address const &other) noexcept = default; - - Address(Address &&other) noexcept : _has_value{other._has_value}, _value{other._value} { - other._has_value = false; - } - - Address &operator=(Address const &other) = default; - - Address &operator=(Address &&other) noexcept { - this->_has_value = other._has_value; - this->_value = std::move(other._value); - other._has_value = false; - return *this; - } - - ~Address() { - this->_has_value = false; - } - - static Address parse(char const *host, char const *service) { - return Address{true, ctx_t::address_t::parse(host, service)}; - } - - std::string to_string() { - return this->_value.to_string(); - } - }; - - template requires BerkeleySocketsContext - class _addr_internals { - public: - static bool has_value(Address const &v) { return v._has_value; } - static ctx_t::address_t unwrap(Address const &v) { return v._value; } - static Address wrap(ctx_t::address_t const &raw) { return Address{true, raw}; } - }; - -#pragma endregion - -#pragma region abstract socket - enum _socket_state_t { - _socket_state_UNINITIALIZED, - _socket_state_MOVED, - _socket_state_AVAILABLE, - _socket_state_READING, - _socket_state_WRITING, - _socket_state_BOTH, - _socket_state_CLOSED, - }; - - template requires BerkeleySocketsContext - class _berkeley_socket_commons { - private: - std::atomic<_socket_state_t> _state; - - protected: - ctx_t::socket_t _value; - - explicit _berkeley_socket_commons(ctx_t::socket_t &&value) : _state{_socket_state_AVAILABLE}, _value{std::move(value)} { - } - - public: - _berkeley_socket_commons() : _state{_socket_state_UNINITIALIZED}, _value{} { - }; - - explicit _berkeley_socket_commons(_berkeley_socket_commons &&other) noexcept : _state{other._state.load()}, _value{std::move(other._value)} { - other._state.store(_socket_state_MOVED); - } - - _berkeley_socket_commons &operator=(_berkeley_socket_commons &&other) noexcept { - _socket_state_t current = this->_state.load(); - while (true) { - switch (current) { - case _socket_state_UNINITIALIZED: - case _socket_state_MOVED: - case _socket_state_CLOSED: - if (this->_state.compare_exchange_weak(current, _socket_state_AVAILABLE)) - goto DONE; - continue; - default: - LdH::abort("Variable already initialized"); - } - } - - DONE: - this->_state.store(other._state.exchange(_socket_state_MOVED)); - this->_value = std::move(other._value); - - return *this; - } - - private: - _socket_state_t _cas_state(_socket_state_t expected, _socket_state_t next) { - this->_state.compare_exchange_strong(expected, next); - return expected; - } - - template<_socket_state_t exp1, _socket_state_t next1, _socket_state_t exp2, _socket_state_t next2> - _socket_state_t _cas_state_2() { - _socket_state_t current = this->_state.load(); - while (true) { - switch (current) { - case exp1: - if (this->_state.compare_exchange_weak(current, next1)) - return current; - continue; - case exp2: - if (this->_state.compare_exchange_weak(current, next2)) - return current; - continue; - default: - return current; - } - } - } - - protected: - void _start_reading(char const *busy_msg) { - switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_READING, _socket_state_WRITING, _socket_state_BOTH>()) { - case _socket_state_UNINITIALIZED: - LdH::abort("Socket not initialized"); - case _socket_state_MOVED: - LdH::abort("Socket was moved to another location"); - case _socket_state_AVAILABLE: - case _socket_state_WRITING: - break; - case _socket_state_READING: - case _socket_state_BOTH: - LdH::abort(busy_msg); - case _socket_state_CLOSED: - LdH::abort("Socket was closed"); - } - } - - void _finish_reading() { - this->_cas_state_2<_socket_state_READING, _socket_state_AVAILABLE, _socket_state_BOTH, _socket_state_WRITING>(); - } - - void _start_writing(char const *busy_msg) { - switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_WRITING, _socket_state_READING, _socket_state_BOTH>()) { - case _socket_state_UNINITIALIZED: - LdH::abort("Socket not initialized"); - case _socket_state_MOVED: - LdH::abort("Socket was moved to another location"); - case _socket_state_AVAILABLE: - case _socket_state_READING: - break; - case _socket_state_WRITING: - case _socket_state_BOTH: - LdH::abort(busy_msg); - case _socket_state_CLOSED: - LdH::abort("Socket was closed"); - } - } - - void _finish_writing() { - this->_cas_state_2<_socket_state_WRITING, _socket_state_AVAILABLE, _socket_state_BOTH, _socket_state_READING>(); - } - - void _close(char const *busy_msg) { - switch (this->_cas_state(_socket_state_AVAILABLE, _socket_state_CLOSED)) { - case _socket_state_UNINITIALIZED: - LdH::abort("Socket not initialized"); - case _socket_state_MOVED: - LdH::abort("Socket was moved to another location"); - case _socket_state_AVAILABLE: - break; - case _socket_state_READING: - case _socket_state_WRITING: - case _socket_state_BOTH: - LdH::abort(busy_msg); - case _socket_state_CLOSED: - LdH::abort("Socket already was closed"); - } - - this->_value.close(); - } - }; - - template - class _socket_internals { - }; -#pragma endregion - -#pragma region stream sockets - export - template requires BerkeleySocketsContext - class StreamSocketsServer; - - - export - template requires BerkeleySocketsContext - class StreamSocket : public _berkeley_socket_commons, public virtual LdH::Streams::InputStream, public virtual LdH::Streams::OutputStream { - private: - explicit StreamSocket(ctx_t::socket_t &&value) : _berkeley_socket_commons{std::move(value)} { - } - - - template requires BerkeleySocketsContext - friend - class StreamSocketsServer; - - - template - friend - class _socket_internals; - - public: - StreamSocket() = default; - - StreamSocket(StreamSocket &&other) noexcept = default; - - StreamSocket &operator=(StreamSocket &&other) noexcept = default; - - void read(std::size_t size, char *data) override { - this->_start_reading("Socket already read by another thread"); - this->_value.recv_stream(size, data); - this->_finish_reading(); - } - - void write(std::size_t size, char const *data) override { - this->_start_writing("Socket already written by another thread"); - this->_value.send_stream(size, data); - this->_finish_writing(); - } - - void close() override { - this->_close("Socket is busy with reading or writing"); - } - - ~StreamSocket() noexcept override = default; - }; - - - template - class _socket_internals > { - public: - static StreamSocket wrap(ctx_t::socket_t &&raw) { - return StreamSocket{std::move(raw)}; - } - }; - - - export - template - class SocketWithAddress { - public: - Address addr; - sock_t sock; - - SocketWithAddress(Address addr, sock_t &&sock) : addr{addr}, sock{std::move(sock)} { - } - }; - - template requires BerkeleySocketsContext - class StreamSocketsServer : public _berkeley_socket_commons { - private: - explicit StreamSocketsServer(ctx_t::socket_t &&value) : _berkeley_socket_commons{std::move(value)} { - } - - template - friend - class _socket_internals; - - public: - StreamSocketsServer() = default; - - StreamSocketsServer(StreamSocketsServer &&other) noexcept = default; - - StreamSocketsServer &operator=(StreamSocketsServer &&other) noexcept = default; - - [[nodiscard]] - SocketWithAddress > wait_for_connection() { - this->_start_reading("Socket already waiting for connections"); - - typename ctx_t::address_t addr{}; - typename ctx_t::socket_t raw = this->_value.accept(&addr); - this->_finish_reading(); - return SocketWithAddress >{_addr_internals::wrap(addr), _socket_internals >::wrap(std::move(raw))}; - } - - void close() { - this->_close("Socket busy with waiting for connection"); - } - - ~StreamSocketsServer() noexcept = default; - }; - - - template - class _socket_internals > { - public: - static StreamSocketsServer wrap(ctx_t::socket_t &&raw) { - return StreamSocketsServer{std::move(raw)}; - } - }; - - export - template requires BerkeleySocketsContext - [[nodiscard]] - StreamSocket connect_tcp(Address addr) { - if (!_addr_internals::has_value(addr)) - LdH::abort("Address not initialized"); - typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals::unwrap(addr), ctx_t::sock_type::stream(), ctx_t::proto::tcp()); - sock.connect(_addr_internals::unwrap(addr)); - return _socket_internals >::wrap(std::move(sock)); - } - - export - template requires BerkeleySocketsContext - [[nodiscard]] - StreamSocketsServer listen_tcp(Address addr, std::size_t queue_size) { - if (!_addr_internals::has_value(addr)) - LdH::abort("Address not initialized"); - typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals::unwrap(addr), ctx_t::sock_type::stream(), ctx_t::proto::tcp()); - sock.bind(_addr_internals::unwrap(addr)); - sock.listen(queue_size); - return _socket_internals >::wrap(std::move(sock)); - } -#pragma endregion - -#pragma region datagram sockets - - template requires BerkeleySocketsContext - class _abstract_datagram_socket { - private: - using _refcnt_t = signed long long; - - std::atomic<_refcnt_t> _refcnt; - - protected: - ctx_t::socket_t _value; - - private: - static constexpr _refcnt_t _refcnt_UNINITIALIZED = -0x0800'0000'0000'0000ll; - static constexpr _refcnt_t _refcnt_MOVED = -0x0800'0000'0000'0001ll; - static constexpr _refcnt_t _refcnt_CLOSED = -0x07FFF'FFFF'FFFF'FFFFll; - - protected: - explicit _abstract_datagram_socket(ctx_t::socket_t &&value) : _refcnt{0}, _value{std::move(value)} { - } - - public: - _abstract_datagram_socket() : _refcnt{_refcnt_UNINITIALIZED}, _value{} { - }; - - _abstract_datagram_socket(_abstract_datagram_socket &&other) noexcept : _refcnt{other._refcnt.load()}, _value{std::move(other._value)} { - }; - - _abstract_datagram_socket &operator=(_abstract_datagram_socket &&other) noexcept { - _refcnt_t current = other->_refcnt.load(); - while (true) { - if (current > 0) - LdH::abort("Can't move socket while it is in use"); - if (this->_refcnt.compare_exchange_weak(current, _refcnt_MOVED)) - break; - } - - current = this->_refcnt.load(); - while (true) { - if (current >= 0) { - other._refcnt.store(0); // rollback - LdH::abort("Variable already initialized"); - } - if (this->_refcnt.compare_exchange_weak(current, 0)) - break; - } - - this->_value = std::move(other._value); - - return *this; - } - - protected: - void _start_usage() { - _refcnt_t current = this->_refcnt.load(); - while (true) { - if (current < 0) { - switch (current) { - case _refcnt_UNINITIALIZED: - LdH::abort("Socket not initialized"); - case _refcnt_MOVED: - LdH::abort("Socket was moved to another location"); - case _refcnt_CLOSED: - LdH::abort("Socket was closed"); - default: - LdH::abort("Socket wrapper corrupted"); - } - } - if (this->_refcnt.compare_exchange_weak(current, current + 1)) - break; - } - } - - void _finish_usage() { - _refcnt_t current = this->_refcnt.load(); - while (true) { - if (current < 0) { - switch (current) { - case _refcnt_UNINITIALIZED: - LdH::abort("Socket not initialized"); - case _refcnt_MOVED: - LdH::abort("Socket was moved to another location"); - case _refcnt_CLOSED: - LdH::abort("Socket was closed"); - default: - LdH::abort("Socket wrapper corrupted"); - } - } - if (this->_refcnt.compare_exchange_weak(current, current - 1)) - break; - } - } - - protected: - void _close() { - _refcnt_t current = this->_refcnt.load(); - while (true) { - if (current != 0) { - switch (current) { - case _refcnt_UNINITIALIZED: - LdH::abort("Socket not initialized"); - case _refcnt_MOVED: - LdH::abort("Socket was moved to another location"); - case _refcnt_CLOSED: - LdH::abort("Socket was already closed"); - default: - if (current > 0) - LdH::abort("Can't close socket while it is in use"); - else - LdH::abort("Socket wrapper corrupted"); - } - } - if (this->_refcnt.compare_exchange_weak(current, _refcnt_CLOSED)) - break; - } - } - - ~_abstract_datagram_socket() noexcept = default; - }; - - export - template requires BerkeleySocketsContext - class ServerDatagramSocket : public _abstract_datagram_socket { - private: - explicit ServerDatagramSocket(ctx_t::socket_t &&value) : _abstract_datagram_socket{std::move(value)} { - } - - template - friend - class _socket_internals; - - public: - ServerDatagramSocket() = default; - - ServerDatagramSocket(ServerDatagramSocket &&other) noexcept = default; - - ServerDatagramSocket &operator=(ServerDatagramSocket &&other) noexcept = default; - - public: - Address recvOneTruncating(std::size_t size, char *data) { - this->_start_usage(); - auto addr = this->_value.recv_datagram(size, data); - this->_finish_usage(); - return _addr_internals::wrap(std::move(addr)); - } - - void sendOne(Address destination, std::size_t size, char const *data) { - this->_start_usage(); - this->_value.send_datagram(destination, size, data); - this->_finish_usage(); - } - - void close() { - this->_close(); - } - - ~ServerDatagramSocket() noexcept = default; - }; - - - template - class _socket_internals > { - public: - static ServerDatagramSocket wrap(ctx_t::socket_t &&raw) { - return ServerDatagramSocket{std::move(raw)}; - } - }; - - export - template requires BerkeleySocketsContext - class ClientDatagramSocket : public _abstract_datagram_socket, public LdH::Streams::InputMessanger, public LdH::Streams::OutputMessanger { - private: - explicit ClientDatagramSocket(ctx_t::socket_t &&value) : _abstract_datagram_socket{std::move(value)} { - } - - template - friend - class _socket_internals; - - public: - ClientDatagramSocket() = default; - - ClientDatagramSocket(ClientDatagramSocket &&other) noexcept = default; - - ClientDatagramSocket &operator=(ClientDatagramSocket &&other) noexcept = default; - - public: - void recvOneTruncating(std::size_t size, char *data) override { - this->_start_usage(); - this->_value.recv_stream(size, data); - this->_finish_usage(); - } - - void sendOne(std::size_t size, char const *data) override { - this->_start_usage(); - this->_value.send_stream(size, data); - this->_finish_usage(); - } - - void close() override { - this->_close(); - } - - ~ClientDatagramSocket() noexcept override = default; - }; - - - template - class _socket_internals > { - public: - static ClientDatagramSocket wrap(ctx_t::socket_t &&raw) { - return ClientDatagramSocket{std::move(raw)}; - } - }; - - export - template requires BerkeleySocketsContext - [[nodiscard]] - ClientDatagramSocket connect_udp(Address addr) { - if (!_addr_internals::has_value(addr)) - LdH::abort("Address not initialized"); - typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals::unwrap(addr), ctx_t::sock_type::dgram(), ctx_t::proto::udp()); - sock.connect(_addr_internals::unwrap(addr)); - return _socket_internals >::wrap(std::move(sock)); - } - - export - template requires BerkeleySocketsContext - [[nodiscard]] - ServerDatagramSocket listen_udp(Address addr) { - if (!_addr_internals::has_value(addr)) - LdH::abort("Address not initialized"); - typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals::unwrap(addr), ctx_t::sock_type::dgram(), ctx_t::proto::udp()); - sock.bind(_addr_internals::unwrap(addr)); - return _socket_internals >::wrap(std::move(sock)); - } -#pragma endregion -} diff --git a/modules/sockets/src/platform/windows.cpp.inc b/modules/sockets/src/platform/windows.cpp.inc index 8f71697..b844ce4 100644 --- a/modules/sockets/src/platform/windows.cpp.inc +++ b/modules/sockets/src/platform/windows.cpp.inc @@ -2,7 +2,7 @@ import std; import ru.landgrafhomyak.BGTU.networks_1.exceptions; import ru.landgrafhomyak.BGTU.networks_1.exceptions.windows; import ru.landgrafhomyak.BGTU.networks_1.streams; -import :berkeley_sockets; +import :berkeley; namespace LdH::Sockets { export