// Copyright (c) 2014-2018 Zano Project // Copyright (c) 2014-2018 The Louisdor Project // Copyright (c) 2012-2013 The Cryptonote developers // Distributed under the MIT/X11 software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include "currency_core/currency_format_utils.h" #include "profile_tools.h" namespace currency { //----------------------------------------------------------------------------------------------------------------------- template t_currency_protocol_handler::t_currency_protocol_handler(t_core& rcore, nodetool::i_p2p_endpoint* p_net_layout) : m_core(rcore) , m_p2p(p_net_layout) , m_syncronized_connections_count(0) , m_synchronized(false) , m_have_been_synchronized(false) , m_max_height_seen(0) , m_core_inital_height(0) , m_want_stop(false) , m_last_median2local_time_difference(0) , m_last_ntp2local_time_difference(0) { if(!m_p2p) m_p2p = &m_p2p_stub; } //----------------------------------------------------------------------------------------------------------------------- template t_currency_protocol_handler::~t_currency_protocol_handler() { deinit(); } //----------------------------------------------------------------------------------------------------------------------- template bool t_currency_protocol_handler::init(const boost::program_options::variables_map& vm) { m_relay_que_thread = std::thread([this](){relay_que_worker();}); return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::deinit() { m_want_stop = true; m_relay_que_cv.notify_all(); if (m_relay_que_thread.joinable()) m_relay_que_thread.join(); return true; } //------------------------------------------------------------------------------------------------------------------------ template void t_currency_protocol_handler::set_p2p_endpoint(nodetool::i_p2p_endpoint* p2p) { if(p2p) m_p2p = p2p; else m_p2p = &m_p2p_stub; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::on_callback(currency_connection_context& context) { LOG_PRINT_L3("callback fired"); CHECK_AND_ASSERT_MES_CC( context.m_priv.m_callback_request_count > 0, false, "false callback fired, but context.m_priv.m_callback_request_count=" << context.m_priv.m_callback_request_count); --context.m_priv.m_callback_request_count; if(context.m_state == currency_connection_context::state_synchronizing) { NOTIFY_REQUEST_CHAIN::request r = boost::value_initialized(); m_core.get_short_chain_history(r.block_ids); LOG_PRINT_L2("[NOTIFY]NOTIFY_REQUEST_CHAIN(on_callback): m_block_ids.size()=" << r.block_ids.size()); LOG_PRINT_L3("[NOTIFY]NOTIFY_REQUEST_CHAIN(on_callback): " << ENDL << print_kv_structure(r)); post_notify(r, context); } return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::get_stat_info(const core_stat_info::params& pr, core_stat_info& stat_inf) { return m_core.get_stat_info(pr, stat_inf); } //------------------------------------------------------------------------------------------------------------------------ template void t_currency_protocol_handler::log_connections() { std::stringstream ss; ss << std::setw(29) << std::left << "Remote Host" << std::setw(20) << "Peer id" << std::setw(25) << "Recv/Sent (idle,sec)" << std::setw(25) << "State" << std::setw(20) << "Livetime(seconds)" << std::setw(20) << "Client version" << ENDL; m_p2p->for_each_connection([&](const connection_context& cntxt, nodetool::peerid_type peer_id) { ss << std::setw(29) << std::left << std::string(cntxt.m_is_income ? "[INC]":"[OUT]") + string_tools::get_ip_string_from_int32(cntxt.m_remote_ip) + ":" + std::to_string(cntxt.m_remote_port) << std::setw(20) << std::hex << peer_id << std::setw(25) << std::to_string(cntxt.m_recv_cnt)+ "(" + std::to_string(time(NULL) - cntxt.m_last_recv) + ")" + "/" + std::to_string(cntxt.m_send_cnt) + "(" + std::to_string(time(NULL) - cntxt.m_last_send) + ")" << std::setw(25) << get_protocol_state_string(cntxt.m_state) << std::setw(20) << std::to_string(time(NULL) - cntxt.m_started) << std::setw(20) << cntxt.m_remote_version << ENDL; return true; }); LOG_PRINT_L0("Connections: " << ENDL << ss.str()); } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::process_payload_sync_data(const CORE_SYNC_DATA& hshd, currency_connection_context& context, bool is_inital) { context.m_remote_version = hshd.client_version; if(context.m_state == currency_connection_context::state_befor_handshake && !is_inital) return true; uint64_t local_time = m_core.get_blockchain_storage().get_core_runtime_config().get_core_time(); context.m_time_delta = local_time - hshd.core_time; // for outgoing connections -- check time difference if (!context.m_is_income) { if (!add_time_delta_and_check_time_sync(context.m_time_delta)) { // serious time sync problem detected i_critical_error_handler* ceh(m_core.get_critical_error_handler()); if (ceh != nullptr && ceh->on_critical_time_sync_error()) { // error is handled by a callee, should not be ignored here, stop processing immideately return true; } } } if(context.m_state == currency_connection_context::state_synchronizing) return true; bool have_in_que = false; CRITICAL_REGION_BEGIN(m_blocks_id_que_lock); have_in_que = m_blocks_id_que.find(hshd.top_id) != m_blocks_id_que.end(); CRITICAL_REGION_END(); if(have_in_que || m_core.have_block(hshd.top_id)) { context.m_state = currency_connection_context::state_normal; if(is_inital) on_connection_synchronized(); return true; } int64_t diff = static_cast(hshd.current_height) - static_cast(m_core.get_current_blockchain_size()); LOG_PRINT_COLOR2(LOG_DEFAULT_TARGET, (is_inital ? "Inital ":"Idle ") << "sync data returned unknown top block (" << hshd.top_id << "): " << m_core.get_top_block_height() << " -> " << hshd.current_height - 1 << " [" << std::abs(diff) << " blocks (" << diff / (24 * 60 * 60 / DIFFICULTY_TOTAL_TARGET ) << " days) " << (0 <= diff ? std::string("behind") : std::string("ahead")) << "] " << ENDL << "SYNCHRONIZATION started", (is_inital ? LOG_LEVEL_0 : LOG_LEVEL_1), (is_inital ? epee::log_space::console_color_yellow : epee::log_space::console_color_magenta)); LOG_PRINT_L1("Remote top block height: " << hshd.current_height << ", id: " << hshd.top_id); /*check if current height is in remote's checkpoints zone*/ if(hshd.last_checkpoint_height && m_core.get_blockchain_storage().get_checkpoints().get_top_checkpoint_height() < hshd.last_checkpoint_height && m_core.get_current_blockchain_size() < hshd.last_checkpoint_height ) { LOG_PRINT_RED("Remote node have longer checkpoints zone( " << hshd.last_checkpoint_height << ") " << "that local (" << m_core.get_blockchain_storage().get_checkpoints().get_top_checkpoint_height() << ")" << "That means that current software is outdated, please updated it." << "Current heigh lay under checkpoints on remote host, so it is not possible validate this transactions on local host, disconnecting.", LOG_LEVEL_0); return false; } else if (m_core.get_blockchain_storage().get_checkpoints().get_top_checkpoint_height() < hshd.last_checkpoint_height) { LOG_PRINT_MAGENTA("Remote node have longer checkpoints zone( " << hshd.last_checkpoint_height << ") " << "that local (" << m_core.get_blockchain_storage().get_checkpoints().get_top_checkpoint_height() << ")" << "That means that current software is outdated, please updated it.", LOG_LEVEL_0); } context.m_state = currency_connection_context::state_synchronizing; context.m_remote_blockchain_height = hshd.current_height; //let the socket to send response to handshake, but request callback, to let send request data after response LOG_PRINT_L3("requesting callback"); ++context.m_priv.m_callback_request_count; m_p2p->request_callback(context); //update progress vars if (m_max_height_seen < hshd.current_height) m_max_height_seen = hshd.current_height; if (!m_core_inital_height) m_core_inital_height = m_core.get_current_blockchain_size(); return true; } //------------------------------------------------------------------------------------------------------------------------ template uint64_t t_currency_protocol_handler::get_core_inital_height() { return m_core_inital_height; } //------------------------------------------------------------------------------------------------------------------------ template uint64_t t_currency_protocol_handler::get_max_seen_height() { return m_max_height_seen; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::get_payload_sync_data(CORE_SYNC_DATA& hshd) { m_core.get_blockchain_top(hshd.current_height, hshd.top_id); hshd.current_height +=1; hshd.last_checkpoint_height = m_core.get_blockchain_storage().get_checkpoints().get_top_checkpoint_height(); hshd.core_time = m_core.get_blockchain_storage().get_core_runtime_config().get_core_time(); hshd.client_version = PROJECT_VERSION_LONG; return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::get_payload_sync_data(blobdata& data) { CORE_SYNC_DATA hsd = AUTO_VAL_INIT(hsd); get_payload_sync_data(hsd); epee::serialization::store_t_to_binary(hsd, data); return true; } //------------------------------------------------------------------------------------------------------------------------ template int t_currency_protocol_handler::handle_notify_new_block(int command, NOTIFY_NEW_BLOCK::request& arg, currency_connection_context& context) { if(context.m_state != currency_connection_context::state_normal) return 1; //check if block already exists block b = AUTO_VAL_INIT(b); block_verification_context bvc = AUTO_VAL_INIT(bvc); if (!m_core.parse_block(arg.b.block, b, bvc)) { LOG_PRINT_RED("Block parsing failed, dropping connection", LOG_LEVEL_0); m_p2p->drop_connection(context); return 1; } crypto::hash block_id = get_block_hash(b); LOG_PRINT_GREEN("[HANDLE]NOTIFY_NEW_BLOCK " << block_id << " HEIGHT " << get_block_height(b) << " (hop " << arg.hop << ")", LOG_LEVEL_2); CRITICAL_REGION_BEGIN(m_blocks_id_que_lock); auto it = m_blocks_id_que.find(block_id); if (it != m_blocks_id_que.end()) { //already have this block handler in que LOG_PRINT("Block " << block_id << " already in processing que", LOG_LEVEL_3); return 1; } else { m_blocks_id_que.insert(block_id); } CRITICAL_REGION_END(); auto slh = epee::misc_utils::create_scope_leave_handler([&]() { CRITICAL_REGION_LOCAL(m_blocks_id_que_lock); auto it = m_blocks_id_que.find(block_id); CHECK_AND_ASSERT_MES_NO_RET(it != m_blocks_id_que.end(), "Internal error, block " << block_id << " not found in m_blocks_id_que"); m_blocks_id_que.erase(it); }); if (m_core.have_block(block_id)) { LOG_PRINT_L3("Block " << block_id << " already in core"); return 1; } //pre-validate block here, and propagate it to network asap to avoid latency of handling big block (tx flood) //######################################################## /* problem with prevalidation: in case of pre_validate_block() is passed but handle_incoming_tx() is failed network got spammed with notifications about this broken block and then connections got closed. temporary disabled to more investigation bool prevalidate_relayed = false; if (m_core.pre_validate_block(b, bvc, block_id) && bvc.m_added_to_main_chain) { //not alternative block, relay it ++arg.hop; relay_block(arg, context); prevalidate_relayed = true; } */ //######################################################## //now actually process block for(auto tx_blob_it = arg.b.txs.begin(); tx_blob_it!=arg.b.txs.end();tx_blob_it++) { currency::tx_verification_context tvc = AUTO_VAL_INIT(tvc); m_core.handle_incoming_tx(*tx_blob_it, tvc, true); if(tvc.m_verification_failed) { LOG_PRINT_L0("Block verification failed: transaction verification failed, dropping connection"); m_p2p->drop_connection(context); return 1; } } m_core.pause_mine(); m_core.handle_incoming_block(b, bvc); m_core.resume_mine(); if(bvc.m_verification_failed) { LOG_PRINT_L0("Block verification failed, dropping connection"); m_p2p->drop_connection(context); return 1; } LOG_PRINT_GREEN("[HANDLE]NOTIFY_NEW_BLOCK EXTRA: id: " << block_id << ",bvc.m_added_to_main_chain " << bvc.m_added_to_main_chain //<< ",prevalidate_result " << prevalidate_relayed << ",bvc.added_to_altchain " << bvc.added_to_altchain << ",bvc.m_marked_as_orphaned " << bvc.m_marked_as_orphaned, LOG_LEVEL_2); if (bvc.m_added_to_main_chain || (bvc.added_to_altchain && bvc.height_difference < 2)) { if (true/*!prevalidate_relayed*/) { // pre-validation failed prevoiusly, but complete check was success, not an alternative block ++arg.hop; //TODO: Add here announce protocol usage relay_block(arg, context); } }else if(bvc.m_marked_as_orphaned) { context.m_state = currency_connection_context::state_synchronizing; NOTIFY_REQUEST_CHAIN::request r = boost::value_initialized(); m_core.get_short_chain_history(r.block_ids); LOG_PRINT_MAGENTA("State changed to state_synchronizing.", LOG_LEVEL_2); LOG_PRINT_L2("[NOTIFY]NOTIFY_REQUEST_CHAIN(on_orphaned): m_block_ids.size()=" << r.block_ids.size() ); LOG_PRINT_L3("[NOTIFY]NOTIFY_REQUEST_CHAIN(on_orphaned): " << ENDL << print_kv_structure(r)); post_notify(r, context); } return 1; } //------------------------------------------------------------------------------------------------------------------------ template int t_currency_protocol_handler::handle_notify_new_transactions(int command, NOTIFY_NEW_TRANSACTIONS::request& arg, currency_connection_context& context) { if(context.m_state != currency_connection_context::state_normal) return 1; uint64_t inital_tx_count = arg.txs.size(); TIME_MEASURE_START_MS(new_transactions_handle_time); for(auto tx_blob_it = arg.txs.begin(); tx_blob_it!=arg.txs.end();) { currency::tx_verification_context tvc = AUTO_VAL_INIT(tvc); m_core.handle_incoming_tx(*tx_blob_it, tvc, false); if(tvc.m_verification_failed) { LOG_PRINT_L0("NOTIFY_NEW_TRANSACTIONS: Tx verification failed, dropping connection"); m_p2p->drop_connection(context); return 1; } if(tvc.m_should_be_relayed) ++tx_blob_it; else arg.txs.erase(tx_blob_it++); } if(arg.txs.size()) { //TODO: add announce usage here relay_transactions(arg, context); } TIME_MEASURE_FINISH_MS(new_transactions_handle_time); LOG_PRINT_L2("NOTIFY_NEW_TRANSACTIONS: " << new_transactions_handle_time << "ms (inital_tx_count: " << inital_tx_count << ", relayed_tx_count: " << arg.txs.size() << ")"); return true; } //------------------------------------------------------------------------------------------------------------------------ template int t_currency_protocol_handler::handle_request_get_objects(int command, NOTIFY_REQUEST_GET_OBJECTS::request& arg, currency_connection_context& context) { LOG_PRINT_L2("[HANDLE]NOTIFY_REQUEST_GET_OBJECTS: arg.blocks.size() = " << arg.blocks.size() << ", arg.txs.size()="<< arg.txs.size()); LOG_PRINT_L3("[HANDLE]NOTIFY_REQUEST_GET_OBJECTS: " << ENDL << currency::print_kv_structure(arg)); if (arg.blocks.size() > CURRENCY_PROTOCOL_MAX_BLOCKS_REQUEST_COUNT || arg.txs.size() > CURRENCY_PROTOCOL_MAX_TXS_REQUEST_COUNT) { LOG_ERROR_CCONTEXT("Requested objects count is to big (" << arg.blocks.size() <<")expected not more then " << CURRENCY_PROTOCOL_MAX_BLOCKS_REQUEST_COUNT); m_p2p->drop_connection(context); } NOTIFY_RESPONSE_GET_OBJECTS::request rsp; if(!m_core.handle_get_objects(arg, rsp, context)) { LOG_ERROR_CCONTEXT("failed to handle request NOTIFY_REQUEST_GET_OBJECTS, dropping connection"); m_p2p->drop_connection(context); } LOG_PRINT_L2("[NOTIFY]NOTIFY_RESPONSE_GET_OBJECTS: blocks.size()=" << rsp.blocks.size() << ", txs.size()=" << rsp.txs.size() << ", rsp.m_current_blockchain_height=" << rsp.current_blockchain_height << ", missed_ids.size()=" << rsp.missed_ids.size()); LOG_PRINT_L3("[NOTIFY]NOTIFY_RESPONSE_GET_OBJECTS: " << ENDL << currency::print_kv_structure(rsp)); post_notify(rsp, context); return 1; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::check_stop_flag_and_drop_cc(currency_connection_context& context) { if (m_p2p->is_stop_signal_sent() || m_want_stop) { m_p2p->drop_connection(context); return true; } return false; } #define CHECK_STOP_FLAG__DROP_AND_RETURN_IF_SET(ret_v, msg) if (check_stop_flag_and_drop_cc(context)) { LOG_PRINT_YELLOW("Stop flag detected within NOTIFY_RESPONSE_GET_OBJECTS. " << msg, LOG_LEVEL_0); return ret_v; } //------------------------------------------------------------------------------------------------------------------------ template int t_currency_protocol_handler::handle_response_get_objects(int command, NOTIFY_RESPONSE_GET_OBJECTS::request& arg, currency_connection_context& context) { LOG_PRINT_L2("[HANDLE]NOTIFY_RESPONSE_GET_OBJECTS: arg.blocks.size()=" << arg.blocks.size() << ", arg.missed_ids.size()=" << arg.missed_ids.size() << ", arg.txs.size()=" << arg.txs.size()); LOG_PRINT_L3("[HANDLE]NOTIFY_RESPONSE_GET_OBJECTS: " << ENDL << currency::print_kv_structure(arg)); if(context.m_last_response_height > arg.current_blockchain_height) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_HAVE_OBJECTS: arg.m_current_blockchain_height=" << arg.current_blockchain_height << " < m_last_response_height=" << context.m_last_response_height << ", dropping connection"); m_p2p->drop_connection(context); return 1; } context.m_remote_blockchain_height = arg.current_blockchain_height; uint64_t total_blocks_parsing_time = 0; size_t count = 0; for (const block_complete_entry& block_entry : arg.blocks) { CHECK_STOP_FLAG__DROP_AND_RETURN_IF_SET(1, "Blocks processing interrupted, connection dropped"); ++count; block b; TIME_MEASURE_START(block_parsing_time); if(!parse_and_validate_block_from_blob(block_entry.block, b)) { LOG_ERROR_CCONTEXT("sent wrong block: failed to parse and validate block: \r\n" << string_tools::buff_to_hex_nodelimer(block_entry.block) << "\r\n dropping connection"); m_p2p->drop_connection(context); m_p2p->add_ip_fail(context.m_remote_ip); return 1; } TIME_MEASURE_FINISH(block_parsing_time); total_blocks_parsing_time += block_parsing_time; //to avoid concurrency in core between connections, suspend connections which delivered block later then first one if(count == 2) { if(m_core.have_block(get_block_hash(b))) { context.m_state = currency_connection_context::state_idle; context.m_priv.m_needed_objects.clear(); context.m_priv.m_requested_objects.clear(); LOG_PRINT_L1("Connection set to idle state."); return 1; } } auto req_it = context.m_priv.m_requested_objects.find(get_block_hash(b)); if(req_it == context.m_priv.m_requested_objects.end()) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_GET_OBJECTS: block with id=" << string_tools::pod_to_hex(get_blob_hash(block_entry.block)) << " wasn't requested, dropping connection"); m_p2p->drop_connection(context); return 1; } if(b.tx_hashes.size() != block_entry.txs.size()) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_GET_OBJECTS: block with id=" << string_tools::pod_to_hex(get_blob_hash(block_entry.block)) << ", tx_hashes.size()=" << b.tx_hashes.size() << " mismatch with block_complete_entry.m_txs.size()=" << block_entry.txs.size() << ", dropping connection"); m_p2p->drop_connection(context); return 1; } context.m_priv.m_requested_objects.erase(req_it); } LOG_PRINT_CYAN("Block parsing time avr: " << (count > 0 ? total_blocks_parsing_time / count : 0) << " mcs, total for " << count << " blocks: " << total_blocks_parsing_time / 1000 << " ms", LOG_LEVEL_2); if(context.m_priv.m_requested_objects.size()) { LOG_PRINT_RED("returned not all requested objects (context.m_priv.m_requested_objects.size()=" << context.m_priv.m_requested_objects.size() << "), dropping connection", LOG_LEVEL_0); m_p2p->drop_connection(context); return 1; } { m_core.pause_mine(); misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&t_core::resume_mine, &m_core)); size_t count = 0; for (const block_complete_entry& block_entry : arg.blocks) { CHECK_STOP_FLAG__DROP_AND_RETURN_IF_SET(1, "Blocks processing interrupted, connection dropped"); //process transactions TIME_MEASURE_START(transactions_process_time); for (const auto& tx_blob : block_entry.txs) { CHECK_STOP_FLAG__DROP_AND_RETURN_IF_SET(1, "Block txs processing interrupted, connection dropped"); tx_verification_context tvc = AUTO_VAL_INIT(tvc); m_core.handle_incoming_tx(tx_blob, tvc, true); if(tvc.m_verification_failed) { LOG_ERROR_CCONTEXT("transaction verification failed on NOTIFY_RESPONSE_GET_OBJECTS, \r\ntx_id = " << string_tools::pod_to_hex(get_blob_hash(tx_blob)) << ", dropping connection"); m_p2p->drop_connection(context); return 1; } } TIME_MEASURE_FINISH(transactions_process_time); //process block TIME_MEASURE_START(block_process_time); block_verification_context bvc = boost::value_initialized(); m_core.handle_incoming_block(block_entry.block, bvc, false); if (count > 2 && bvc.m_already_exists) { context.m_state = currency_connection_context::state_idle; context.m_priv.m_needed_objects.clear(); context.m_priv.m_requested_objects.clear(); LOG_PRINT_L1("Connection set to idle state."); return 1; } if(bvc.m_verification_failed) { LOG_PRINT_L0("Block verification failed, dropping connection"); m_p2p->drop_connection(context); m_p2p->add_ip_fail(context.m_remote_ip); return 1; } if(bvc.m_marked_as_orphaned) { LOG_PRINT_L0("Block received at sync phase was marked as orphaned, dropping connection, details on response: " << ENDL << print_kv_structure(arg)); m_p2p->drop_connection(context); m_p2p->add_ip_fail(context.m_remote_ip); return 1; } TIME_MEASURE_FINISH(block_process_time); LOG_PRINT_L2("Block process time: " << block_process_time + transactions_process_time << "(" << transactions_process_time << "/" << block_process_time << ")ms"); ++count; } } uint64_t current_size = m_core.get_blockchain_storage().get_current_blockchain_size(); LOG_PRINT_YELLOW(">>>>>>>>> sync progress: " << arg.blocks.size() << " blocks added, now have " << current_size << " of " << context.m_remote_blockchain_height << " ( " << std::fixed << std::setprecision(2) << current_size * 100.0 / context.m_remote_blockchain_height << "% ) and " << context.m_remote_blockchain_height - current_size << " blocks left" , LOG_LEVEL_0); request_missing_objects(context, true); return 1; } #undef CHECK_STOP_FLAG__DROP_AND_RETURN_IF_SET //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::on_idle() { bool have_synced_conn = false; m_p2p->for_each_connection([&](currency_connection_context& context, nodetool::peerid_type peer_id)->bool{ if (context.m_state == currency_connection_context::state_normal) { have_synced_conn = true; return false; } return true; }); if (have_synced_conn && !m_synchronized) { on_connection_synchronized(); m_synchronized = true; LOG_PRINT_MAGENTA("Synchronized set to TRUE (idle)", LOG_LEVEL_0); } else if (!have_synced_conn && m_synchronized) { LOG_PRINT_MAGENTA("Synchronized set to FALSE (idle)", LOG_LEVEL_0); m_synchronized = false; } return m_core.on_idle(); } //------------------------------------------------------------------------------------------------------------------------ template int t_currency_protocol_handler::handle_request_chain(int command, NOTIFY_REQUEST_CHAIN::request& arg, currency_connection_context& context) { LOG_PRINT_L2("[HANDLE]NOTIFY_REQUEST_CHAIN: block_ids.size()=" << arg.block_ids.size()); LOG_PRINT_L3("[HANDLE]NOTIFY_REQUEST_CHAIN: " << print_kv_structure(arg)); NOTIFY_RESPONSE_CHAIN_ENTRY::request r; if(!m_core.find_blockchain_supplement(arg.block_ids, r)) { LOG_ERROR_CCONTEXT("Failed to handle NOTIFY_REQUEST_CHAIN."); return 1; } LOG_PRINT_L2("[NOTIFY]NOTIFY_RESPONSE_CHAIN_ENTRY: m_start_height=" << r.start_height << ", m_total_height=" << r.total_height << ", m_block_ids.size()=" << r.m_block_ids.size()); LOG_PRINT_L3("[NOTIFY]NOTIFY_RESPONSE_CHAIN_ENTRY: " << print_kv_structure(r)); post_notify(r, context); return 1; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::request_missing_objects(currency_connection_context& context, bool check_having_blocks) { if(context.m_priv.m_needed_objects.size()) { //we know objects that we need, request this objects NOTIFY_REQUEST_GET_OBJECTS::request req; size_t count = 0; auto it = context.m_priv.m_needed_objects.begin(); uint64_t requested_cumulative_size = 0; while (it != context.m_priv.m_needed_objects.end() && count < BLOCKS_SYNCHRONIZING_DEFAULT_COUNT && requested_cumulative_size < BLOCKS_SYNCHRONIZING_DEFAULT_SIZE) { if( !(check_having_blocks && m_core.have_block(it->h))) { req.blocks.push_back(it->h); requested_cumulative_size += it->cumul_size; ++count; context.m_priv.m_requested_objects.insert(it->h); } context.m_priv.m_needed_objects.erase(it++); } LOG_PRINT_L2("[NOTIFY]NOTIFY_REQUEST_GET_OBJECTS(req_missing): requested_cumulative_size=" << requested_cumulative_size << ", blocks.size()=" << req.blocks.size() << ", txs.size()=" << req.txs.size()); LOG_PRINT_L3("[NOTIFY]NOTIFY_REQUEST_GET_OBJECTS(req_missing): " << ENDL << currency::print_kv_structure(req)); post_notify(req, context); }else if(context.m_last_response_height < context.m_remote_blockchain_height-1) {//we have to fetch more objects ids, request blockchain entry NOTIFY_REQUEST_CHAIN::request r = boost::value_initialized(); m_core.get_short_chain_history(r.block_ids); LOG_PRINT_L2("[NOTIFY]NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); LOG_PRINT_L3("[NOTIFY]NOTIFY_REQUEST_CHAIN: " << ENDL << print_kv_structure(r) ); post_notify(r, context); }else { CHECK_AND_ASSERT_MES(context.m_last_response_height == context.m_remote_blockchain_height-1 && !context.m_priv.m_needed_objects.size() && !context.m_priv.m_requested_objects.size(), false, "request_missing_blocks final condition failed!" << "\r\nm_last_response_height=" << context.m_last_response_height << "\r\nm_remote_blockchain_height=" << context.m_remote_blockchain_height << "\r\nm_needed_objects.size()=" << context.m_priv.m_needed_objects.size() << "\r\nm_requested_objects.size()=" << context.m_priv.m_requested_objects.size() << "\r\non connection [" << net_utils::print_connection_context_short(context)<< "]"); context.m_state = currency_connection_context::state_normal; LOG_PRINT_GREEN("[REQUEST_MISSING_OBJECTS]: SYNCHRONIZED OK", LOG_LEVEL_0); on_connection_synchronized(); } return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::on_connection_synchronized() { bool val_expected = false; if (m_have_been_synchronized.compare_exchange_strong(val_expected, true)) { m_core.on_synchronized(); } return true; } //------------------------------------------------------------------------------------------------------------------------ template void t_currency_protocol_handler::relay_que_worker() { while (!m_want_stop) { std::list local_que; { CRITICAL_REGION_LOCAL(m_relay_que_lock); local_que.swap(m_relay_que); } if (local_que.size()) process_current_relay_que(local_que); epee::misc_utils::sleep_no_w(500); } } //------------------------------------------------------------------------------------------------------------------------ template void t_currency_protocol_handler::process_current_relay_que(const std::list& que) { if (que.size() > 1){LOG_PRINT_MAGENTA("RELAY_QUE: " << que.size(), LOG_LEVEL_0);} TIME_MEASURE_START_MS(ms); std::stringstream debug_ss; std::list connections; m_p2p->get_connections(connections); for (auto& cc : connections) { NOTIFY_NEW_TRANSACTIONS::request req = AUTO_VAL_INIT(req); for (auto& qe : que) { //exclude relaying to original sender if (qe.second.m_connection_id == cc.m_connection_id) continue; req.txs.insert(req.txs.begin(), qe.first.txs.begin(), qe.first.txs.end()); } if (req.txs.size()) { post_notify(req, cc); print_connection_context_short(cc, debug_ss); debug_ss << ": " << req.txs.size() << ENDL; } } TIME_MEASURE_FINISH_MS(ms); LOG_PRINT_GREEN("[POST RELAY] NOTIFY_NEW_TRANSACTIONS relayed (" << ms << "ms)contexts list: " << debug_ss.str(), LOG_LEVEL_2); } //------------------------------------------------------------------------------------------------------------------------ template size_t t_currency_protocol_handler::get_synchronized_connections_count() { size_t count = 0; m_p2p->for_each_connection([&](currency_connection_context& context, nodetool::peerid_type peer_id)->bool{ if (context.m_state == currency_connection_context::state_normal) ++count; return true; }); return count; } //------------------------------------------------------------------------------------------------------------------------ template size_t t_currency_protocol_handler::get_synchronizing_connections_count() { size_t count = 0; m_p2p->for_each_connection([&](currency_connection_context& context, nodetool::peerid_type peer_id)->bool{ if (context.m_state == currency_connection_context::state_synchronizing) ++count; return true; }); return count; } //------------------------------------------------------------------------------------------------------------------------ template int64_t t_currency_protocol_handler::get_net_time_delta_median() { std::vector deltas; m_p2p->for_each_connection([&](currency_connection_context& context, nodetool::peerid_type peer_id)->bool{ deltas.push_back(context.m_time_delta); return true; }); return epee::misc_utils::median(deltas); } //------------------------------------------------------------------------------------------------------------------------ #define TIME_SYNC_DELTA_RING_BUFFER_SIZE 8 #define TIME_SYNC_DELTA_TO_LOCAL_MAX_DIFFERENCE (60 * 5) // max acceptable difference between time delta median among peers and local time (seconds) #define TIME_SYNC_NTP_TO_LOCAL_MAX_DIFFERENCE (60 * 5) // max acceptable difference between NTP time and local time (seconds) template bool t_currency_protocol_handler::add_time_delta_and_check_time_sync(int64_t time_delta) { CRITICAL_REGION_LOCAL(m_time_deltas_lock); auto get_core_time = [this] { return m_core.get_blockchain_storage().get_core_runtime_config().get_core_time(); }; m_time_deltas.push_back(time_delta); while (m_time_deltas.size() > TIME_SYNC_DELTA_RING_BUFFER_SIZE) m_time_deltas.pop_front(); if (m_time_deltas.size() < TIME_SYNC_DELTA_RING_BUFFER_SIZE) return true; // not enough data std::vector time_deltas_copy(m_time_deltas.begin(), m_time_deltas.end()); m_last_median2local_time_difference = epee::misc_utils::median(time_deltas_copy); LOG_PRINT_MAGENTA("TIME: network time difference is " << m_last_median2local_time_difference << " (max is " << TIME_SYNC_DELTA_TO_LOCAL_MAX_DIFFERENCE << ")", m_last_median2local_time_difference >= 3 ? LOG_LEVEL_2 : LOG_LEVEL_3); if (std::abs(m_last_median2local_time_difference) > TIME_SYNC_DELTA_TO_LOCAL_MAX_DIFFERENCE) { int64_t ntp_time = tools::get_ntp_time(); LOG_PRINT_L2("NTP: received time " << ntp_time << " (" << epee::misc_utils::get_time_str_v2(ntp_time) << "), diff: " << std::showpos << get_core_time() - ntp_time); if (ntp_time == 0) { // error geting ntp time LOG_PRINT_RED("TIME: network time difference is " << m_last_median2local_time_difference << " (max is " << TIME_SYNC_DELTA_TO_LOCAL_MAX_DIFFERENCE << ") but NTP servers did not respond", LOG_LEVEL_0); return false; } // got ntp time correctly // update local time, because getting ntp time could be time consuming uint64_t local_time_2 = get_core_time(); m_last_ntp2local_time_difference = local_time_2 - ntp_time; if (std::abs(m_last_ntp2local_time_difference) > TIME_SYNC_NTP_TO_LOCAL_MAX_DIFFERENCE) { // local time is out of sync LOG_PRINT_RED("TIME: network time difference is " << m_last_median2local_time_difference << " (max is " << TIME_SYNC_DELTA_TO_LOCAL_MAX_DIFFERENCE << "), NTP time difference is " << m_last_ntp2local_time_difference << " (max is " << TIME_SYNC_NTP_TO_LOCAL_MAX_DIFFERENCE << ")", LOG_LEVEL_0); return false; } // NTP time is OK LOG_PRINT_YELLOW("TIME: network time difference is " << m_last_median2local_time_difference << " (max is " << TIME_SYNC_DELTA_TO_LOCAL_MAX_DIFFERENCE << "), NTP time difference is " << m_last_ntp2local_time_difference << " (max is " << TIME_SYNC_NTP_TO_LOCAL_MAX_DIFFERENCE << ")", LOG_LEVEL_1); } return true; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::get_last_time_sync_difference(int64_t& last_median2local_time_difference, int64_t& last_ntp2local_time_difference) { CRITICAL_REGION_LOCAL(m_time_deltas_lock); last_median2local_time_difference = m_last_median2local_time_difference; last_ntp2local_time_difference = m_last_ntp2local_time_difference; return !(std::abs(m_last_median2local_time_difference) > TIME_SYNC_DELTA_TO_LOCAL_MAX_DIFFERENCE && std::abs(m_last_ntp2local_time_difference) > TIME_SYNC_NTP_TO_LOCAL_MAX_DIFFERENCE); } //------------------------------------------------------------------------------------------------------------------------ template int t_currency_protocol_handler::handle_response_chain_entry(int command, NOTIFY_RESPONSE_CHAIN_ENTRY::request& arg, currency_connection_context& context) { LOG_PRINT_L2("[HANDLE]NOTIFY_RESPONSE_CHAIN_ENTRY: m_block_ids.size()=" << arg.m_block_ids.size() << ", m_start_height=" << arg.start_height << ", m_total_height=" << arg.total_height); LOG_PRINT_L3("[HANDLE]NOTIFY_RESPONSE_CHAIN_ENTRY: " << ENDL << currency::print_kv_structure(arg)); if(!arg.m_block_ids.size()) { LOG_ERROR_CCONTEXT("sent empty m_block_ids, dropping connection"); m_p2p->drop_connection(context); m_p2p->add_ip_fail(context.m_remote_ip); return 1; } if(!m_core.have_block(arg.m_block_ids.front().h)) { LOG_ERROR_CCONTEXT("sent m_block_ids starting from unknown id: " << string_tools::pod_to_hex(arg.m_block_ids.front()) << " , dropping connection"); m_p2p->drop_connection(context); m_p2p->add_ip_fail(context.m_remote_ip); return 1; } context.m_remote_blockchain_height = arg.total_height; context.m_last_response_height = arg.start_height + arg.m_block_ids.size()-1; if(context.m_last_response_height > context.m_remote_blockchain_height) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_CHAIN_ENTRY, with \r\nm_total_height=" << arg.total_height << "\r\nm_start_height=" << arg.start_height << "\r\nm_block_ids.size()=" << arg.m_block_ids.size()); //m_p2p->drop_connection(context); } BOOST_FOREACH(auto& bl_details, arg.m_block_ids) { if (!m_core.have_block(bl_details.h)) context.m_priv.m_needed_objects.push_back(bl_details); } request_missing_objects(context, false); return 1; } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::relay_block(NOTIFY_NEW_BLOCK::request& arg, currency_connection_context& exclude_context) { return relay_post_notify(arg, exclude_context); } //------------------------------------------------------------------------------------------------------------------------ template bool t_currency_protocol_handler::relay_transactions(NOTIFY_NEW_TRANSACTIONS::request& arg, currency_connection_context& exclude_context) { #ifdef ASYNC_RELAY_MODE { CRITICAL_REGION_LOCAL(m_relay_que_lock); m_relay_que.push_back(AUTO_VAL_INIT(relay_que_entry())); m_relay_que.back().first = arg; m_relay_que.back().second = exclude_context; } //m_relay_que_cv.notify_all(); return true; #else return relay_post_notify(arg, exclude_context); #endif } }