Fixed thread pool a bit

Use a 128-bit bitmask allocator instead of the lock-free queue.
When the pool is full (128 slots), threads simply terminate as before.
Nekotekina 2020-11-13 11:32:47 +03:00
parent e48f160a29
commit ab365fe494
4 changed files with 134 additions and 59 deletions
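
The new scheme is essentially a tiny bitmask allocator: finalize() claims the lowest clear bit of s_thread_bits to publish its TLS slot, start() scans the set bits for a slot to reuse, and the bit is cleared once the slot has been consumed. Below is a minimal standalone sketch of that idea, not the RPCS3 code: it uses a 64-bit mask with std::atomic and C++20 <bit> in place of the project's atomic_t<u128>, fetch_op and utils::ctz128, and the class and function names are invented for illustration.

// Standalone sketch of a lock-free bitmask slot allocator (illustrative only;
// the names and the 64-slot capacity are assumptions, not the RPCS3 implementation).
#include <atomic>
#include <bit>
#include <cstdint>
#include <optional>

class slot_allocator
{
	std::atomic<std::uint64_t> m_bits{0}; // bit N set = slot N occupied

public:
	// Claim the lowest free slot, or return nullopt if every slot is taken
	// (the equivalent of "pool is full, the thread just terminates").
	std::optional<unsigned> acquire()
	{
		std::uint64_t bits = m_bits.load();

		while (~bits) // at least one clear bit remains
		{
			// bits | (bits + 1) sets exactly the lowest clear bit
			const std::uint64_t desired = bits | (bits + 1);

			if (m_bits.compare_exchange_weak(bits, desired))
			{
				// Index of the claimed bit = number of trailing one bits
				return std::countr_one(bits);
			}
			// CAS failure reloaded bits; retry
		}

		return std::nullopt;
	}

	// Release a previously claimed slot by clearing its bit.
	void release(unsigned pos)
	{
		m_bits.fetch_and(~(std::uint64_t(1) << pos));
	}

	// Snapshot for the consumer side, iterated with bits &= bits - 1 and
	// std::countr_zero, in the same way the loop in thread_base::start() does.
	std::uint64_t snapshot() const
	{
		return m_bits.load();
	}
};

Compared with the old lf_fifo, a bitmask lets slots be freed in any order without the locked pop_end pass that the removed MPMC workaround needed, at the cost of capping the pool at a fixed number of entries.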

@@ -75,6 +75,7 @@
 #include "sync.h"
 #include "util/vm.hpp"
 #include "util/logs.hpp"
+#include "util/asm.hpp"
 #include "Emu/Memory/vm_locking.h"
 
 LOG_CHANNEL(sig_log, "SIG");
@@ -1842,66 +1843,40 @@ thread_local DECLARE(thread_ctrl::g_tls_error_callback) = nullptr;
 DECLARE(thread_ctrl::g_native_core_layout) { native_core_arrangement::undefined };
 
-static lf_fifo<atomic_t<thread_base**>, 240> s_thread_pool;
+static atomic_t<u128, 64> s_thread_bits{0};
 
-static shared_mutex s_pool_lock;
+static atomic_t<thread_base**> s_thread_pool[128]{};
 
 void thread_base::start(native_entry entry)
 {
-	while (u32 size = s_thread_pool.size())
+	for (u128 bits = s_thread_bits.load(); bits; bits &= bits - 1)
 	{
-		u32 pos = s_thread_pool.peek();
-		thread_base** tls = nullptr;
+		const u32 pos = utils::ctz128(bits);
 
-		for (u32 i = pos; i < pos + size; i++)
+		if (!s_thread_pool[pos])
 		{
-			auto val = s_thread_pool[i].load();
-
-			if (val && s_thread_pool[i].compare_and_swap_test(val, 0))
-			{
-				tls = val;
-				pos = i;
-				break;
-			}
+			continue;
 		}
 
-		if (tls)
+		thread_base** tls = s_thread_pool[pos].exchange(nullptr);
+
+		if (!tls)
 		{
-			// Send "this" and entry point
-			m_thread = reinterpret_cast<u64>(entry);
-			atomic_storage<thread_base*>::store(*tls, this);
-			s_thread_pool[pos].notify_one();
-
-			{
-				// Using it in MPMC manner is a bit tricky, since it's originally MPSC
-				std::lock_guard lock(s_pool_lock);
-
-				u32 count = 0;
-
-				while (!s_thread_pool[s_thread_pool.peek() + count])
-				{
-					count++;
-
-					if (count >= s_thread_pool.size())
-					{
-						break;
-					}
-				}
-
-				if (count)
-				{
-					s_thread_pool.pop_end(count);
-				}
-			}
-
-			// Wait for actual "m_thread" in return
-			while (m_thread == reinterpret_cast<u64>(entry))
-			{
-				busy_wait(300);
-			}
-
-			return;
+			continue;
 		}
+
+		// Send "this" and entry point
+		m_thread = reinterpret_cast<u64>(entry);
+		atomic_storage<thread_base*>::release(*tls, this);
+		s_thread_pool[pos].notify_all();
+
+		// Wait for actual "m_thread" in return
+		while (m_thread == reinterpret_cast<u64>(entry))
+		{
+			busy_wait(300);
+		}
+
+		return;
 	}
 
 #ifdef _WIN32
@@ -1990,7 +1965,7 @@ void thread_base::notify_abort() noexcept
 	}
 }
 
-bool thread_base::finalize(thread_state result_state) noexcept
+u64 thread_base::finalize(thread_state result_state) noexcept
 {
 	// Report pending errors
 	error_code::error_report(0, 0, 0, 0);
@@ -2036,6 +2011,9 @@ bool thread_base::finalize(thread_state result_state) noexcept
 		g_tls_fault_spu,
 		fsoft, fhard, ctxvol, ctxinv);
 
+	const u64 _self = m_thread;
+	m_thread.release(0);
+
 	// Return true if need to delete thread object
 	const bool ok = m_state.exchange(result_state) <= thread_state::aborting;
@@ -2043,7 +2021,7 @@ bool thread_base::finalize(thread_state result_state) noexcept
 	m_state.notify_all();
 
 	// No detached thread supported atm
-	return !ok;
+	return _self;
 }
 
 void thread_base::finalize(u64 _self) noexcept
@@ -2056,23 +2034,54 @@ void thread_base::finalize(u64 _self) noexcept
 	atomic_wait_engine::set_wait_callback(nullptr);
 	g_tls_log_prefix = []() -> std::string { return {}; };
-	set_name("..pool");
 	thread_ctrl::g_tls_this_thread = nullptr;
 
 	// Try to add self to thread pool
-	const u32 pos = s_thread_pool.push_begin();
+	const auto [bits, ok] = s_thread_bits.fetch_op([](u128& bits)
+	{
+		if (~bits) [[likely]]
+		{
+			// Set lowest clear bit
+			bits |= bits + 1;
+			return true;
+		}
+
+		return false;
+	});
+
+	if (!ok)
+	{
+#ifdef _WIN32
+		CloseHandle(reinterpret_cast<HANDLE>(_self));
+#else
+		pthread_detach(reinterpret_cast<pthread_t>(_self));
+#endif
+		return;
+	}
+
+	set_name("..pool");
+
+	// Obtain id from atomic op
+	const u32 pos = utils::ctz128(~bits);
 	const auto tls = &thread_ctrl::g_tls_this_thread;
 	s_thread_pool[pos] = tls;
 
-	while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(thread_ctrl::g_tls_this_thread))
+	while (s_thread_pool[pos] == tls || !atomic_storage<thread_base*>::load(*tls))
 	{
 		s_thread_pool[pos].wait(tls);
 	}
 
+	// Free thread pool slot
+	s_thread_bits.atomic_op([pos](u128& val)
+	{
+		val &= ~(u128(1) << pos);
+	});
+
 	// Restore thread id
-	const auto _this = thread_ctrl::g_tls_this_thread;
+	const auto _this = atomic_storage<thread_base*>::load(*tls);
 	const auto entry = _this->m_thread.exchange(_self);
 	_this->m_thread.notify_one();
 
 	reinterpret_cast<native_entry>(entry)(_this);
 }
@@ -2159,6 +2168,15 @@ thread_base::thread_base(std::string_view name)
 
 thread_base::~thread_base()
 {
+	if (m_thread)
+	{
+#ifdef _WIN32
+		CloseHandle(reinterpret_cast<HANDLE>(m_thread.raw()));
+#else
+		pthread_detach(reinterpret_cast<pthread_t>(m_thread.raw()));
+#endif
+	}
 }
 
 bool thread_base::join() const