diff --git a/modules/threads/CMakeLists.txt b/modules/threads/CMakeLists.txt index d6fff15..0b896f8 100644 --- a/modules/threads/CMakeLists.txt +++ b/modules/threads/CMakeLists.txt @@ -7,6 +7,7 @@ target_sources( FILE_SET cxx_modules TYPE CXX_MODULES FILES src/common.cppm src/thread_routine.cppm + src/highlevel_impl.cppm src/windows.cppm ) diff --git a/modules/threads/src/highlevel_impl.cppm b/modules/threads/src/highlevel_impl.cppm new file mode 100644 index 0000000..a2e3131 --- /dev/null +++ b/modules/threads/src/highlevel_impl.cppm @@ -0,0 +1,228 @@ +export module ru.landgrafhomyak.BGTU.networks_1.threads:highlevel_impl; +import std; +export import :thread_routine; + + +import ru.landgrafhomyak.BGTU.networks_1.exceptions; +import ru.landgrafhomyak.BGTU.networks_1.exceptions.windows; +import :thread_routine; + +namespace LdH { + export + template + concept NativeHighLevelThreadWrapper = std::movable && + std::destructible && + std::default_initializable && + requires(native_wrapper_t &&self) + { + { self.join() } -> std::same_as; + { self.destroy() } -> std::same_as; + } && + requires + { + typename native_wrapper_t::notifier_t; + requires std::default_initializable; + requires std::destructible; + } && + requires(typename native_wrapper_t::notifier_t &self) + { + { self.wait() } -> std::same_as; + { self.notify() } -> std::same_as; + } && + requires + { + typename native_wrapper_t::kernel_ret_t; + { native_wrapper_t::kernel_ret() } -> std::same_as; + } && + requires(std::string &name, typename native_wrapper_t::kernel_ret_t (*kernel)(void *), void *thread_arg) + { + { native_wrapper_t::start(name, kernel, thread_arg) } -> std::same_as; + }; + + + export + template requires NativeHighLevelThreadWrapper + class CommonHighLevelThreadWrapper; + + export + template requires NativeHighLevelThreadWrapper + CommonHighLevelThreadWrapper _common_highlevel_fork(std::string &&, ThreadRoutine auto &&); + + export + template requires NativeHighLevelThreadWrapper + class CommonHighLevelThreadWrapper { + private: + enum _thread_state { + _thread_state_UNINITIALIZED, + _thread_state_RUNNING, + _thread_state_JOINING, + _thread_state_JOINED, + _thread_state_DESTROYED, + _thread_state_MOVED + }; + + std::atomic<_thread_state> _state; + native_wrapper_t _native; + + CommonHighLevelThreadWrapper(_thread_state state, native_wrapper_t &&native) : _state{state}, _native{std::move(native)} { + } + + + template requires NativeHighLevelThreadWrapper + friend + CommonHighLevelThreadWrapper _common_highlevel_fork(std::string &&, ThreadRoutine auto &&); + + public: + CommonHighLevelThreadWrapper(); + + CommonHighLevelThreadWrapper(CommonHighLevelThreadWrapper &) = delete; + + CommonHighLevelThreadWrapper(CommonHighLevelThreadWrapper const &) = delete; + + CommonHighLevelThreadWrapper(CommonHighLevelThreadWrapper &&other) noexcept; + + void operator=(CommonHighLevelThreadWrapper &&other) noexcept; + + void join(); + + void destroy(); + + ~CommonHighLevelThreadWrapper() noexcept(false); + + private: + template + struct _initializer { + public: + native_wrapper_t::notifier_t notifier; + std::string name; + thread_routine_t thread_routine; + }; + + template + static native_wrapper_t::kernel_ret_t _kernel(_initializer *init) { + std::string thread_name = std::move(init->name); + thread_routine_t thread_routine = std::move(init->thread_routine); + init->notifier.notify(); + + try { + thread_routine(); + } catch (LdH::Exception const &e) { + std::cerr << "Uncaught exception in thread" << thread_name << ":\n"; + e.printStackTrace(); + } catch (std::exception const &e) { + std::cerr << "Uncaught exception in thread" << thread_name << ":\n" << e.what() << "\n"; + } + return native_wrapper_t::kernel_ret(); + }; + + + [[nodiscard]] _thread_state _cas_state(_thread_state expected, _thread_state next) { + this->_state.compare_exchange_strong(expected, next); + return expected; + } + }; +} + + +export +template requires LdH::NativeHighLevelThreadWrapper +LdH::CommonHighLevelThreadWrapper LdH::_common_highlevel_fork(std::string &&name, LdH::ThreadRoutine auto &&routine) { + using thread_routine_t = std::remove_reference_t; + + typename LdH::CommonHighLevelThreadWrapper::template _initializer initializer{ + .notifier = typename native_wrapper_t::notifier_t{}, + .name = std::move(name), + .thread_routine = std::move(routine) + }; + native_wrapper_t native = native_wrapper_t::start( + initializer.name, + reinterpret_cast(&LdH::CommonHighLevelThreadWrapper::template _kernel), + reinterpret_cast(&initializer) + ); + + initializer.notifier.wait(); + + return LdH::CommonHighLevelThreadWrapper{LdH::CommonHighLevelThreadWrapper::_thread_state_RUNNING, std::move(native)}; +} + +template requires LdH::NativeHighLevelThreadWrapper +LdH::CommonHighLevelThreadWrapper::CommonHighLevelThreadWrapper() : _state{_thread_state_UNINITIALIZED}, _native{} { +} + +template requires LdH::NativeHighLevelThreadWrapper +LdH::CommonHighLevelThreadWrapper::CommonHighLevelThreadWrapper( + CommonHighLevelThreadWrapper &&other +) noexcept : _state{other._state.exchange(_thread_state_MOVED)}, _native{std::move(other._native)} { +} + + +template requires LdH::NativeHighLevelThreadWrapper +void LdH::CommonHighLevelThreadWrapper::operator=(CommonHighLevelThreadWrapper &&other) noexcept { + switch (this->_state.load()) { + case _thread_state_UNINITIALIZED: + case _thread_state_MOVED: + case _thread_state_DESTROYED: + break; + case _thread_state_RUNNING: + case _thread_state_JOINING: + case _thread_state_JOINED: + throw LdH::Exception{"Variable already initialized"}; + } + new(this)CommonHighLevelThreadWrapper{std::move(other)}; +} + +template requires LdH::NativeHighLevelThreadWrapper +void LdH::CommonHighLevelThreadWrapper::join() { + switch (this->_cas_state(_thread_state_RUNNING, _thread_state_JOINING)) { + case _thread_state_UNINITIALIZED: + throw LdH::Exception{"Variable not initialized"}; + case _thread_state_RUNNING: + break; + case _thread_state_JOINING: + case _thread_state_JOINED: + throw LdH::Exception{"Thread already joined"}; + case _thread_state_DESTROYED: + throw LdH::Exception{"Thread already destroyed"}; + case _thread_state_MOVED: + throw LdH::Exception{"Content of this variable was moved to another variable"}; + break; + } + + this->_native.join(); + this->_state.store(_thread_state_JOINED); +} + +template requires LdH::NativeHighLevelThreadWrapper +void LdH::CommonHighLevelThreadWrapper::destroy() { + switch (this->_cas_state(_thread_state_JOINED, _thread_state_DESTROYED)) { + case _thread_state_UNINITIALIZED: + throw LdH::Exception{"Variable not initialized"}; + case _thread_state_RUNNING: + case _thread_state_JOINING: + throw LdH::Exception{"Thread not finished yet"}; + case _thread_state_JOINED: + break; + case _thread_state_DESTROYED: + throw LdH::Exception{"Thread already destroyed"}; + case _thread_state_MOVED: + throw LdH::Exception{"Content of this variable was moved to another variable"}; + } + this->_native.destroy(); +} + +template requires LdH::NativeHighLevelThreadWrapper +LdH::CommonHighLevelThreadWrapper::~CommonHighLevelThreadWrapper() noexcept(false) { + switch (this->_state.load()) { + case _thread_state_UNINITIALIZED: + break; + case _thread_state_RUNNING: + case _thread_state_JOINING: + throw LdH::Exception{"Thread not finished yet"}; + case _thread_state_JOINED: + throw LdH::Exception{"Thread not destroyed yet"}; + case _thread_state_DESTROYED: + break; + case _thread_state_MOVED: + break; + } +} diff --git a/modules/threads/src/windows.cppm b/modules/threads/src/windows.cppm index 43e4c7c..3a1d3d5 100644 --- a/modules/threads/src/windows.cppm +++ b/modules/threads/src/windows.cppm @@ -11,205 +11,100 @@ import std; import ru.landgrafhomyak.BGTU.networks_1.exceptions; import ru.landgrafhomyak.BGTU.networks_1.exceptions.windows; import :thread_routine; +import :highlevel_impl; namespace LdH::_NativeThreads { - enum class _thread_state { - _thread_state_UNINITIALIZED, - _thread_state_RUNNING, - _thread_state_JOINING, - _thread_state_JOINED, - _thread_state_DESTROYED, - _thread_state_MOVED - }; - - using _thread_state::_thread_state_UNINITIALIZED; - using _thread_state::_thread_state_RUNNING; - using _thread_state::_thread_state_JOINING; - using _thread_state::_thread_state_JOINED; - using _thread_state::_thread_state_DESTROYED; - using _thread_state::_thread_state_MOVED; - - export class ThreadController; - - export ThreadController fork(std::string &&name, ThreadRoutine auto &&); - - export class ThreadController { + class NativeThreadWrapperForWindows { private: - std::atomic<_thread_state> state; - HANDLE hThread; + HANDLE _hThread; public: - ThreadController(); - - ThreadController(ThreadController &) = delete; - - ThreadController(ThreadController const &) = delete; - - ThreadController(ThreadController &&other) noexcept; - - void operator=(ThreadController &&other) noexcept; - - void join(); - - void destroy(); - - ~ThreadController() noexcept(false); - - private: - ThreadController(_thread_state state, HANDLE hThread) : state{state}, hThread{hThread} { + NativeThreadWrapperForWindows() : _hThread{nullptr} { } - template - struct _initializer { - public: - void *volatile notifier; - std::string name; - thread_routine_t thread_routine; - }; - - template - static DWORD _kernel(_initializer *init) { - std::string thread_name = std::move(init->name); - thread_routine_t thread_routine = std::move(init->thread_routine); - init->notifier = init; - WakeByAddressSingle(const_cast(&init->notifier)); - try { - thread_routine(); - } catch (LdH::Exception const &e) { - std::cerr << "Uncaught exception in thread" << thread_name << ":\n"; - e.printStackTrace(); - } catch (std::exception const &e) { - std::cerr << "Uncaught exception in thread" << thread_name << ":\n" << e.what() << "\n"; + void join() { + while (true) { + switch (WaitForSingleObject(this->_hThread, INFINITE)) { + case WAIT_FAILED: + LdH::throwFromLastWindowsErr(); + case WAIT_TIMEOUT: + continue; + case WAIT_ABANDONED: + throw LdH::Exception{"Unexpected return WAIT_ABANDONED of WaitForSingleObject"}; + case WAIT_OBJECT_0: + return; + default: + throw LdH::Exception{"Unexpected return of WaitForSingleObject"}; + } + } + } + + void destroy() { + if (0 == CloseHandle(this->_hThread)) + LdH::throwFromLastWindowsErr(); + this->_hThread = nullptr; + } + + ~NativeThreadWrapperForWindows() { + this->_hThread = nullptr; + } + + class notifier_t { + void *_ptr; + + public: + notifier_t() : _ptr{nullptr} { + } + + void wait() { + void *non_changed_value = nullptr; + while (true) { + if (TRUE == WaitOnAddress(&this->_ptr, &non_changed_value, sizeof(void *), INFINITE)) + break; + + LdH::throwFromLastWindowsErrOrTimeout(); + } + } + + void notify() { + this->_ptr = this; + WakeByAddressSingle(&this->_ptr); } - return 0; }; - friend - ThreadController fork(std::string &&, ThreadRoutine auto &&); + using kernel_ret_t = DWORD; - [[nodiscard]] _thread_state _cas_state(_thread_state expected, _thread_state next) { - this->state.compare_exchange_strong(expected, next); - return expected; + static DWORD kernel_ret() { + return 0; } - }; -} + private: + explicit NativeThreadWrapperForWindows(HANDLE hThread) : _hThread{hThread} { + } + public: + static NativeThreadWrapperForWindows start(std::string &, DWORD (*kernel)(void *), void *kernel_arg) { + HANDLE hThread = CreateThread( + nullptr, + 0, + static_cast(kernel), + kernel_arg, + 0u, + nullptr + ); -export LdH::_NativeThreads::ThreadController LdH::_NativeThreads::fork(std::string &&name, LdH::ThreadRoutine auto &&routine) { - using thread_routine_t = std::remove_reference_t; - - void *notifier_expected = nullptr; - - LdH::_NativeThreads::ThreadController::_initializer initializer{ - .notifier = nullptr, - .name = std::move(name), - .thread_routine = std::move(routine) - }; - HANDLE hThread = CreateThread( - nullptr, - 0, - (LPTHREAD_START_ROUTINE) &LdH::_NativeThreads::ThreadController::_kernel, - &initializer, - 0u, - nullptr - ); - if (hThread == nullptr) - LdH::throwFromLastWindowsErr(); - - while (true) { - if (TRUE == WaitOnAddress(&(initializer.notifier), ¬ifier_expected, sizeof(void *), INFINITE)) - break; - - LdH::throwFromLastWindowsErrOrTimeout(); - } - - return ThreadController{_thread_state_RUNNING, hThread}; -} - -LdH::_NativeThreads::ThreadController::ThreadController() : state{_thread_state_UNINITIALIZED}, hThread{nullptr} { -} - -LdH::_NativeThreads::ThreadController::ThreadController(LdH::_NativeThreads::ThreadController &&other) noexcept : state{other.state.exchange(_thread_state_MOVED)}, hThread{other.hThread} { - other.hThread = nullptr; -} - - -void LdH::_NativeThreads::ThreadController::operator=(ThreadController &&other) noexcept { - switch (this->state.load()) { - case _thread_state_UNINITIALIZED: - case _thread_state_MOVED: - case _thread_state_DESTROYED: - break; - case _thread_state_RUNNING: - case _thread_state_JOINING: - case _thread_state_JOINED: - throw LdH::Exception{"Variable already initialized"}; - } - new(this)ThreadController{std::move(other)}; -} - -void LdH::_NativeThreads::ThreadController::join() { - switch (this->_cas_state(_thread_state_RUNNING, _thread_state_JOINING)) { - case _thread_state_UNINITIALIZED: - throw LdH::Exception{"Variable not initialized"}; - case _thread_state_RUNNING: - break; - case _thread_state_JOINING: - case _thread_state_JOINED: - throw LdH::Exception{"Thread already joined"}; - case _thread_state_DESTROYED: - throw LdH::Exception{"Thread already destroyed"}; - case _thread_state_MOVED: - throw LdH::Exception{"Content of this variable was moved to another variable"}; - break; - } - - while (true) { - switch (WaitForSingleObject(this->hThread, INFINITE)) { - case WAIT_FAILED: + if (hThread == nullptr) LdH::throwFromLastWindowsErr(); - case WAIT_TIMEOUT: - continue; - case WAIT_ABANDONED: - throw LdH::Exception{"Unexpected return WAIT_ABANDONED of WaitForSingleObject"}; - case WAIT_OBJECT_0: - this->state.store(_thread_state_JOINED); - return; + + return NativeThreadWrapperForWindows{hThread}; } - } -} + }; -void LdH::_NativeThreads::ThreadController::destroy() { - switch (this->_cas_state(_thread_state_JOINED, _thread_state_DESTROYED)) { - case _thread_state_UNINITIALIZED: - throw LdH::Exception{"Variable not initialized"}; - case _thread_state_RUNNING: - case _thread_state_JOINING: - throw LdH::Exception{"Thread not finished yet"}; - case _thread_state_JOINED: - break; - case _thread_state_DESTROYED: - throw LdH::Exception{"Thread already destroyed"}; - case _thread_state_MOVED: - throw LdH::Exception{"Content of this variable was moved to another variable"}; - } - if (0 == CloseHandle(this->hThread)) - LdH::throwFromLastWindowsErr(); - this->hThread = nullptr; -} + static_assert(LdH::NativeHighLevelThreadWrapper); -LdH::_NativeThreads::ThreadController::~ThreadController() noexcept(false) { - switch (this->state.load()) { - case _thread_state_UNINITIALIZED: - break; - case _thread_state_RUNNING: - case _thread_state_JOINING: - throw LdH::Exception{"Thread not finished yet"}; - case _thread_state_JOINED: - throw LdH::Exception{"Thread not destroyed yet"}; - case _thread_state_DESTROYED: - break; - case _thread_state_MOVED: - break; + export + using ThreadController = LdH::CommonHighLevelThreadWrapper; + + export ThreadController fork(std::string &&name, ThreadRoutine auto &&routine) { + return _common_highlevel_fork(std::move(name), std::move(routine)); } }