Splitted berkeley sockets abstraction to several files

This commit is contained in:
Andrew Golovashevich 2025-11-04 16:49:13 +03:00
parent 8c98c336e6
commit 11405ebff5
10 changed files with 672 additions and 620 deletions

View File

@ -6,8 +6,15 @@ target_sources(
PUBLIC PUBLIC
FILE_SET cxx_modules TYPE CXX_MODULES FILES FILE_SET cxx_modules TYPE CXX_MODULES FILES
src/common.cppm src/common.cppm
src/berkeley_sockets.cppm
src/windows_binds.cppm src/windows_binds.cppm
src/berkeley/context.cppm
src/berkeley/internals.cppm
src/berkeley/address.cppm
src/berkeley/stream.cppm
src/berkeley/datagram.cppm
src/berkeley/entry_points.cppm
src/berkeley/main.cppm
) )
target_link_libraries(sockets PRIVATE wsock32 ws2_32 exceptions PUBLIC streams) target_link_libraries(sockets PRIVATE wsock32 ws2_32 exceptions PUBLIC streams)

View File

@ -0,0 +1,61 @@
export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.address;
import std;
import :berkeley.context;
import :berkeley.internals;
namespace LdH::Sockets::Berkeley {
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class Address {
private:
bool _has_value;
ctx_t::address_t _value;
explicit Address(bool has_value, ctx_t::address_t value) : _has_value{has_value}, _value{value} {
}
template<class ctx_t2> requires BerkeleySocketsContext<ctx_t2>
friend class _addr_internals;
public:
Address() : _has_value{false}, _value{} {
}
Address(Address const &other) noexcept = default;
Address(Address &&other) noexcept : _has_value{other._has_value}, _value{other._value} {
other._has_value = false;
}
Address &operator=(Address const &other) = default;
Address &operator=(Address &&other) noexcept {
this->_has_value = other._has_value;
this->_value = std::move(other._value);
other._has_value = false;
return *this;
}
~Address() {
this->_has_value = false;
}
static Address parse(char const *host, char const *service) {
return Address{true, ctx_t::address_t::parse(host, service)};
}
std::string to_string() {
return this->_value.to_string();
}
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _addr_internals {
public:
static bool has_value(Address<ctx_t> const &v) { return v._has_value; }
static ctx_t::address_t unwrap(Address<ctx_t> const &v) { return v._value; }
static Address<ctx_t> wrap(ctx_t::address_t const &raw) { return Address<ctx_t>{true, raw}; }
};
}

View File

@ -0,0 +1,33 @@
export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.context;
import std;
namespace LdH::Sockets::Berkeley {
export
template<class ctx_t>
concept BerkeleySocketsContext = requires { typename ctx_t::address_t; } &&
std::default_initializable<typename ctx_t::address_t> && std::copyable<typename ctx_t::address_t> && std::destructible<typename ctx_t::address_t> &&
requires(char const *h, char const *s) { { ctx_t::address_t::parse(h, s) } -> std::same_as<typename ctx_t::address_t>; } &&
requires(typename ctx_t::address_t s) { { s.to_string() } -> std::same_as<std::string>; } &&
requires { typename ctx_t::sock_type_t; } &&
requires { typename ctx_t::sock_type; } &&
requires { { ctx_t::sock_type::stream() } -> std::same_as<typename ctx_t::sock_type_t>; } &&
requires { { ctx_t::sock_type::dgram() } -> std::same_as<typename ctx_t::sock_type_t>; } &&
requires { typename ctx_t::proto_t; } &&
requires { typename ctx_t::proto; } &&
requires { { ctx_t::proto::tcp() } -> std::same_as<typename ctx_t::proto_t>; } &&
requires { { ctx_t::proto::udp() } -> std::same_as<typename ctx_t::proto_t>; } &&
requires { { ctx_t::proto::icmp() } -> std::same_as<typename ctx_t::proto_t>; } &&
requires { typename ctx_t::socket_t; } &&
std::default_initializable<typename ctx_t::socket_t> && std::movable<typename ctx_t::socket_t> && std::destructible<typename ctx_t::socket_t> &&
requires(typename ctx_t::address_t a, typename ctx_t::sock_type_t t, typename ctx_t::proto_t p) { { ctx_t::socket_t::create(a, t, p) } -> std::same_as<typename ctx_t::socket_t>; } &&
requires(typename ctx_t::socket_t s, std::size_t q) { { s.listen(q) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, typename ctx_t::address_t e) { { s.connect(e) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, typename ctx_t::address_t e) { { s.bind(e) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s) { { s.close() } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, std::size_t c, char const *d) { { s.send_stream(c, d) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, std::size_t c, char *d) { { s.recv_stream(c, d) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, std::size_t c, char *d) { { s.recv_datagram(c, d) } -> std::same_as<typename ctx_t::address_t>; } &&
requires(typename ctx_t::socket_t s, typename ctx_t::address_t e, std::size_t c, char const *d) { { s.send_datagram(e, c, d) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, typename ctx_t::address_t *a) { { s.accept(a) } -> std::same_as<typename ctx_t::socket_t>; };
}

View File

@ -0,0 +1,224 @@
export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.stream;
import std;
import ru.landgrafhomyak.BGTU.networks_1.exceptions;
import ru.landgrafhomyak.BGTU.networks_1.streams;
import :berkeley.context;
import :berkeley.internals;
import :berkeley.address;
namespace LdH::Sockets::Berkeley {
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _abstract_datagram_socket {
private:
using _refcnt_t = signed long long;
std::atomic<_refcnt_t> _refcnt;
protected:
ctx_t::socket_t _value;
private:
static constexpr _refcnt_t _refcnt_UNINITIALIZED = -0x0800'0000'0000'0000ll;
static constexpr _refcnt_t _refcnt_MOVED = -0x0800'0000'0000'0001ll;
static constexpr _refcnt_t _refcnt_CLOSED = -0x07FFF'FFFF'FFFF'FFFFll;
protected:
explicit _abstract_datagram_socket(ctx_t::socket_t &&value) : _refcnt{0}, _value{std::move(value)} {
}
public:
_abstract_datagram_socket() : _refcnt{_refcnt_UNINITIALIZED}, _value{} {
};
_abstract_datagram_socket(_abstract_datagram_socket &&other) noexcept : _refcnt{other._refcnt.load()}, _value{std::move(other._value)} {
};
_abstract_datagram_socket &operator=(_abstract_datagram_socket &&other) noexcept {
_refcnt_t current = other->_refcnt.load();
while (true) {
if (current > 0)
LdH::abort("Can't move socket while it is in use");
if (this->_refcnt.compare_exchange_weak(current, _refcnt_MOVED))
break;
}
current = this->_refcnt.load();
while (true) {
if (current >= 0) {
other._refcnt.store(0); // rollback
LdH::abort("Variable already initialized");
}
if (this->_refcnt.compare_exchange_weak(current, 0))
break;
}
this->_value = std::move(other._value);
return *this;
}
protected:
void _start_usage() {
_refcnt_t current = this->_refcnt.load();
while (true) {
if (current < 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSED:
LdH::abort("Socket was closed");
default:
LdH::abort("Socket wrapper corrupted");
}
}
if (this->_refcnt.compare_exchange_weak(current, current + 1))
break;
}
}
void _finish_usage() {
_refcnt_t current = this->_refcnt.load();
while (true) {
if (current < 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSED:
LdH::abort("Socket was closed");
default:
LdH::abort("Socket wrapper corrupted");
}
}
if (this->_refcnt.compare_exchange_weak(current, current - 1))
break;
}
}
protected:
void _close() {
_refcnt_t current = this->_refcnt.load();
while (true) {
if (current != 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSED:
LdH::abort("Socket was already closed");
default:
if (current > 0)
LdH::abort("Can't close socket while it is in use");
else
LdH::abort("Socket wrapper corrupted");
}
}
if (this->_refcnt.compare_exchange_weak(current, _refcnt_CLOSED))
break;
}
}
~_abstract_datagram_socket() noexcept = default;
};
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class ServerDatagramSocket : public _abstract_datagram_socket<ctx_t> {
private:
explicit ServerDatagramSocket(ctx_t::socket_t &&value) : _abstract_datagram_socket<ctx_t>{std::move(value)} {
}
template<class>
friend
class _socket_internals;
public:
ServerDatagramSocket() = default;
ServerDatagramSocket(ServerDatagramSocket &&other) noexcept = default;
ServerDatagramSocket &operator=(ServerDatagramSocket &&other) noexcept = default;
public:
Address<ctx_t> recvOneTruncating(std::size_t size, char *data) {
this->_start_usage();
auto addr = this->_value.recv_datagram(size, data);
this->_finish_usage();
return _addr_internals<ctx_t>::wrap(std::move(addr));
}
void sendOne(Address<ctx_t> destination, std::size_t size, char const *data) {
this->_start_usage();
this->_value.send_datagram(destination, size, data);
this->_finish_usage();
}
void close() {
this->_close();
}
~ServerDatagramSocket() noexcept = default;
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _socket_internals<ServerDatagramSocket<ctx_t> > {
public:
static ServerDatagramSocket<ctx_t> wrap(ctx_t::socket_t &&raw) {
return ServerDatagramSocket<ctx_t>{std::move(raw)};
}
};
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class ClientDatagramSocket : public _abstract_datagram_socket<ctx_t>, public LdH::Streams::InputMessanger, public LdH::Streams::OutputMessanger {
private:
explicit ClientDatagramSocket(ctx_t::socket_t &&value) : _abstract_datagram_socket<ctx_t>{std::move(value)} {
}
template<class>
friend
class _socket_internals;
public:
ClientDatagramSocket() = default;
ClientDatagramSocket(ClientDatagramSocket &&other) noexcept = default;
ClientDatagramSocket &operator=(ClientDatagramSocket &&other) noexcept = default;
public:
void recvOneTruncating(std::size_t size, char *data) override {
this->_start_usage();
this->_value.recv_stream(size, data);
this->_finish_usage();
}
void sendOne(std::size_t size, char const *data) override {
this->_start_usage();
this->_value.send_stream(size, data);
this->_finish_usage();
}
void close() override {
this->_close();
}
~ClientDatagramSocket() noexcept override = default;
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _socket_internals<ClientDatagramSocket<ctx_t> > {
public:
static ClientDatagramSocket<ctx_t> wrap(ctx_t::socket_t &&raw) {
return ClientDatagramSocket<ctx_t>{std::move(raw)};
}
};
}

View File

@ -0,0 +1,59 @@
export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.entry_points;
import std;
import ru.landgrafhomyak.BGTU.networks_1.exceptions;
import :berkeley.context;
import :berkeley.internals;
import :berkeley.address;
import :berkeley.stream;
import :berkeley.datagram;
namespace LdH::Sockets::Berkeley {
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
[[nodiscard]]
StreamSocket<ctx_t> connect_tcp(Address<ctx_t> addr) {
if (!_addr_internals<ctx_t>::has_value(addr))
LdH::abort("Address not initialized");
typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals<ctx_t>::unwrap(addr), ctx_t::sock_type::stream(), ctx_t::proto::tcp());
sock.connect(_addr_internals<ctx_t>::unwrap(addr));
return _socket_internals<StreamSocket<ctx_t> >::wrap(std::move(sock));
}
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
[[nodiscard]]
StreamSocketsServer<ctx_t> listen_tcp(Address<ctx_t> addr, std::size_t queue_size) {
if (!_addr_internals<ctx_t>::has_value(addr))
LdH::abort("Address not initialized");
typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals<ctx_t>::unwrap(addr), ctx_t::sock_type::stream(), ctx_t::proto::tcp());
sock.bind(_addr_internals<ctx_t>::unwrap(addr));
sock.listen(queue_size);
return _socket_internals<StreamSocketsServer<ctx_t> >::wrap(std::move(sock));
}
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
[[nodiscard]]
ClientDatagramSocket<ctx_t> connect_udp(Address<ctx_t> addr) {
if (!_addr_internals<ctx_t>::has_value(addr))
LdH::abort("Address not initialized");
typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals<ctx_t>::unwrap(addr), ctx_t::sock_type::dgram(), ctx_t::proto::udp());
sock.connect(_addr_internals<ctx_t>::unwrap(addr));
return _socket_internals<ClientDatagramSocket<ctx_t> >::wrap(std::move(sock));
}
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
[[nodiscard]]
ServerDatagramSocket<ctx_t> listen_udp(Address<ctx_t> addr) {
if (!_addr_internals<ctx_t>::has_value(addr))
LdH::abort("Address not initialized");
typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals<ctx_t>::unwrap(addr), ctx_t::sock_type::dgram(), ctx_t::proto::udp());
sock.bind(_addr_internals<ctx_t>::unwrap(addr));
return _socket_internals<ServerDatagramSocket<ctx_t> >::wrap(std::move(sock));
}
}

View File

@ -0,0 +1,16 @@
export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.internals;
import std;
import :berkeley.context;
namespace LdH::Sockets::Berkeley {
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _addr_internals;
export
template<class>
class _socket_internals {
};
}

View File

@ -0,0 +1,8 @@
export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley;
export import :berkeley.context;
export import :berkeley.address;
export import :berkeley.stream;
export import :berkeley.datagram;
export import :berkeley.entry_points;

View File

@ -0,0 +1,262 @@
export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley.datagram;
import std;
import ru.landgrafhomyak.BGTU.networks_1.exceptions;
import ru.landgrafhomyak.BGTU.networks_1.streams;
import :berkeley.context;
import :berkeley.internals;
import :berkeley.address;
namespace LdH::Sockets::Berkeley {
enum _socket_state_t {
_socket_state_UNINITIALIZED,
_socket_state_MOVED,
_socket_state_AVAILABLE,
_socket_state_READING,
_socket_state_WRITING,
_socket_state_BOTH,
_socket_state_CLOSED,
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _berkeley_socket_commons {
private:
std::atomic<_socket_state_t> _state;
protected:
ctx_t::socket_t _value;
explicit _berkeley_socket_commons(ctx_t::socket_t &&value) : _state{_socket_state_AVAILABLE}, _value{std::move(value)} {
}
public:
_berkeley_socket_commons() : _state{_socket_state_UNINITIALIZED}, _value{} {
};
explicit _berkeley_socket_commons(_berkeley_socket_commons &&other) noexcept : _state{other._state.load()}, _value{std::move(other._value)} {
other._state.store(_socket_state_MOVED);
}
_berkeley_socket_commons &operator=(_berkeley_socket_commons &&other) noexcept {
_socket_state_t current = this->_state.load();
while (true) {
switch (current) {
case _socket_state_UNINITIALIZED:
case _socket_state_MOVED:
case _socket_state_CLOSED:
if (this->_state.compare_exchange_weak(current, _socket_state_AVAILABLE))
goto DONE;
continue;
default:
LdH::abort("Variable already initialized");
}
}
DONE:
this->_state.store(other._state.exchange(_socket_state_MOVED));
this->_value = std::move(other._value);
return *this;
}
private:
_socket_state_t _cas_state(_socket_state_t expected, _socket_state_t next) {
this->_state.compare_exchange_strong(expected, next);
return expected;
}
template<_socket_state_t exp1, _socket_state_t next1, _socket_state_t exp2, _socket_state_t next2>
_socket_state_t _cas_state_2() {
_socket_state_t current = this->_state.load();
while (true) {
switch (current) {
case exp1:
if (this->_state.compare_exchange_weak(current, next1))
return current;
continue;
case exp2:
if (this->_state.compare_exchange_weak(current, next2))
return current;
continue;
default:
return current;
}
}
}
protected:
void _start_reading(char const *busy_msg) {
switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_READING, _socket_state_WRITING, _socket_state_BOTH>()) {
case _socket_state_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _socket_state_MOVED:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
case _socket_state_WRITING:
break;
case _socket_state_READING:
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSED:
LdH::abort("Socket was closed");
}
}
void _finish_reading() {
this->_cas_state_2<_socket_state_READING, _socket_state_AVAILABLE, _socket_state_BOTH, _socket_state_WRITING>();
}
void _start_writing(char const *busy_msg) {
switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_WRITING, _socket_state_READING, _socket_state_BOTH>()) {
case _socket_state_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _socket_state_MOVED:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
case _socket_state_READING:
break;
case _socket_state_WRITING:
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSED:
LdH::abort("Socket was closed");
}
}
void _finish_writing() {
this->_cas_state_2<_socket_state_WRITING, _socket_state_AVAILABLE, _socket_state_BOTH, _socket_state_READING>();
}
void _close(char const *busy_msg) {
switch (this->_cas_state(_socket_state_AVAILABLE, _socket_state_CLOSED)) {
case _socket_state_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _socket_state_MOVED:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
break;
case _socket_state_READING:
case _socket_state_WRITING:
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSED:
LdH::abort("Socket already was closed");
}
this->_value.close();
}
};
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class StreamSocketsServer;
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class StreamSocket : public _berkeley_socket_commons<ctx_t>, public virtual LdH::Streams::InputStream, public virtual LdH::Streams::OutputStream {
private:
explicit StreamSocket(ctx_t::socket_t &&value) : _berkeley_socket_commons<ctx_t>{std::move(value)} {
}
template<class ctx_t2> requires BerkeleySocketsContext<ctx_t2>
friend
class StreamSocketsServer;
template<class>
friend
class _socket_internals;
public:
StreamSocket() = default;
StreamSocket(StreamSocket &&other) noexcept = default;
StreamSocket &operator=(StreamSocket &&other) noexcept = default;
void read(std::size_t size, char *data) override {
this->_start_reading("Socket already read by another thread");
this->_value.recv_stream(size, data);
this->_finish_reading();
}
void write(std::size_t size, char const *data) override {
this->_start_writing("Socket already written by another thread");
this->_value.send_stream(size, data);
this->_finish_writing();
}
void close() override {
this->_close("Socket is busy with reading or writing");
}
~StreamSocket() noexcept override = default;
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _socket_internals<StreamSocket<ctx_t> > {
public:
static StreamSocket<ctx_t> wrap(ctx_t::socket_t &&raw) {
return StreamSocket<ctx_t>{std::move(raw)};
}
};
export
template<class ctx_t, class sock_t>
class SocketWithAddress {
public:
Address<ctx_t> addr;
sock_t sock;
SocketWithAddress(Address<ctx_t> addr, sock_t &&sock) : addr{addr}, sock{std::move(sock)} {
}
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class StreamSocketsServer : public _berkeley_socket_commons<ctx_t> {
private:
explicit StreamSocketsServer(ctx_t::socket_t &&value) : _berkeley_socket_commons<ctx_t>{std::move(value)} {
}
template<class>
friend
class _socket_internals;
public:
StreamSocketsServer() = default;
StreamSocketsServer(StreamSocketsServer &&other) noexcept = default;
StreamSocketsServer &operator=(StreamSocketsServer &&other) noexcept = default;
[[nodiscard]]
SocketWithAddress<ctx_t, StreamSocket<ctx_t> > wait_for_connection() {
this->_start_reading("Socket already waiting for connections");
typename ctx_t::address_t addr{};
typename ctx_t::socket_t raw = this->_value.accept(&addr);
this->_finish_reading();
return SocketWithAddress<ctx_t, StreamSocket<ctx_t> >{_addr_internals<ctx_t>::wrap(addr), _socket_internals<StreamSocket<ctx_t> >::wrap(std::move(raw))};
}
void close() {
this->_close("Socket busy with waiting for connection");
}
~StreamSocketsServer() noexcept = default;
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _socket_internals<StreamSocketsServer<ctx_t> > {
public:
static StreamSocketsServer<ctx_t> wrap(ctx_t::socket_t &&raw) {
return StreamSocketsServer<ctx_t>{std::move(raw)};
}
};
}

View File

@ -1,618 +0,0 @@
export module ru.landgrafhomyak.BGTU.networks_1.sockets:berkeley_sockets;
import std;
import ru.landgrafhomyak.BGTU.networks_1.exceptions;
import ru.landgrafhomyak.BGTU.networks_1.streams;
namespace LdH::Sockets::Berkeley {
#pragma region context concept
export
template<class ctx_t>
concept BerkeleySocketsContext = requires { typename ctx_t::address_t; } &&
std::default_initializable<typename ctx_t::address_t> && std::copyable<typename ctx_t::address_t> && std::destructible<typename ctx_t::address_t> &&
requires(char const *h, char const *s) { { ctx_t::address_t::parse(h, s) } -> std::same_as<typename ctx_t::address_t>; } &&
requires(typename ctx_t::address_t s) { { s.to_string() } -> std::same_as<std::string>; } &&
requires { typename ctx_t::sock_type_t; } &&
requires { typename ctx_t::sock_type; } &&
requires { { ctx_t::sock_type::stream() } -> std::same_as<typename ctx_t::sock_type_t>; } &&
requires { { ctx_t::sock_type::dgram() } -> std::same_as<typename ctx_t::sock_type_t>; } &&
requires { typename ctx_t::proto_t; } &&
requires { typename ctx_t::proto; } &&
requires { { ctx_t::proto::tcp() } -> std::same_as<typename ctx_t::proto_t>; } &&
requires { { ctx_t::proto::udp() } -> std::same_as<typename ctx_t::proto_t>; } &&
requires { { ctx_t::proto::icmp() } -> std::same_as<typename ctx_t::proto_t>; } &&
requires { typename ctx_t::socket_t; } &&
std::default_initializable<typename ctx_t::socket_t> && std::movable<typename ctx_t::socket_t> && std::destructible<typename ctx_t::socket_t> &&
requires(typename ctx_t::address_t a, typename ctx_t::sock_type_t t, typename ctx_t::proto_t p) { { ctx_t::socket_t::create(a, t, p) } -> std::same_as<typename ctx_t::socket_t>; } &&
requires(typename ctx_t::socket_t s, std::size_t q) { { s.listen(q) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, typename ctx_t::address_t e) { { s.connect(e) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, typename ctx_t::address_t e) { { s.bind(e) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s) { { s.close() } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, std::size_t c, char const *d) { { s.send_stream(c, d) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, std::size_t c, char *d) { { s.recv_stream(c, d) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, std::size_t c, char *d) { { s.recv_datagram(c, d) } -> std::same_as<typename ctx_t::address_t>; } &&
requires(typename ctx_t::socket_t s, typename ctx_t::address_t e, std::size_t c, char const *d) { { s.send_datagram(e, c, d) } -> std::same_as<void>; } &&
requires(typename ctx_t::socket_t s, typename ctx_t::address_t *a) { { s.accept(a) } -> std::same_as<typename ctx_t::socket_t>; };
#pragma endregion
#pragma region address
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _addr_internals;
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class Address {
private:
bool _has_value;
ctx_t::address_t _value;
explicit Address(bool has_value, ctx_t::address_t value) : _has_value{has_value}, _value{value} {
}
template<class ctx_t2> requires BerkeleySocketsContext<ctx_t2>
friend class _addr_internals;
public:
Address() : _has_value{false}, _value{} {
}
Address(Address const &other) noexcept = default;
Address(Address &&other) noexcept : _has_value{other._has_value}, _value{other._value} {
other._has_value = false;
}
Address &operator=(Address const &other) = default;
Address &operator=(Address &&other) noexcept {
this->_has_value = other._has_value;
this->_value = std::move(other._value);
other._has_value = false;
return *this;
}
~Address() {
this->_has_value = false;
}
static Address parse(char const *host, char const *service) {
return Address{true, ctx_t::address_t::parse(host, service)};
}
std::string to_string() {
return this->_value.to_string();
}
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _addr_internals {
public:
static bool has_value(Address<ctx_t> const &v) { return v._has_value; }
static ctx_t::address_t unwrap(Address<ctx_t> const &v) { return v._value; }
static Address<ctx_t> wrap(ctx_t::address_t const &raw) { return Address<ctx_t>{true, raw}; }
};
#pragma endregion
#pragma region abstract socket
enum _socket_state_t {
_socket_state_UNINITIALIZED,
_socket_state_MOVED,
_socket_state_AVAILABLE,
_socket_state_READING,
_socket_state_WRITING,
_socket_state_BOTH,
_socket_state_CLOSED,
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _berkeley_socket_commons {
private:
std::atomic<_socket_state_t> _state;
protected:
ctx_t::socket_t _value;
explicit _berkeley_socket_commons(ctx_t::socket_t &&value) : _state{_socket_state_AVAILABLE}, _value{std::move(value)} {
}
public:
_berkeley_socket_commons() : _state{_socket_state_UNINITIALIZED}, _value{} {
};
explicit _berkeley_socket_commons(_berkeley_socket_commons &&other) noexcept : _state{other._state.load()}, _value{std::move(other._value)} {
other._state.store(_socket_state_MOVED);
}
_berkeley_socket_commons &operator=(_berkeley_socket_commons &&other) noexcept {
_socket_state_t current = this->_state.load();
while (true) {
switch (current) {
case _socket_state_UNINITIALIZED:
case _socket_state_MOVED:
case _socket_state_CLOSED:
if (this->_state.compare_exchange_weak(current, _socket_state_AVAILABLE))
goto DONE;
continue;
default:
LdH::abort("Variable already initialized");
}
}
DONE:
this->_state.store(other._state.exchange(_socket_state_MOVED));
this->_value = std::move(other._value);
return *this;
}
private:
_socket_state_t _cas_state(_socket_state_t expected, _socket_state_t next) {
this->_state.compare_exchange_strong(expected, next);
return expected;
}
template<_socket_state_t exp1, _socket_state_t next1, _socket_state_t exp2, _socket_state_t next2>
_socket_state_t _cas_state_2() {
_socket_state_t current = this->_state.load();
while (true) {
switch (current) {
case exp1:
if (this->_state.compare_exchange_weak(current, next1))
return current;
continue;
case exp2:
if (this->_state.compare_exchange_weak(current, next2))
return current;
continue;
default:
return current;
}
}
}
protected:
void _start_reading(char const *busy_msg) {
switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_READING, _socket_state_WRITING, _socket_state_BOTH>()) {
case _socket_state_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _socket_state_MOVED:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
case _socket_state_WRITING:
break;
case _socket_state_READING:
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSED:
LdH::abort("Socket was closed");
}
}
void _finish_reading() {
this->_cas_state_2<_socket_state_READING, _socket_state_AVAILABLE, _socket_state_BOTH, _socket_state_WRITING>();
}
void _start_writing(char const *busy_msg) {
switch (this->_cas_state_2<_socket_state_AVAILABLE, _socket_state_WRITING, _socket_state_READING, _socket_state_BOTH>()) {
case _socket_state_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _socket_state_MOVED:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
case _socket_state_READING:
break;
case _socket_state_WRITING:
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSED:
LdH::abort("Socket was closed");
}
}
void _finish_writing() {
this->_cas_state_2<_socket_state_WRITING, _socket_state_AVAILABLE, _socket_state_BOTH, _socket_state_READING>();
}
void _close(char const *busy_msg) {
switch (this->_cas_state(_socket_state_AVAILABLE, _socket_state_CLOSED)) {
case _socket_state_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _socket_state_MOVED:
LdH::abort("Socket was moved to another location");
case _socket_state_AVAILABLE:
break;
case _socket_state_READING:
case _socket_state_WRITING:
case _socket_state_BOTH:
LdH::abort(busy_msg);
case _socket_state_CLOSED:
LdH::abort("Socket already was closed");
}
this->_value.close();
}
};
template<class>
class _socket_internals {
};
#pragma endregion
#pragma region stream sockets
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class StreamSocketsServer;
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class StreamSocket : public _berkeley_socket_commons<ctx_t>, public virtual LdH::Streams::InputStream, public virtual LdH::Streams::OutputStream {
private:
explicit StreamSocket(ctx_t::socket_t &&value) : _berkeley_socket_commons<ctx_t>{std::move(value)} {
}
template<class ctx_t2> requires BerkeleySocketsContext<ctx_t2>
friend
class StreamSocketsServer;
template<class>
friend
class _socket_internals;
public:
StreamSocket() = default;
StreamSocket(StreamSocket &&other) noexcept = default;
StreamSocket &operator=(StreamSocket &&other) noexcept = default;
void read(std::size_t size, char *data) override {
this->_start_reading("Socket already read by another thread");
this->_value.recv_stream(size, data);
this->_finish_reading();
}
void write(std::size_t size, char const *data) override {
this->_start_writing("Socket already written by another thread");
this->_value.send_stream(size, data);
this->_finish_writing();
}
void close() override {
this->_close("Socket is busy with reading or writing");
}
~StreamSocket() noexcept override = default;
};
template<class ctx_t>
class _socket_internals<StreamSocket<ctx_t> > {
public:
static StreamSocket<ctx_t> wrap(ctx_t::socket_t &&raw) {
return StreamSocket<ctx_t>{std::move(raw)};
}
};
export
template<class ctx_t, class sock_t>
class SocketWithAddress {
public:
Address<ctx_t> addr;
sock_t sock;
SocketWithAddress(Address<ctx_t> addr, sock_t &&sock) : addr{addr}, sock{std::move(sock)} {
}
};
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class StreamSocketsServer : public _berkeley_socket_commons<ctx_t> {
private:
explicit StreamSocketsServer(ctx_t::socket_t &&value) : _berkeley_socket_commons<ctx_t>{std::move(value)} {
}
template<class>
friend
class _socket_internals;
public:
StreamSocketsServer() = default;
StreamSocketsServer(StreamSocketsServer &&other) noexcept = default;
StreamSocketsServer &operator=(StreamSocketsServer &&other) noexcept = default;
[[nodiscard]]
SocketWithAddress<ctx_t, StreamSocket<ctx_t> > wait_for_connection() {
this->_start_reading("Socket already waiting for connections");
typename ctx_t::address_t addr{};
typename ctx_t::socket_t raw = this->_value.accept(&addr);
this->_finish_reading();
return SocketWithAddress<ctx_t, StreamSocket<ctx_t> >{_addr_internals<ctx_t>::wrap(addr), _socket_internals<StreamSocket<ctx_t> >::wrap(std::move(raw))};
}
void close() {
this->_close("Socket busy with waiting for connection");
}
~StreamSocketsServer() noexcept = default;
};
template<class ctx_t>
class _socket_internals<StreamSocketsServer<ctx_t> > {
public:
static StreamSocketsServer<ctx_t> wrap(ctx_t::socket_t &&raw) {
return StreamSocketsServer<ctx_t>{std::move(raw)};
}
};
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
[[nodiscard]]
StreamSocket<ctx_t> connect_tcp(Address<ctx_t> addr) {
if (!_addr_internals<ctx_t>::has_value(addr))
LdH::abort("Address not initialized");
typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals<ctx_t>::unwrap(addr), ctx_t::sock_type::stream(), ctx_t::proto::tcp());
sock.connect(_addr_internals<ctx_t>::unwrap(addr));
return _socket_internals<StreamSocket<ctx_t> >::wrap(std::move(sock));
}
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
[[nodiscard]]
StreamSocketsServer<ctx_t> listen_tcp(Address<ctx_t> addr, std::size_t queue_size) {
if (!_addr_internals<ctx_t>::has_value(addr))
LdH::abort("Address not initialized");
typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals<ctx_t>::unwrap(addr), ctx_t::sock_type::stream(), ctx_t::proto::tcp());
sock.bind(_addr_internals<ctx_t>::unwrap(addr));
sock.listen(queue_size);
return _socket_internals<StreamSocketsServer<ctx_t> >::wrap(std::move(sock));
}
#pragma endregion
#pragma region datagram sockets
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class _abstract_datagram_socket {
private:
using _refcnt_t = signed long long;
std::atomic<_refcnt_t> _refcnt;
protected:
ctx_t::socket_t _value;
private:
static constexpr _refcnt_t _refcnt_UNINITIALIZED = -0x0800'0000'0000'0000ll;
static constexpr _refcnt_t _refcnt_MOVED = -0x0800'0000'0000'0001ll;
static constexpr _refcnt_t _refcnt_CLOSED = -0x07FFF'FFFF'FFFF'FFFFll;
protected:
explicit _abstract_datagram_socket(ctx_t::socket_t &&value) : _refcnt{0}, _value{std::move(value)} {
}
public:
_abstract_datagram_socket() : _refcnt{_refcnt_UNINITIALIZED}, _value{} {
};
_abstract_datagram_socket(_abstract_datagram_socket &&other) noexcept : _refcnt{other._refcnt.load()}, _value{std::move(other._value)} {
};
_abstract_datagram_socket &operator=(_abstract_datagram_socket &&other) noexcept {
_refcnt_t current = other->_refcnt.load();
while (true) {
if (current > 0)
LdH::abort("Can't move socket while it is in use");
if (this->_refcnt.compare_exchange_weak(current, _refcnt_MOVED))
break;
}
current = this->_refcnt.load();
while (true) {
if (current >= 0) {
other._refcnt.store(0); // rollback
LdH::abort("Variable already initialized");
}
if (this->_refcnt.compare_exchange_weak(current, 0))
break;
}
this->_value = std::move(other._value);
return *this;
}
protected:
void _start_usage() {
_refcnt_t current = this->_refcnt.load();
while (true) {
if (current < 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSED:
LdH::abort("Socket was closed");
default:
LdH::abort("Socket wrapper corrupted");
}
}
if (this->_refcnt.compare_exchange_weak(current, current + 1))
break;
}
}
void _finish_usage() {
_refcnt_t current = this->_refcnt.load();
while (true) {
if (current < 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSED:
LdH::abort("Socket was closed");
default:
LdH::abort("Socket wrapper corrupted");
}
}
if (this->_refcnt.compare_exchange_weak(current, current - 1))
break;
}
}
protected:
void _close() {
_refcnt_t current = this->_refcnt.load();
while (true) {
if (current != 0) {
switch (current) {
case _refcnt_UNINITIALIZED:
LdH::abort("Socket not initialized");
case _refcnt_MOVED:
LdH::abort("Socket was moved to another location");
case _refcnt_CLOSED:
LdH::abort("Socket was already closed");
default:
if (current > 0)
LdH::abort("Can't close socket while it is in use");
else
LdH::abort("Socket wrapper corrupted");
}
}
if (this->_refcnt.compare_exchange_weak(current, _refcnt_CLOSED))
break;
}
}
~_abstract_datagram_socket() noexcept = default;
};
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class ServerDatagramSocket : public _abstract_datagram_socket<ctx_t> {
private:
explicit ServerDatagramSocket(ctx_t::socket_t &&value) : _abstract_datagram_socket<ctx_t>{std::move(value)} {
}
template<class>
friend
class _socket_internals;
public:
ServerDatagramSocket() = default;
ServerDatagramSocket(ServerDatagramSocket &&other) noexcept = default;
ServerDatagramSocket &operator=(ServerDatagramSocket &&other) noexcept = default;
public:
Address<ctx_t> recvOneTruncating(std::size_t size, char *data) {
this->_start_usage();
auto addr = this->_value.recv_datagram(size, data);
this->_finish_usage();
return _addr_internals<ctx_t>::wrap(std::move(addr));
}
void sendOne(Address<ctx_t> destination, std::size_t size, char const *data) {
this->_start_usage();
this->_value.send_datagram(destination, size, data);
this->_finish_usage();
}
void close() {
this->_close();
}
~ServerDatagramSocket() noexcept = default;
};
template<class ctx_t>
class _socket_internals<ServerDatagramSocket<ctx_t> > {
public:
static ServerDatagramSocket<ctx_t> wrap(ctx_t::socket_t &&raw) {
return ServerDatagramSocket<ctx_t>{std::move(raw)};
}
};
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
class ClientDatagramSocket : public _abstract_datagram_socket<ctx_t>, public LdH::Streams::InputMessanger, public LdH::Streams::OutputMessanger {
private:
explicit ClientDatagramSocket(ctx_t::socket_t &&value) : _abstract_datagram_socket<ctx_t>{std::move(value)} {
}
template<class>
friend
class _socket_internals;
public:
ClientDatagramSocket() = default;
ClientDatagramSocket(ClientDatagramSocket &&other) noexcept = default;
ClientDatagramSocket &operator=(ClientDatagramSocket &&other) noexcept = default;
public:
void recvOneTruncating(std::size_t size, char *data) override {
this->_start_usage();
this->_value.recv_stream(size, data);
this->_finish_usage();
}
void sendOne(std::size_t size, char const *data) override {
this->_start_usage();
this->_value.send_stream(size, data);
this->_finish_usage();
}
void close() override {
this->_close();
}
~ClientDatagramSocket() noexcept override = default;
};
template<class ctx_t>
class _socket_internals<ClientDatagramSocket<ctx_t> > {
public:
static ClientDatagramSocket<ctx_t> wrap(ctx_t::socket_t &&raw) {
return ClientDatagramSocket<ctx_t>{std::move(raw)};
}
};
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
[[nodiscard]]
ClientDatagramSocket<ctx_t> connect_udp(Address<ctx_t> addr) {
if (!_addr_internals<ctx_t>::has_value(addr))
LdH::abort("Address not initialized");
typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals<ctx_t>::unwrap(addr), ctx_t::sock_type::dgram(), ctx_t::proto::udp());
sock.connect(_addr_internals<ctx_t>::unwrap(addr));
return _socket_internals<ClientDatagramSocket<ctx_t> >::wrap(std::move(sock));
}
export
template<class ctx_t> requires BerkeleySocketsContext<ctx_t>
[[nodiscard]]
ServerDatagramSocket<ctx_t> listen_udp(Address<ctx_t> addr) {
if (!_addr_internals<ctx_t>::has_value(addr))
LdH::abort("Address not initialized");
typename ctx_t::socket_t sock = ctx_t::socket_t::create(_addr_internals<ctx_t>::unwrap(addr), ctx_t::sock_type::dgram(), ctx_t::proto::udp());
sock.bind(_addr_internals<ctx_t>::unwrap(addr));
return _socket_internals<ServerDatagramSocket<ctx_t> >::wrap(std::move(sock));
}
#pragma endregion
}

View File

@ -2,7 +2,7 @@ import std;
import ru.landgrafhomyak.BGTU.networks_1.exceptions; import ru.landgrafhomyak.BGTU.networks_1.exceptions;
import ru.landgrafhomyak.BGTU.networks_1.exceptions.windows; import ru.landgrafhomyak.BGTU.networks_1.exceptions.windows;
import ru.landgrafhomyak.BGTU.networks_1.streams; import ru.landgrafhomyak.BGTU.networks_1.streams;
import :berkeley_sockets; import :berkeley;
namespace LdH::Sockets { namespace LdH::Sockets {
export export