diff --git a/tests/db_tests/CMakeLists.txt b/tests/db_tests/CMakeLists.txt index 50d0e9df..156270e7 100644 --- a/tests/db_tests/CMakeLists.txt +++ b/tests/db_tests/CMakeLists.txt @@ -1,3 +1,4 @@ add_executable(db_tests db_tests.cpp) +add_executable(db_size_test db_size_test.cpp) target_link_libraries(db_tests crypto common lmdb zlibstatic ${Boost_LIBRARIES}) - +target_link_libraries(db_size_test crypto common lmdb zlibstatic ${Boost_LIBRARIES}) diff --git a/tests/db_tests/db_size_test.cpp b/tests/db_tests/db_size_test.cpp new file mode 100644 index 00000000..5298851b --- /dev/null +++ b/tests/db_tests/db_size_test.cpp @@ -0,0 +1,324 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "epee/include/include_base_utils.h" +#include "include_base_utils.h" +#include "common/db_backend_lmdb.h" +#include "common/db_abstract_accessor.h" +#include "readwrite_lock.h" + +// #undef LOG_DEFAULT_CHANNEL +// #define LOG_DEFAULT_CHANNEL "db-bulk-writer" + +using namespace tools::db; + +enum class Mode { kv, array }; + +struct first_item_cb : public i_db_callback +{ + std::string k, v; + bool got = false; + bool on_enum_item(uint64_t /*i*/, const void* pkey, uint64_t ks, const void* pval, uint64_t vs) override + { + k.assign(static_cast(pkey), static_cast(ks)); + v.assign(static_cast(pval), static_cast(vs)); + got = true; + return false; // stop + } +}; + +static void gen_key_bytes(std::mt19937_64& gen, uint64_t counter, std::string& out, size_t size) +{ + out.resize(size); + for (size_t i = 0; i < size; i += 8) + { + uint64_t r = gen() ^ (0x9E3779B97F4A7C15ull * (counter + i)); + std::memcpy(&out[i], &r, std::min(8, size - i)); + } +} + +class bulk_writer +{ +public: + bulk_writer(basic_db_accessor& db, i_db_backend& _be, const std::string& table, container_handle h, Mode mode) + : _db(db), _be_(_be), _table(table), _h(h), _mode(mode) {} + + bool init_sample() + { + if (_mode == Mode::kv) + { + if (_be_.size(_h) == 0) + { + LOG_ERROR("[" << _table << "] empty"); + return false; + } + first_item_cb cb; + if (!_be_.enumerate(_h, &cb) || !cb.got) + { + LOG_ERROR("[" << _table << "] enumerate failed"); + return false; + } + _sample_key = cb.k; + _sample_val = cb.v; + _key_size = _sample_key.size(); + _bytes_per_put = _key_size + _sample_val.size(); + _entries = _be_.size(_h); + return true; + } + else + { // array + _entries = _be_.size(_h); + if (_entries == 0) + { + LOG_ERROR("[" << _table << "] empty"); + return false; + } + uint64_t last_key = _entries - 1; + if (!_be_.get(_h, reinterpret_cast(&last_key), sizeof(last_key), _sample_val)) + { + LOG_ERROR("[" << _table << "] get(last) failed"); + return false; + } + _key_size = sizeof(uint64_t); + _bytes_per_put = _key_size + _sample_val.size(); + return true; + } + } + + void print_initial_estimate() const + { + const long double MB_DEC = 1'000'000.0L; + long double approx_initial_bytes = static_cast(_entries) * _bytes_per_put; + LOG_PRINT_L0("[" << _table << "] entries=" << _entries + << ", sample_value=" << _sample_val.size() << " B" + << ", approx bytes/put=" << _bytes_per_put << " B"); + LOG_PRINT_L0("[" << _table << "] Approx initial size: " + << (approx_initial_bytes / MB_DEC) << " MB"); + } + + bool write(uint64_t target_mb, uint64_t step_mb) + { + using clock = std::chrono::steady_clock; + + const uint64_t MB_DEC = 1'000'000; + const uint64_t target_bytes = target_mb * MB_DEC; + const uint64_t step_bytes = step_mb * MB_DEC; + + if (!_db.begin_transaction(false)) + { + LOG_ERROR("begin_transaction failed"); + return false; + } + bool tx_open = true; + bool ok = true; + + std::random_device rd; + std::mt19937_64 gen(rd()); + + uint64_t written_total = 0; + uint64_t next_report = step_bytes; + uint64_t chunks_done = 0; + uint64_t counter = 0; + uint64_t array_key = _entries; + + const auto all_start = clock::now(); + auto chunk_start = all_start; + + while (written_total < target_bytes) + { + if (_mode == Mode::kv) + { + std::string new_key; + gen_key_bytes(gen, counter++, new_key, _key_size); + if (!_be_.set(_h, new_key.data(), new_key.size(), _sample_val.data(), _sample_val.size())) + { + LOG_ERROR("[" << _table << "] set(new kv) failed"); + ok = false; break; + } + } + else + { // array + if (!_be_.set(_h, reinterpret_cast(&array_key), sizeof(array_key), + _sample_val.data(), _sample_val.size())) + { + LOG_ERROR("[" << _table << "] set(new array) failed"); + ok = false; break; + } + ++array_key; + } + + written_total += _bytes_per_put; + + if (written_total >= next_report) + { + const auto t_fill_done = clock::now(); + + if (tx_open) + { + _db.commit_transaction(); + tx_open = false; + } + const auto t_after_commit = clock::now(); + + const double fill_sec = std::chrono::duration(t_fill_done - chunk_start).count(); + const double commit_sec = std::chrono::duration(t_after_commit - t_fill_done).count(); + const double chunk_sec = fill_sec + commit_sec; + const double total_sec = std::chrono::duration(t_after_commit - all_start).count(); + + ++chunks_done; + + const long double approx_now_mb = + (static_cast(_entries) * _bytes_per_put + + static_cast(chunks_done) * step_mb * MB_DEC) / 1'000'000.0L; + + const double chunk_mb = static_cast(step_mb); + const double fill_mb_s = chunk_mb / (fill_sec > 0 ? fill_sec : 1e-9); + const double commit_mb_s = chunk_mb / (commit_sec > 0 ? commit_sec : 1e-9); + const double chunk_mb_s = chunk_mb / (chunk_sec > 0 ? chunk_sec : 1e-9); + const double total_written = static_cast(chunks_done * step_mb); + const double total_mb_s = total_written / (total_sec > 0 ? total_sec : 1e-9); + + LOG_PRINT_L0( + "Writed " << step_mb << "MB in " << _table + << " | fill: " << fill_sec << " s (" << fill_mb_s << " MB/s)" + << ", commit: " << commit_sec << " s (" << commit_mb_s << " MB/s)" + << ", total: " << chunk_sec << " s (" << chunk_mb_s << " MB/s). " + << "Total written: " << total_written << " MB, total time: " + << total_sec << " s (" << total_mb_s << " MB/s avg). " + << "Est. table size now: " << approx_now_mb << " MB." + ); + + if (written_total < target_bytes) + { + if (!_db.begin_transaction(false)) + { + LOG_ERROR("begin_transaction failed (after commit)"); + ok = false; break; + } + tx_open = true; + chunk_start = clock::now(); + } + + next_report += step_bytes; + } + } + + if (tx_open) + { + if (ok) _db.commit_transaction(); + else _db.abort_transaction(); + } + return ok; + } + +private: + basic_db_accessor& _db; + i_db_backend& _be_; + std::string _table; + container_handle _h; + Mode _mode; + uint64_t _entries = 0; + size_t _key_size = 0; + uint64_t _bytes_per_put = 0; + std::string _sample_key; + std::string _sample_val; +}; + +static Mode parse_mode(const char* s) +{ + if (!s) + return Mode::kv; + std::string v(s); + std::transform(v.begin(), v.end(), v.begin(), ::tolower); + if (v == "array") + return Mode::array; + return Mode::kv; +} + +int main(int argc, char** argv) +{ + epee::string_tools::set_module_name_and_folder(argv[0]); + epee::log_space::get_set_log_detalisation_level(true, LOG_LEVEL_0); + epee::log_space::log_singletone::add_logger(LOGGER_CONSOLE, nullptr, nullptr, LOG_LEVEL_0); + epee::log_space::log_singletone::set_thread_log_prefix("[db-bulk-writer] "); + + if (argc < 3) + { + LOG_PRINT_L0( + std::string("Usage:\n ") + argv[0] + + " [--mode=kv|array] [--target-mb=N] [--step-mb=M]\n\n" + "Examples:\n " + argv[0] + + " ~/.Zano/blockchain_lmdb_v3 transactions --mode=kv --target-mb=1000 --step-mb=100\n " + + argv[0] + + " ~/.Zano/blockchain_lmdb_v3 blocks --mode=array --target-mb=200 --step-mb=50"); + return 0; + } + + + std::string env_path = argv[1]; + std::string table_name = argv[2]; + Mode mode = Mode::kv; + uint64_t target_mb = 1000; + uint64_t step_mb = 100; + + for (int i = 3; i < argc; ++i) + { + std::string a = argv[i]; + auto eat = [&](const char* pfx)->const char* + { + size_t L = std::strlen(pfx); + if (a.size() > L && a.compare(0,L,pfx) == 0) + return a.c_str()+L; + return nullptr; + }; + if (const char* v = eat("--mode=")) + mode = parse_mode(v); + else if (const char* v = eat("--target-mb=")) + target_mb = std::strtoull(v, nullptr, 10); + else if (const char* v = eat("--step-mb=")) + step_mb = std::strtoull(v, nullptr, 10); + } + + epee::shared_recursive_mutex rwlock; + std::shared_ptr be(new lmdb_db_backend()); + basic_db_accessor db(be, rwlock); + + uint64_t max_db_size = uint64_t(1UL * 512UL) * 1024UL * 1024UL * 1024UL; + if (!db.open(env_path, max_db_size)) + { + LOG_ERROR("Failed to open LMDB env at " << env_path); + return EXIT_FAILURE; + } + + container_handle h = 0; + if (!be->open_container(table_name, h)) + { + LOG_ERROR("open_container(" << table_name << ") failed"); + return EXIT_FAILURE; + } + + bulk_writer writer(db, *be, table_name, h, mode); + if (!writer.init_sample()) + return EXIT_FAILURE; + writer.print_initial_estimate(); + + bool ok = writer.write(target_mb, step_mb); + db.close(); + + if (ok) + { + LOG_PRINT_L0("DONE"); + return EXIT_SUCCESS; + } + else + { + LOG_ERROR("FAILED"); + return EXIT_FAILURE; + } +}