// Copyright (c) 2014-2019 Zano Project
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#pragma once

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// NOTE(review): LOG_PRINT_L0 is used below but is not defined in this header;
// it is presumably supplied by the project's logging header, which must be
// included before this file -- confirm.

namespace utils
{

  /// Type-erased unit of work that a pool worker can run.
  struct call_executor_base
  {
    // Instances are owned and destroyed through a pointer to this base type,
    // so the destructor has to be virtual for the stored callable to be
    // destroyed correctly.
    virtual ~call_executor_base() = default;

    /// Runs the wrapped job.
    virtual void execute() = 0;
  };

  /// Adapter that stores a callable by value and invokes it from execute().
  /// @tparam t_executor_func  type of a callable invocable with no arguments.
  template<typename t_executor_func>
  struct call_executor_t : public call_executor_base
  {
    call_executor_t(t_executor_func f) : m_func(std::move(f))
    {}

    t_executor_func m_func;

    void execute() override
    {
      m_func();
    }
  };

  /// Wraps @p func into a heap-allocated, type-erased executor.
  /// @param func  callable invocable with no arguments; it is moved into the executor.
  /// @return shared pointer to the executor, typed as the base interface.
  template<typename t_executor_func>
  std::shared_ptr<call_executor_base> build_call_executor(t_executor_func func)
  {
    return std::make_shared<call_executor_t<t_executor_func>>(std::move(func));
  }

  /// Fixed-size pool of worker threads that consume jobs from a FIFO queue.
  ///
  /// Workers are created by init(), not by the constructor. The destructor
  /// requests a stop and joins every worker; jobs still waiting in the queue
  /// at that moment are discarded without being executed. Jobs run directly on
  /// the worker thread with no exception handling around them, so a job that
  /// throws will propagate out of the thread function.
  class threads_pool
  {
  public:
    threads_pool() : m_is_stop(false), m_threads_counter(0)
    {}

    /// Starts the worker threads, one per hardware thread reported by the
    /// standard library (at least one).
    void init()
    {
      {
        std::lock_guard<std::mutex> lock(m_queue_mutex);
        m_is_stop = false;
      }

      // hardware_concurrency() is allowed to return 0 when the value cannot be
      // determined; without a fallback the pool would have no workers at all.
      unsigned int num_threads = std::thread::hardware_concurrency();
      if (num_threads == 0)
        num_threads = 1;

      m_threads.reserve(m_threads.size() + num_threads);
      for (unsigned int i = 0; i < num_threads; ++i)
      {
        m_threads.emplace_back([this]() { this->worker_func(); });
      }
    }

    /// Queues @p func for execution on one of the worker threads.
    /// @param func  callable invocable with no arguments.
    /// @return always true.
    template<typename t_executor_func>
    bool add_job(t_executor_func func)
    {
      {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        m_jobs_que.push_back(build_call_executor(std::move(func)));
      }
      m_condition.notify_one();
      return true;
    }

    /// Signals all workers to stop and waits for them to finish.
    ~threads_pool()
    {
      {
        // The flag must change while the mutex is held: otherwise a worker
        // that has just evaluated the wait predicate, but has not blocked yet,
        // would miss the notification and join() below would never return.
        std::lock_guard<std::mutex> lock(m_queue_mutex);
        m_is_stop = true;
      }
      m_condition.notify_all();

      for (auto& th : m_threads)
      {
        if (th.joinable())
          th.join();
      }
    }

  private:
    /// Worker loop: blocks until a job is available or a stop is requested,
    /// then runs the job outside of the queue lock.
    void worker_func()
    {
      LOG_PRINT_L0("Worker thread is started");
      while (true)
      {
        std::shared_ptr<call_executor_base> job;
        {
          std::unique_lock<std::mutex> lock(m_queue_mutex);
          m_condition.wait(lock, [this]() { return !m_jobs_que.empty() || m_is_stop; });

          // A stop request takes priority over any jobs still in the queue.
          if (m_is_stop)
          {
            LOG_PRINT_L0("Worker thread is finished");
            return;
          }

          job = std::move(m_jobs_que.front());
          m_jobs_que.pop_front();
        }

        job->execute();
      }
    }

    std::list<std::shared_ptr<call_executor_base>> m_jobs_que; // pending jobs, guarded by m_queue_mutex
    std::condition_variable m_condition;                       // signalled on new job / stop request
    std::mutex m_queue_mutex;
    std::vector<std::thread> m_threads;
    std::atomic<bool> m_is_stop;
    std::atomic<int64_t> m_threads_counter;                    // initialised to 0; not used by this class
  };

}