Common code for windows and linux threads extracted to separate class

This commit is contained in:
Andrew Golovashevich 2025-10-27 09:08:04 +03:00
parent 5c02c56c5b
commit 600806d911
3 changed files with 306 additions and 182 deletions

View File

@ -7,6 +7,7 @@ target_sources(
FILE_SET cxx_modules TYPE CXX_MODULES FILES FILE_SET cxx_modules TYPE CXX_MODULES FILES
src/common.cppm src/common.cppm
src/thread_routine.cppm src/thread_routine.cppm
src/highlevel_impl.cppm
src/windows.cppm src/windows.cppm
) )

View File

@ -0,0 +1,228 @@
export module ru.landgrafhomyak.BGTU.networks_1.threads:highlevel_impl;
import std;
export import :thread_routine;
import ru.landgrafhomyak.BGTU.networks_1.exceptions;
import ru.landgrafhomyak.BGTU.networks_1.exceptions.windows;
import :thread_routine;
namespace LdH {
export
template<class native_wrapper_t>
concept NativeHighLevelThreadWrapper = std::movable<native_wrapper_t> &&
std::destructible<native_wrapper_t> &&
std::default_initializable<native_wrapper_t> &&
requires(native_wrapper_t &&self)
{
{ self.join() } -> std::same_as<void>;
{ self.destroy() } -> std::same_as<void>;
} &&
requires
{
typename native_wrapper_t::notifier_t;
requires std::default_initializable<typename native_wrapper_t::notifier_t>;
requires std::destructible<typename native_wrapper_t::notifier_t>;
} &&
requires(typename native_wrapper_t::notifier_t &self)
{
{ self.wait() } -> std::same_as<void>;
{ self.notify() } -> std::same_as<void>;
} &&
requires
{
typename native_wrapper_t::kernel_ret_t;
{ native_wrapper_t::kernel_ret() } -> std::same_as<typename native_wrapper_t::kernel_ret_t>;
} &&
requires(std::string &name, typename native_wrapper_t::kernel_ret_t (*kernel)(void *), void *thread_arg)
{
{ native_wrapper_t::start(name, kernel, thread_arg) } -> std::same_as<native_wrapper_t>;
};
export
template<class native_wrapper_t> requires NativeHighLevelThreadWrapper<native_wrapper_t>
class CommonHighLevelThreadWrapper;
export
template<class native_wrapper_t> requires NativeHighLevelThreadWrapper<native_wrapper_t>
CommonHighLevelThreadWrapper<native_wrapper_t> _common_highlevel_fork(std::string &&, ThreadRoutine auto &&);
export
template<class native_wrapper_t> requires NativeHighLevelThreadWrapper<native_wrapper_t>
class CommonHighLevelThreadWrapper {
private:
enum _thread_state {
_thread_state_UNINITIALIZED,
_thread_state_RUNNING,
_thread_state_JOINING,
_thread_state_JOINED,
_thread_state_DESTROYED,
_thread_state_MOVED
};
std::atomic<_thread_state> _state;
native_wrapper_t _native;
CommonHighLevelThreadWrapper(_thread_state state, native_wrapper_t &&native) : _state{state}, _native{std::move(native)} {
}
template<class native_wrapper_t2> requires NativeHighLevelThreadWrapper<native_wrapper_t2>
friend
CommonHighLevelThreadWrapper<native_wrapper_t2> _common_highlevel_fork(std::string &&, ThreadRoutine auto &&);
public:
CommonHighLevelThreadWrapper();
CommonHighLevelThreadWrapper(CommonHighLevelThreadWrapper &) = delete;
CommonHighLevelThreadWrapper(CommonHighLevelThreadWrapper const &) = delete;
CommonHighLevelThreadWrapper(CommonHighLevelThreadWrapper &&other) noexcept;
void operator=(CommonHighLevelThreadWrapper &&other) noexcept;
void join();
void destroy();
~CommonHighLevelThreadWrapper() noexcept(false);
private:
template<ThreadRoutine thread_routine_t>
struct _initializer {
public:
native_wrapper_t::notifier_t notifier;
std::string name;
thread_routine_t thread_routine;
};
template<ThreadRoutine thread_routine_t>
static native_wrapper_t::kernel_ret_t _kernel(_initializer<thread_routine_t> *init) {
std::string thread_name = std::move(init->name);
thread_routine_t thread_routine = std::move(init->thread_routine);
init->notifier.notify();
try {
thread_routine();
} catch (LdH::Exception const &e) {
std::cerr << "Uncaught exception in thread" << thread_name << ":\n";
e.printStackTrace();
} catch (std::exception const &e) {
std::cerr << "Uncaught exception in thread" << thread_name << ":\n" << e.what() << "\n";
}
return native_wrapper_t::kernel_ret();
};
[[nodiscard]] _thread_state _cas_state(_thread_state expected, _thread_state next) {
this->_state.compare_exchange_strong(expected, next);
return expected;
}
};
}
export
template<class native_wrapper_t> requires LdH::NativeHighLevelThreadWrapper<native_wrapper_t>
LdH::CommonHighLevelThreadWrapper<native_wrapper_t> LdH::_common_highlevel_fork(std::string &&name, LdH::ThreadRoutine auto &&routine) {
using thread_routine_t = std::remove_reference_t<decltype(routine)>;
typename LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::template _initializer<thread_routine_t> initializer{
.notifier = typename native_wrapper_t::notifier_t{},
.name = std::move(name),
.thread_routine = std::move(routine)
};
native_wrapper_t native = native_wrapper_t::start(
initializer.name,
reinterpret_cast<native_wrapper_t::kernel_ret_t (*)(void *)>(&LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::template _kernel<thread_routine_t>),
reinterpret_cast<void *>(&initializer)
);
initializer.notifier.wait();
return LdH::CommonHighLevelThreadWrapper<native_wrapper_t>{LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::_thread_state_RUNNING, std::move(native)};
}
template<class native_wrapper_t> requires LdH::NativeHighLevelThreadWrapper<native_wrapper_t>
LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::CommonHighLevelThreadWrapper() : _state{_thread_state_UNINITIALIZED}, _native{} {
}
template<class native_wrapper_t> requires LdH::NativeHighLevelThreadWrapper<native_wrapper_t>
LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::CommonHighLevelThreadWrapper(
CommonHighLevelThreadWrapper &&other
) noexcept : _state{other._state.exchange(_thread_state_MOVED)}, _native{std::move(other._native)} {
}
template<class native_wrapper_t> requires LdH::NativeHighLevelThreadWrapper<native_wrapper_t>
void LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::operator=(CommonHighLevelThreadWrapper &&other) noexcept {
switch (this->_state.load()) {
case _thread_state_UNINITIALIZED:
case _thread_state_MOVED:
case _thread_state_DESTROYED:
break;
case _thread_state_RUNNING:
case _thread_state_JOINING:
case _thread_state_JOINED:
throw LdH::Exception{"Variable already initialized"};
}
new(this)CommonHighLevelThreadWrapper{std::move(other)};
}
template<class native_wrapper_t> requires LdH::NativeHighLevelThreadWrapper<native_wrapper_t>
void LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::join() {
switch (this->_cas_state(_thread_state_RUNNING, _thread_state_JOINING)) {
case _thread_state_UNINITIALIZED:
throw LdH::Exception{"Variable not initialized"};
case _thread_state_RUNNING:
break;
case _thread_state_JOINING:
case _thread_state_JOINED:
throw LdH::Exception{"Thread already joined"};
case _thread_state_DESTROYED:
throw LdH::Exception{"Thread already destroyed"};
case _thread_state_MOVED:
throw LdH::Exception{"Content of this variable was moved to another variable"};
break;
}
this->_native.join();
this->_state.store(_thread_state_JOINED);
}
template<class native_wrapper_t> requires LdH::NativeHighLevelThreadWrapper<native_wrapper_t>
void LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::destroy() {
switch (this->_cas_state(_thread_state_JOINED, _thread_state_DESTROYED)) {
case _thread_state_UNINITIALIZED:
throw LdH::Exception{"Variable not initialized"};
case _thread_state_RUNNING:
case _thread_state_JOINING:
throw LdH::Exception{"Thread not finished yet"};
case _thread_state_JOINED:
break;
case _thread_state_DESTROYED:
throw LdH::Exception{"Thread already destroyed"};
case _thread_state_MOVED:
throw LdH::Exception{"Content of this variable was moved to another variable"};
}
this->_native.destroy();
}
template<class native_wrapper_t> requires LdH::NativeHighLevelThreadWrapper<native_wrapper_t>
LdH::CommonHighLevelThreadWrapper<native_wrapper_t>::~CommonHighLevelThreadWrapper() noexcept(false) {
switch (this->_state.load()) {
case _thread_state_UNINITIALIZED:
break;
case _thread_state_RUNNING:
case _thread_state_JOINING:
throw LdH::Exception{"Thread not finished yet"};
case _thread_state_JOINED:
throw LdH::Exception{"Thread not destroyed yet"};
case _thread_state_DESTROYED:
break;
case _thread_state_MOVED:
break;
}
}

