diff --git a/src/common/db_backend_lmdb.cpp b/src/common/db_backend_lmdb.cpp index ceb46796..d507b0ca 100644 --- a/src/common/db_backend_lmdb.cpp +++ b/src/common/db_backend_lmdb.cpp @@ -10,18 +10,38 @@ #include "util.h" #define BUF_SIZE 1024 -#define DB_RESIZE_MIN_FREE_SIZE (100 * 1024 * 1024) // DB map size will grow if that much space left on DB -#define DB_RESIZE_MIN_MAX_SIZE (50 * 1024 * 1024) // Minimum DB map size (starting size) -#define DB_RESIZE_INCREMENT_SIZE (100 * 1024 * 1024) // Grow step size -#define DB_RESIZE_COMMITS_TO_CHECK 50 +#define DB_RESIZE_MIN_FREE_SIZE (6 * 1024 * 1024) // DB resize will triggeres if that much space left on DB map +#define DB_RESIZE_MIN_MAX_SIZE (1 * 1024 * 1024) // DB resize will triggered if DB map size is less than this +#define DB_RESIZE_INCREMENT_SIZE (6 * 1024 * 1024) // Grow step size +#define DB_RESIZE_COMMITS_TO_CHECK 2 -#define CHECK_AND_ASSERT_MESS_LMDB_DB(rc, ret, mess) CHECK_AND_ASSERT_MES(res == MDB_SUCCESS, ret, "[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess); -#define CHECK_AND_ASSERT_THROW_MESS_LMDB_DB(rc, mess) CHECK_AND_ASSERT_THROW_MES(res == MDB_SUCCESS, "[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess); -#define ASSERT_MES_AND_THROW_LMDB(rc, mess) ASSERT_MES_AND_THROW("[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess); +#define CHECK_AND_ASSERT_MESS_LMDB_DB(rc, ret, mess) CHECK_AND_ASSERT_MES(res == MDB_SUCCESS, ret, "[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess << ENDL << "LMDB " << get_brief_lmdb_stat(m_penv)); +#define CHECK_AND_ASSERT_THROW_MESS_LMDB_DB(rc, mess) CHECK_AND_ASSERT_THROW_MES(res == MDB_SUCCESS, "[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess << ENDL << "LMDB " << get_brief_lmdb_stat(m_penv)); +#define ASSERT_MES_AND_THROW_LMDB(rc, mess) ASSERT_MES_AND_THROW("[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess << ENDL << "LMDB " << get_brief_lmdb_stat(m_penv)); #undef LOG_DEFAULT_CHANNEL #define LOG_DEFAULT_CHANNEL "lmdb" -// 'lmdb' channel is disabled by default +ENABLE_CHANNEL_BY_DEFAULT("lmdb"); + +namespace +{ + std::string get_brief_lmdb_stat(MDB_env* p_env) + { + MDB_stat st = AUTO_VAL_INIT(st); + mdb_env_stat(p_env, &st); + MDB_envinfo ei = AUTO_VAL_INIT(ei); + mdb_env_info(p_env, &ei); + + static const double megabyte = 1024 * 1024; + + std::stringstream ss; + ss << "mapsz: " << std::fixed << std::setprecision(2) << ei.me_mapsize / megabyte + << " MB, dirty: " << ei.me_last_pgno * st.ms_psize / megabyte + << " MB, free: " << (ei.me_mapsize - ei.me_last_pgno * st.ms_psize) / megabyte + << " MB (" << (100.0 * (ei.me_mapsize - ei.me_last_pgno * st.ms_psize) / ei.me_mapsize) << "%)"; + return ss.str(); + } +} namespace tools { @@ -30,6 +50,9 @@ namespace tools lmdb_db_backend::lmdb_db_backend() : m_penv(AUTO_VAL_INIT(m_penv)) , m_commits_count(0) + , m_resize_thread_active(false) + , m_resize_wait_condition(false) + , m_resize_wait_txs_count(0) { } @@ -38,6 +61,8 @@ namespace tools { NESTED_TRY_ENTRY(); + wait_for_resize_if_needed(); + close(); NESTED_CATCH_ENTRY(__func__); @@ -112,17 +137,15 @@ namespace tools bool lmdb_db_backend::begin_transaction(bool read_only) { + wait_for_resize_if_needed(); + if (!read_only) { LOG_PRINT_CYAN("[DB " << m_path << "] WRITE LOCKED", LOG_LEVEL_3); CRITICAL_SECTION_LOCK(m_write_exclusive_lock); if (m_commits_count.fetch_add(1, std::memory_order_relaxed) % DB_RESIZE_COMMITS_TO_CHECK == DB_RESIZE_COMMITS_TO_CHECK - 1) - { - if (!resize_if_needed()) - m_commits_count.store(DB_RESIZE_COMMITS_TO_CHECK - 1, std::memory_order_relaxed); // if failed, try again on next commit - } - + resize_if_needed(); } PROFILE_FUNC("lmdb_db_backend::begin_transaction"); { @@ -233,6 +256,8 @@ namespace tools } } LOG_PRINT_L4("[DB] Transaction committed"); + + notify_resize_thread_on_tx_end(); return true; } @@ -251,10 +276,10 @@ namespace tools LOG_PRINT_CYAN("[DB " << m_path << "] WRITE UNLOCKED(ABORTED)", LOG_LEVEL_3); } } - - } LOG_PRINT_L4("[DB] Transaction aborted"); + + notify_resize_thread_on_tx_end(); } bool lmdb_db_backend::erase(container_handle h, const char* k, size_t ks) @@ -403,39 +428,122 @@ namespace tools return true; } - bool lmdb_db_backend::resize_if_needed() + void lmdb_db_backend::wait_for_resize_if_needed() { - LOG_PRINT_CYAN("[DB " << m_path << "] WRITE LOCKED in resize_if_needed()", LOG_LEVEL_3); - CRITICAL_REGION_LOCAL(m_write_exclusive_lock); - - if (have_tx()) + // wait for m_resize_wait_condition == false { - LOG_PRINT_RED("[DB " << m_path << "] : resize_if_needed(): Have txs on stack, unable to resize!", LOG_LEVEL_0); - return false; + std::unique_lock lock(m_resize_wait_mutex); + m_resize_wait_cv.wait(lock, [this](){ return !m_resize_wait_condition; }); } + // increment current tx counter + { + std::lock_guard lock(m_resize_wait_txs_count_mutex); + ++m_resize_wait_txs_count; + } + } + + void lmdb_db_backend::notify_resize_thread_on_tx_end() + { + // decrement current tx counter + { + std::lock_guard lock(m_resize_wait_txs_count_mutex); + --m_resize_wait_txs_count; + } + + // notify resize thread on counter changed so it can go ahead when it's zero + m_resize_wait_txs_cv.notify_one(); + } + + + bool lmdb_db_backend::resize_condition(const MDB_stat& st, const MDB_envinfo& ei) + { + uint64_t dirty_size = ei.me_last_pgno * st.ms_psize; + int64_t size_left = ei.me_mapsize - dirty_size; + return size_left < DB_RESIZE_MIN_FREE_SIZE || ei.me_mapsize < DB_RESIZE_MIN_MAX_SIZE; + } + + void lmdb_db_backend::resize_thread() + { + epee::log_space::log_singletone::set_thread_log_prefix("[lmdb resize]"); + if (m_resize_thread_active) + return; // another thread already started + m_resize_thread_active = true; + auto slh = epee::misc_utils::create_scope_leave_handler([&](){ + m_resize_thread_active = false; + }); + LOG_PRINT_CYAN("[DB " << m_path << "] resize thread started", LOG_LEVEL_3); + + // asquare an exclusive access to LMDB + + // stop all new txs + { + std::lock_guard lock(m_resize_wait_mutex); + m_resize_wait_condition = true; + } + + // free all threads waiting on m_resize_wait_cv on exit + auto slh2 = epee::misc_utils::create_scope_leave_handler([&](){ + { + std::lock_guard lock(m_resize_wait_mutex); + m_resize_wait_condition = false; + } + m_resize_wait_cv.notify_all(); + }); + + // wait for all ongoing lmdb txs are finished (until m_resize_wait_txs_count_mutex is zero) + std::unique_lock lock(m_resize_wait_txs_count_mutex); + m_resize_wait_txs_cv.wait(lock, [this](){ return m_resize_wait_txs_count == 0; }); + + LOG_PRINT_CYAN("[DB " << m_path << "] exclusive access acquired", LOG_LEVEL_3); + + // check resize condition again and calculate new size + MDB_stat st = AUTO_VAL_INIT(st); mdb_env_stat(m_penv, &st); MDB_envinfo ei = AUTO_VAL_INIT(ei); mdb_env_info(m_penv, &ei); - uint64_t dirty_size = ei.me_last_pgno * st.ms_psize; - int64_t size_diff = ei.me_mapsize - dirty_size; - if (size_diff >= DB_RESIZE_MIN_FREE_SIZE && ei.me_mapsize >= DB_RESIZE_MIN_MAX_SIZE) - return true; // resize is not needed + if (!resize_condition(st, ei)) + { + LOG_PRINT_RED("resize is not needed: " << ei.me_mapsize << ", " << ei.me_mapsize - ei.me_last_pgno * st.ms_psize << ", " << ei.me_last_pgno << ", " << st.ms_psize, LOG_LEVEL_0); + return; + } - double gigabyte = 1024 * 1024 * 1024; + static const double gigabyte = 1024 * 1024 * 1024; const uint64_t increment_size_pg_aligned = DB_RESIZE_INCREMENT_SIZE - (DB_RESIZE_INCREMENT_SIZE % st.ms_psize); - // need to resize DB + // calculate new size uint64_t new_size = ei.me_mapsize - (ei.me_mapsize % increment_size_pg_aligned) + increment_size_pg_aligned; + // perform a resize int res = mdb_env_set_mapsize(m_penv, new_size); - CHECK_AND_ASSERT_MESS_LMDB_DB(res, false, "Unable to mdb_env_set_mapsize"); + CHECK_AND_ASSERT_MESS_LMDB_DB(res, ((void)(0)), "mdb_env_set_mapsize failed"); LOG_PRINT_CYAN("[DB " << m_path << "] has grown: " << std::fixed << std::setprecision(2) << ei.me_mapsize / gigabyte << " GiB -> " << std::fixed << std::setprecision(2) << new_size / gigabyte << " GiB", LOG_LEVEL_0); - return true; + // slh2 dtor should free all threads waiting on m_resize_wait_cv + // slh dtor should set m_resize_thread_active to false + } + + void lmdb_db_backend::resize_if_needed() + { + LOG_PRINT_CYAN("[DB " << m_path << "] WRITE LOCKED in resize_if_needed()", LOG_LEVEL_3); + CRITICAL_REGION_LOCAL(m_write_exclusive_lock); + + // m_write_exclusive_lock should be enough to safely call these: + MDB_stat st = AUTO_VAL_INIT(st); + mdb_env_stat(m_penv, &st); + MDB_envinfo ei = AUTO_VAL_INIT(ei); + mdb_env_info(m_penv, &ei); + + if (!resize_condition(st, ei)) + return; // resize is not needed + + std::thread rt(&lmdb_db_backend::resize_thread, this); + rt.detach(); + + return; } } diff --git a/src/common/db_backend_lmdb.h b/src/common/db_backend_lmdb.h index 84c441d3..31194833 100644 --- a/src/common/db_backend_lmdb.h +++ b/src/common/db_backend_lmdb.h @@ -42,6 +42,21 @@ namespace tools bool pop_tx_entry(tx_entry& txe); + void resize_thread(); + bool resize_condition(const MDB_stat& st, const MDB_envinfo& ei); + void wait_for_resize_if_needed(); + void notify_resize_thread_on_tx_end(); + + std::atomic m_resize_thread_active; + std::mutex m_resize_wait_mutex; + bool m_resize_wait_condition; + std::condition_variable m_resize_wait_cv; + + std::mutex m_resize_wait_txs_count_mutex; + std::condition_variable m_resize_wait_txs_cv; + size_t m_resize_wait_txs_count; + + public: lmdb_db_backend(); @@ -64,7 +79,7 @@ namespace tools //------------------------------------------------------------------------------------- bool have_tx(); MDB_txn* get_current_tx(); - bool resize_if_needed(); + void resize_if_needed(); }; }