1
0
Fork 0
forked from lthn/blockchain

added batch jobs to threads pool

This commit is contained in:
cryptozoidberg 2022-02-08 14:51:22 +01:00
parent e2bb37e5cb
commit 761b75ad72
No known key found for this signature in database
GPG key ID: 22DEB97A54C6FDEC
2 changed files with 64 additions and 3 deletions

View file

@ -37,7 +37,15 @@ namespace utils
class threads_pool
{
public:
public:
typedef std::list<std::shared_ptr<call_executor_base>> jobs_container;
template<typename t_executor_func>
static void add_job_to_container(jobs_container& cntr, t_executor_func func)
{
cntr.push_back(std::shared_ptr<call_executor_base>(static_cast<call_executor_base*>(new call_executor_t<t_executor_func>(func))));
}
void init()
{
m_is_stop = false;
@ -63,6 +71,34 @@ namespace utils
return true;
}
void add_batch_and_wait(const jobs_container& cntr)
{
std::condition_variable batch_condition;
std::mutex batch_mutex;
std::atomic<size_t> cnt = 0;
for (const auto& jb : cntr)
{
call_executor_base* pjob = jb.get();
add_job([&, pjob]() {
pjob->execute();
{
std::lock_guard<std::mutex> lock(batch_mutex);
cnt++;
}
batch_condition.notify_one();
});
}
std::unique_lock<std::mutex> lock(batch_mutex);
batch_condition.wait(lock, [&]()
{
return cnt == cntr.size();
});
LOG_PRINT_L0("All jobs finiahed");
}
~threads_pool()
{
m_is_stop = true;
@ -103,7 +139,7 @@ namespace utils
std::list<std::shared_ptr<call_executor_base>> m_jobs_que;
jobs_container m_jobs_que;
std::condition_variable m_condition;
std::mutex m_queue_mutex;
std::vector<std::thread> m_threads;

View file

@ -10,7 +10,7 @@
inline
void thread_pool_tests()
void thread_pool_tests_simple()
{
{
utils::threads_pool pool;
@ -28,5 +28,30 @@ void thread_pool_tests()
LOG_PRINT_L0("All jobs finished");
}
LOG_PRINT_L0("Scope left");
}
inline
void thread_pool_tests()
{
{
utils::threads_pool pool;
pool.init();
std::atomic<uint64_t> count_jobs_finished = 0;
utils::threads_pool::jobs_container jobs;
size_t i = 0;
for (; i != 10; i++)
{
utils::threads_pool::add_job_to_container(jobs, [&, i]() {LOG_PRINT_L0("Job " << i << " started"); epee::misc_utils::sleep_no_w(10000); ++count_jobs_finished; LOG_PRINT_L0("Job " << i << " finished"); });
}
pool.add_batch_and_wait(jobs);
if (count_jobs_finished != i)
{
LOG_ERROR("Test failed");
return;
}
LOG_PRINT_L0("All jobs finished");
}
LOG_PRINT_L0("Scope left");
}