From 761b75ad728f0bd1c9a0bad4954defc3fd7d4815 Mon Sep 17 00:00:00 2001 From: cryptozoidberg Date: Tue, 8 Feb 2022 14:51:22 +0100 Subject: [PATCH] added batch jobs to threads pool --- src/common/threads_pool.h | 40 +++++++++++++++++++- tests/performance_tests/threads_pool_tests.h | 27 ++++++++++++- 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/src/common/threads_pool.h b/src/common/threads_pool.h index b1d4aebc..235e48ec 100644 --- a/src/common/threads_pool.h +++ b/src/common/threads_pool.h @@ -37,7 +37,15 @@ namespace utils class threads_pool { - public: + public: + typedef std::list> jobs_container; + + template + static void add_job_to_container(jobs_container& cntr, t_executor_func func) + { + cntr.push_back(std::shared_ptr(static_cast(new call_executor_t(func)))); + } + void init() { m_is_stop = false; @@ -63,6 +71,34 @@ namespace utils return true; } + void add_batch_and_wait(const jobs_container& cntr) + { + std::condition_variable batch_condition; + std::mutex batch_mutex; + + + std::atomic cnt = 0; + for (const auto& jb : cntr) + { + call_executor_base* pjob = jb.get(); + add_job([&, pjob]() { + pjob->execute(); + { + std::lock_guard lock(batch_mutex); + cnt++; + } + batch_condition.notify_one(); + }); + } + + std::unique_lock lock(batch_mutex); + batch_condition.wait(lock, [&]() + { + return cnt == cntr.size(); + }); + LOG_PRINT_L0("All jobs finiahed"); + } + ~threads_pool() { m_is_stop = true; @@ -103,7 +139,7 @@ namespace utils - std::list> m_jobs_que; + jobs_container m_jobs_que; std::condition_variable m_condition; std::mutex m_queue_mutex; std::vector m_threads; diff --git a/tests/performance_tests/threads_pool_tests.h b/tests/performance_tests/threads_pool_tests.h index a1bf2042..6999ac30 100644 --- a/tests/performance_tests/threads_pool_tests.h +++ b/tests/performance_tests/threads_pool_tests.h @@ -10,7 +10,7 @@ inline -void thread_pool_tests() +void thread_pool_tests_simple() { { utils::threads_pool pool; @@ -28,5 +28,30 @@ void thread_pool_tests() LOG_PRINT_L0("All jobs finished"); } LOG_PRINT_L0("Scope left"); +} +inline +void thread_pool_tests() +{ + { + utils::threads_pool pool; + pool.init(); + std::atomic count_jobs_finished = 0; + + utils::threads_pool::jobs_container jobs; + size_t i = 0; + for (; i != 10; i++) + { + utils::threads_pool::add_job_to_container(jobs, [&, i]() {LOG_PRINT_L0("Job " << i << " started"); epee::misc_utils::sleep_no_w(10000); ++count_jobs_finished; LOG_PRINT_L0("Job " << i << " finished"); }); + } + + pool.add_batch_and_wait(jobs); + if (count_jobs_finished != i) + { + LOG_ERROR("Test failed"); + return; + } + LOG_PRINT_L0("All jobs finished"); + } + LOG_PRINT_L0("Scope left"); } \ No newline at end of file