Implement simple thread pool

This commit is contained in:
Nekotekina 2020-11-12 23:24:35 +03:00
parent 50d80c64fa
commit 67785a918c
3 changed files with 112 additions and 22 deletions

View file

@ -1842,8 +1842,68 @@ thread_local DECLARE(thread_ctrl::g_tls_error_callback) = nullptr;
DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined };
static lf_fifo<atomic_t<thread_base**>, 240> s_thread_pool;
static shared_mutex s_pool_lock;
void thread_base::start(native_entry entry)
{
while (u32 size = s_thread_pool.size())
{
u32 pos = s_thread_pool.peek();
thread_base** tls = nullptr;
for (u32 i = pos; i < pos + size; i++)
{
auto val = s_thread_pool[i].load();
if (val && s_thread_pool[i].compare_and_swap_test(val, 0))
{
tls = val;
pos = i;
break;
}
}
if (tls)
{
// Send "this" and entry point
m_thread = reinterpret_cast<u64>(entry);
atomic_storage<thread_base*>::store(*tls, this);
s_thread_pool[pos].notify_one();
{
// Using it in MPMC manner is a bit tricky, since it's originally MPSC
std::lock_guard lock(s_pool_lock);
u32 count = 0;
while (!s_thread_pool[s_thread_pool.peek() + count])
{
count++;
if (count >= s_thread_pool.size())
{
break;
}
}
if (count)
{
s_thread_pool.pop_end(count);
}
}
// Wait for actual "m_thread" in return
while (m_thread == reinterpret_cast<u64>(entry))
{
busy_wait(300);
}
return;
}
}
#ifdef _WIN32
m_thread = ::_beginthreadex(nullptr, 0, entry, this, CREATE_SUSPENDED, nullptr);
verify("thread_ctrl::start" HERE), m_thread, ::ResumeThread(reinterpret_cast<HANDLE>(+m_thread)) != -1;
@ -1867,8 +1927,11 @@ void thread_base::initialize(void (*error_cb)(), bool(*wait_cb)(const void*))
return thread_ctrl::get_name_cached();
};
std::string name = thread_ctrl::get_name_cached();
set_name(thread_ctrl::get_name_cached());
}
void thread_base::set_name(std::string name)
{
#ifdef _MSC_VER
struct THREADNAME_INFO
{
@ -1983,11 +2046,34 @@ bool thread_base::finalize(thread_state result_state) noexcept
return !ok;
}
void thread_base::finalize() noexcept
void thread_base::finalize(u64 _self) noexcept
{
if (!_self)
{
// Don't even need to clean these values for detached threads
return;
}
atomic_wait_engine::set_wait_callback(nullptr);
g_tls_log_prefix = []() -> std::string { return {}; };
set_name("..pool");
thread_ctrl::g_tls_this_thread = nullptr;
// Try to add self to thread pool
const u32 pos = s_thread_pool.push_begin();
const auto tls = &thread_ctrl::g_tls_this_thread;
s_thread_pool[pos] = tls;
while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(thread_ctrl::g_tls_this_thread))
{
s_thread_pool[pos].wait(tls);
}
// Restore thread id
const auto _this = thread_ctrl::g_tls_this_thread;
const auto entry = _this->m_thread.exchange(_self);
_this->m_thread.notify_one();
reinterpret_cast<native_entry>(entry)(_this);
}
void thread_ctrl::_wait_for(u64 usec, bool alert /* true */)
@ -2073,14 +2159,6 @@ thread_base::thread_base(std::string_view name)
thread_base::~thread_base()
{
if (m_thread)
{
#ifdef _WIN32
CloseHandle(reinterpret_cast<HANDLE>(m_thread.raw()));
#else
pthread_detach(reinterpret_cast<pthread_t>(m_thread.raw()));
#endif
}
}
bool thread_base::join() const
@ -2169,7 +2247,7 @@ void thread_ctrl::emergency_exit(std::string_view reason)
delete _this;
}
thread_base::finalize();
thread_base::finalize(0);
#ifdef _WIN32
_endthreadex(0);