View File

@ -11,160 +11,20 @@ import std;
import ru.landgrafhomyak.BGTU.networks_1.exceptions; import ru.landgrafhomyak.BGTU.networks_1.exceptions;
import ru.landgrafhomyak.BGTU.networks_1.exceptions.windows; import ru.landgrafhomyak.BGTU.networks_1.exceptions.windows;
import :thread_routine; import :thread_routine;
import :highlevel_impl;
namespace LdH::_NativeThreads { namespace LdH::_NativeThreads {
enum class _thread_state { class NativeThreadWrapperForWindows {
_thread_state_UNINITIALIZED,
_thread_state_RUNNING,
_thread_state_JOINING,
_thread_state_JOINED,
_thread_state_DESTROYED,
_thread_state_MOVED
};
using _thread_state::_thread_state_UNINITIALIZED;
using _thread_state::_thread_state_RUNNING;
using _thread_state::_thread_state_JOINING;
using _thread_state::_thread_state_JOINED;
using _thread_state::_thread_state_DESTROYED;
using _thread_state::_thread_state_MOVED;
export class ThreadController;
export ThreadController fork(std::string &&name, ThreadRoutine auto &&);
export class ThreadController {
private: private:
std::atomic<_thread_state> state; HANDLE _hThread;
HANDLE hThread;
public: public:
ThreadController(); NativeThreadWrapperForWindows() : _hThread{nullptr} {
ThreadController(ThreadController &) = delete;
ThreadController(ThreadController const &) = delete;
ThreadController(ThreadController &&other) noexcept;
void operator=(ThreadController &&other) noexcept;
void join();
void destroy();
~ThreadController() noexcept(false);
private:
ThreadController(_thread_state state, HANDLE hThread) : state{state}, hThread{hThread} {
} }
template<ThreadRoutine thread_routine_t> void join() {
struct _initializer {
public:
void *volatile notifier;
std::string name;
thread_routine_t thread_routine;
};
template<ThreadRoutine thread_routine_t>
static DWORD _kernel(_initializer<thread_routine_t> *init) {
std::string thread_name = std::move(init->name);
thread_routine_t thread_routine = std::move(init->thread_routine);
init->notifier = init;
WakeByAddressSingle(const_cast<void **>(&init->notifier));
try {
thread_routine();
} catch (LdH::Exception const &e) {
std::cerr << "Uncaught exception in thread" << thread_name << ":\n";
e.printStackTrace();
} catch (std::exception const &e) {
std::cerr << "Uncaught exception in thread" << thread_name << ":\n" << e.what() << "\n";
}
return 0;
};
friend
ThreadController fork(std::string &&, ThreadRoutine auto &&);
[[nodiscard]] _thread_state _cas_state(_thread_state expected, _thread_state next) {
this->state.compare_exchange_strong(expected, next);
return expected;
}
};
}
export LdH::_NativeThreads::ThreadController LdH::_NativeThreads::fork(std::string &&name, LdH::ThreadRoutine auto &&routine) {
using thread_routine_t = std::remove_reference_t<decltype(routine)>;
void *notifier_expected = nullptr;
LdH::_NativeThreads::ThreadController::_initializer<thread_routine_t> initializer{
.notifier = nullptr,
.name = std::move(name),
.thread_routine = std::move(routine)
};
HANDLE hThread = CreateThread(
nullptr,
0,
(LPTHREAD_START_ROUTINE) &LdH::_NativeThreads::ThreadController::_kernel<thread_routine_t>,
&initializer,
0u,
nullptr
);
if (hThread == nullptr)
LdH::throwFromLastWindowsErr();
while (true) { while (true) {
if (TRUE == WaitOnAddress(&(initializer.notifier), &notifier_expected, sizeof(void *), INFINITE)) switch (WaitForSingleObject(this->_hThread, INFINITE)) {
break;
LdH::throwFromLastWindowsErrOrTimeout();
}
return ThreadController{_thread_state_RUNNING, hThread};
}
LdH::_NativeThreads::ThreadController::ThreadController() : state{_thread_state_UNINITIALIZED}, hThread{nullptr} {
}
LdH::_NativeThreads::ThreadController::ThreadController(LdH::_NativeThreads::ThreadController &&other) noexcept : state{other.state.exchange(_thread_state_MOVED)}, hThread{other.hThread} {
other.hThread = nullptr;
}
void LdH::_NativeThreads::ThreadController::operator=(ThreadController &&other) noexcept {
switch (this->state.load()) {
case _thread_state_UNINITIALIZED:
case _thread_state_MOVED:
case _thread_state_DESTROYED:
break;
case _thread_state_RUNNING:
case _thread_state_JOINING:
case _thread_state_JOINED:
throw LdH::Exception{"Variable already initialized"};
}
new(this)ThreadController{std::move(other)};
}
void LdH::_NativeThreads::ThreadController::join() {
switch (this->_cas_state(_thread_state_RUNNING, _thread_state_JOINING)) {
case _thread_state_UNINITIALIZED:
throw LdH::Exception{"Variable not initialized"};
case _thread_state_RUNNING:
break;
case _thread_state_JOINING:
case _thread_state_JOINED:
throw LdH::Exception{"Thread already joined"};
case _thread_state_DESTROYED:
throw LdH::Exception{"Thread already destroyed"};
case _thread_state_MOVED:
throw LdH::Exception{"Content of this variable was moved to another variable"};
break;
}
while (true) {
switch (WaitForSingleObject(this->hThread, INFINITE)) {
case WAIT_FAILED: case WAIT_FAILED:
LdH::throwFromLastWindowsErr(); LdH::throwFromLastWindowsErr();
case WAIT_TIMEOUT: case WAIT_TIMEOUT:
@ -172,44 +32,79 @@ void LdH::_NativeThreads::ThreadController::join() {
case WAIT_ABANDONED: case WAIT_ABANDONED:
throw LdH::Exception{"Unexpected return WAIT_ABANDONED of WaitForSingleObject"}; throw LdH::Exception{"Unexpected return WAIT_ABANDONED of WaitForSingleObject"};
case WAIT_OBJECT_0: case WAIT_OBJECT_0:
this->state.store(_thread_state_JOINED);
return; return;
default:
throw LdH::Exception{"Unexpected return of WaitForSingleObject"};
} }
} }
} }
void LdH::_NativeThreads::ThreadController::destroy() { void destroy() {
switch (this->_cas_state(_thread_state_JOINED, _thread_state_DESTROYED)) { if (0 == CloseHandle(this->_hThread))
case _thread_state_UNINITIALIZED:
throw LdH::Exception{"Variable not initialized"};
case _thread_state_RUNNING:
case _thread_state_JOINING:
throw LdH::Exception{"Thread not finished yet"};
case _thread_state_JOINED:
break;
case _thread_state_DESTROYED:
throw LdH::Exception{"Thread already destroyed"};
case _thread_state_MOVED:
throw LdH::Exception{"Content of this variable was moved to another variable"};
}
if (0 == CloseHandle(this->hThread))
LdH::throwFromLastWindowsErr(); LdH::throwFromLastWindowsErr();
this->hThread = nullptr; this->_hThread = nullptr;
} }
LdH::_NativeThreads::ThreadController::~ThreadController() noexcept(false) { ~NativeThreadWrapperForWindows() {
switch (this->state.load()) { this->_hThread = nullptr;
case _thread_state_UNINITIALIZED: }
break;
case _thread_state_RUNNING: class notifier_t {
case _thread_state_JOINING: void *_ptr;
throw LdH::Exception{"Thread not finished yet"};
case _thread_state_JOINED: public:
throw LdH::Exception{"Thread not destroyed yet"}; notifier_t() : _ptr{nullptr} {
case _thread_state_DESTROYED: }
break;
case _thread_state_MOVED: void wait() {
void *non_changed_value = nullptr;
while (true) {
if (TRUE == WaitOnAddress(&this->_ptr, &non_changed_value, sizeof(void *), INFINITE))
break; break;
LdH::throwFromLastWindowsErrOrTimeout();
}
}
void notify() {
this->_ptr = this;
WakeByAddressSingle(&this->_ptr);
}
};
using kernel_ret_t = DWORD;
static DWORD kernel_ret() {
return 0;
}
private:
explicit NativeThreadWrapperForWindows(HANDLE hThread) : _hThread{hThread} {
}
public:
static NativeThreadWrapperForWindows start(std::string &, DWORD (*kernel)(void *), void *kernel_arg) {
HANDLE hThread = CreateThread(
nullptr,
0,
static_cast<LPTHREAD_START_ROUTINE>(kernel),
kernel_arg,
0u,
nullptr
);
if (hThread == nullptr)
LdH::throwFromLastWindowsErr();
return NativeThreadWrapperForWindows{hThread};
}
};
static_assert(LdH::NativeHighLevelThreadWrapper<NativeThreadWrapperForWindows>);
export
using ThreadController = LdH::CommonHighLevelThreadWrapper<NativeThreadWrapperForWindows>;
export ThreadController fork(std::string &&name, ThreadRoutine auto &&routine) {
return _common_highlevel_fork<NativeThreadWrapperForWindows>(std::move(name), std::move(routine));
} }
} }