1
0
Fork 0
forked from lthn/blockchain

lmdb: attempt to fix multithreading issues (not successful, leads to deadlock due to inter-thread dependencies on BCS level)

This commit is contained in:
sowle 2019-08-27 22:22:15 +03:00
parent 55ba3dda28
commit 1dca4735a8
No known key found for this signature in database
GPG key ID: C07A24B2D89D49FC
2 changed files with 154 additions and 31 deletions

View file

@ -10,18 +10,38 @@
#include "util.h"
#define BUF_SIZE 1024
#define DB_RESIZE_MIN_FREE_SIZE (100 * 1024 * 1024) // DB map size will grow if that much space left on DB
#define DB_RESIZE_MIN_MAX_SIZE (50 * 1024 * 1024) // Minimum DB map size (starting size)
#define DB_RESIZE_INCREMENT_SIZE (100 * 1024 * 1024) // Grow step size
#define DB_RESIZE_COMMITS_TO_CHECK 50
#define DB_RESIZE_MIN_FREE_SIZE (6 * 1024 * 1024) // DB resize will triggeres if that much space left on DB map
#define DB_RESIZE_MIN_MAX_SIZE (1 * 1024 * 1024) // DB resize will triggered if DB map size is less than this
#define DB_RESIZE_INCREMENT_SIZE (6 * 1024 * 1024) // Grow step size
#define DB_RESIZE_COMMITS_TO_CHECK 2
#define CHECK_AND_ASSERT_MESS_LMDB_DB(rc, ret, mess) CHECK_AND_ASSERT_MES(res == MDB_SUCCESS, ret, "[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess);
#define CHECK_AND_ASSERT_THROW_MESS_LMDB_DB(rc, mess) CHECK_AND_ASSERT_THROW_MES(res == MDB_SUCCESS, "[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess);
#define ASSERT_MES_AND_THROW_LMDB(rc, mess) ASSERT_MES_AND_THROW("[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess);
#define CHECK_AND_ASSERT_MESS_LMDB_DB(rc, ret, mess) CHECK_AND_ASSERT_MES(res == MDB_SUCCESS, ret, "[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess << ENDL << "LMDB " << get_brief_lmdb_stat(m_penv));
#define CHECK_AND_ASSERT_THROW_MESS_LMDB_DB(rc, mess) CHECK_AND_ASSERT_THROW_MES(res == MDB_SUCCESS, "[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess << ENDL << "LMDB " << get_brief_lmdb_stat(m_penv));
#define ASSERT_MES_AND_THROW_LMDB(rc, mess) ASSERT_MES_AND_THROW("[DB ERROR]:(" << rc << ")" << mdb_strerror(rc) << ", [message]: " << mess << ENDL << "LMDB " << get_brief_lmdb_stat(m_penv));
#undef LOG_DEFAULT_CHANNEL
#define LOG_DEFAULT_CHANNEL "lmdb"
// 'lmdb' channel is disabled by default
ENABLE_CHANNEL_BY_DEFAULT("lmdb");
namespace
{
std::string get_brief_lmdb_stat(MDB_env* p_env)
{
MDB_stat st = AUTO_VAL_INIT(st);
mdb_env_stat(p_env, &st);
MDB_envinfo ei = AUTO_VAL_INIT(ei);
mdb_env_info(p_env, &ei);
static const double megabyte = 1024 * 1024;
std::stringstream ss;
ss << "mapsz: " << std::fixed << std::setprecision(2) << ei.me_mapsize / megabyte
<< " MB, dirty: " << ei.me_last_pgno * st.ms_psize / megabyte
<< " MB, free: " << (ei.me_mapsize - ei.me_last_pgno * st.ms_psize) / megabyte
<< " MB (" << (100.0 * (ei.me_mapsize - ei.me_last_pgno * st.ms_psize) / ei.me_mapsize) << "%)";
return ss.str();
}
}
namespace tools
{
@ -30,6 +50,9 @@ namespace tools
lmdb_db_backend::lmdb_db_backend()
: m_penv(AUTO_VAL_INIT(m_penv))
, m_commits_count(0)
, m_resize_thread_active(false)
, m_resize_wait_condition(false)
, m_resize_wait_txs_count(0)
{
}
@ -38,6 +61,8 @@ namespace tools
{
NESTED_TRY_ENTRY();
wait_for_resize_if_needed();
close();
NESTED_CATCH_ENTRY(__func__);
@ -112,17 +137,15 @@ namespace tools
bool lmdb_db_backend::begin_transaction(bool read_only)
{
wait_for_resize_if_needed();
if (!read_only)
{
LOG_PRINT_CYAN("[DB " << m_path << "] WRITE LOCKED", LOG_LEVEL_3);
CRITICAL_SECTION_LOCK(m_write_exclusive_lock);
if (m_commits_count.fetch_add(1, std::memory_order_relaxed) % DB_RESIZE_COMMITS_TO_CHECK == DB_RESIZE_COMMITS_TO_CHECK - 1)
{
if (!resize_if_needed())
m_commits_count.store(DB_RESIZE_COMMITS_TO_CHECK - 1, std::memory_order_relaxed); // if failed, try again on next commit
}
resize_if_needed();
}
PROFILE_FUNC("lmdb_db_backend::begin_transaction");
{
@ -233,6 +256,8 @@ namespace tools
}
}
LOG_PRINT_L4("[DB] Transaction committed");
notify_resize_thread_on_tx_end();
return true;
}
@ -251,10 +276,10 @@ namespace tools
LOG_PRINT_CYAN("[DB " << m_path << "] WRITE UNLOCKED(ABORTED)", LOG_LEVEL_3);
}
}
}
LOG_PRINT_L4("[DB] Transaction aborted");
notify_resize_thread_on_tx_end();
}
bool lmdb_db_backend::erase(container_handle h, const char* k, size_t ks)
@ -403,39 +428,122 @@ namespace tools
return true;
}
bool lmdb_db_backend::resize_if_needed()
void lmdb_db_backend::wait_for_resize_if_needed()
{
LOG_PRINT_CYAN("[DB " << m_path << "] WRITE LOCKED in resize_if_needed()", LOG_LEVEL_3);
CRITICAL_REGION_LOCAL(m_write_exclusive_lock);
if (have_tx())
// wait for m_resize_wait_condition == false
{
LOG_PRINT_RED("[DB " << m_path << "] : resize_if_needed(): Have txs on stack, unable to resize!", LOG_LEVEL_0);
return false;
std::unique_lock<decltype(m_resize_wait_mutex)> lock(m_resize_wait_mutex);
m_resize_wait_cv.wait(lock, [this](){ return !m_resize_wait_condition; });
}
// increment current tx counter
{
std::lock_guard<std::mutex> lock(m_resize_wait_txs_count_mutex);
++m_resize_wait_txs_count;
}
}
void lmdb_db_backend::notify_resize_thread_on_tx_end()
{
// decrement current tx counter
{
std::lock_guard<std::mutex> lock(m_resize_wait_txs_count_mutex);
--m_resize_wait_txs_count;
}
// notify resize thread on counter changed so it can go ahead when it's zero
m_resize_wait_txs_cv.notify_one();
}
bool lmdb_db_backend::resize_condition(const MDB_stat& st, const MDB_envinfo& ei)
{
uint64_t dirty_size = ei.me_last_pgno * st.ms_psize;
int64_t size_left = ei.me_mapsize - dirty_size;
return size_left < DB_RESIZE_MIN_FREE_SIZE || ei.me_mapsize < DB_RESIZE_MIN_MAX_SIZE;
}
void lmdb_db_backend::resize_thread()
{
epee::log_space::log_singletone::set_thread_log_prefix("[lmdb resize]");
if (m_resize_thread_active)
return; // another thread already started
m_resize_thread_active = true;
auto slh = epee::misc_utils::create_scope_leave_handler([&](){
m_resize_thread_active = false;
});
LOG_PRINT_CYAN("[DB " << m_path << "] resize thread started", LOG_LEVEL_3);
// asquare an exclusive access to LMDB
// stop all new txs
{
std::lock_guard<decltype(m_resize_wait_mutex)> lock(m_resize_wait_mutex);
m_resize_wait_condition = true;
}
// free all threads waiting on m_resize_wait_cv on exit
auto slh2 = epee::misc_utils::create_scope_leave_handler([&](){
{
std::lock_guard<decltype(m_resize_wait_mutex)> lock(m_resize_wait_mutex);
m_resize_wait_condition = false;
}
m_resize_wait_cv.notify_all();
});
// wait for all ongoing lmdb txs are finished (until m_resize_wait_txs_count_mutex is zero)
std::unique_lock<std::mutex> lock(m_resize_wait_txs_count_mutex);
m_resize_wait_txs_cv.wait(lock, [this](){ return m_resize_wait_txs_count == 0; });
LOG_PRINT_CYAN("[DB " << m_path << "] exclusive access acquired", LOG_LEVEL_3);
// check resize condition again and calculate new size
MDB_stat st = AUTO_VAL_INIT(st);
mdb_env_stat(m_penv, &st);
MDB_envinfo ei = AUTO_VAL_INIT(ei);
mdb_env_info(m_penv, &ei);
uint64_t dirty_size = ei.me_last_pgno * st.ms_psize;
int64_t size_diff = ei.me_mapsize - dirty_size;
if (size_diff >= DB_RESIZE_MIN_FREE_SIZE && ei.me_mapsize >= DB_RESIZE_MIN_MAX_SIZE)
return true; // resize is not needed
if (!resize_condition(st, ei))
{
LOG_PRINT_RED("resize is not needed: " << ei.me_mapsize << ", " << ei.me_mapsize - ei.me_last_pgno * st.ms_psize << ", " << ei.me_last_pgno << ", " << st.ms_psize, LOG_LEVEL_0);
return;
}
double gigabyte = 1024 * 1024 * 1024;
static const double gigabyte = 1024 * 1024 * 1024;
const uint64_t increment_size_pg_aligned = DB_RESIZE_INCREMENT_SIZE - (DB_RESIZE_INCREMENT_SIZE % st.ms_psize);
// need to resize DB
// calculate new size
uint64_t new_size = ei.me_mapsize - (ei.me_mapsize % increment_size_pg_aligned) + increment_size_pg_aligned;
// perform a resize
int res = mdb_env_set_mapsize(m_penv, new_size);
CHECK_AND_ASSERT_MESS_LMDB_DB(res, false, "Unable to mdb_env_set_mapsize");
CHECK_AND_ASSERT_MESS_LMDB_DB(res, ((void)(0)), "mdb_env_set_mapsize failed");
LOG_PRINT_CYAN("[DB " << m_path << "] has grown: " << std::fixed << std::setprecision(2) << ei.me_mapsize / gigabyte << " GiB -> " << std::fixed << std::setprecision(2) << new_size / gigabyte << " GiB", LOG_LEVEL_0);
return true;
// slh2 dtor should free all threads waiting on m_resize_wait_cv
// slh dtor should set m_resize_thread_active to false
}
void lmdb_db_backend::resize_if_needed()
{
LOG_PRINT_CYAN("[DB " << m_path << "] WRITE LOCKED in resize_if_needed()", LOG_LEVEL_3);
CRITICAL_REGION_LOCAL(m_write_exclusive_lock);
// m_write_exclusive_lock should be enough to safely call these:
MDB_stat st = AUTO_VAL_INIT(st);
mdb_env_stat(m_penv, &st);
MDB_envinfo ei = AUTO_VAL_INIT(ei);
mdb_env_info(m_penv, &ei);
if (!resize_condition(st, ei))
return; // resize is not needed
std::thread rt(&lmdb_db_backend::resize_thread, this);
rt.detach();
return;
}
}

View file

@ -42,6 +42,21 @@ namespace tools
bool pop_tx_entry(tx_entry& txe);
void resize_thread();
bool resize_condition(const MDB_stat& st, const MDB_envinfo& ei);
void wait_for_resize_if_needed();
void notify_resize_thread_on_tx_end();
std::atomic<bool> m_resize_thread_active;
std::mutex m_resize_wait_mutex;
bool m_resize_wait_condition;
std::condition_variable m_resize_wait_cv;
std::mutex m_resize_wait_txs_count_mutex;
std::condition_variable m_resize_wait_txs_cv;
size_t m_resize_wait_txs_count;
public:
lmdb_db_backend();
@ -64,7 +79,7 @@ namespace tools
//-------------------------------------------------------------------------------------
bool have_tx();
MDB_txn* get_current_tx();
bool resize_if_needed();
void resize_if_needed();
};
}