1
0
Fork 0
forked from lthn/blockchain

epee::interruptible_http_client

This commit is contained in:
sowle 2020-03-10 18:03:12 +03:00
parent b4861c2511
commit 945451bc94
No known key found for this signature in database
GPG key ID: C07A24B2D89D49FC

View file

@ -235,8 +235,6 @@ using namespace std;
unsigned int m_recv_timeout;
std::string m_header_cache;
http_response_info m_response_info;
size_t m_len_in_summary;
size_t m_len_in_remain;
//std::string* m_ptarget_buffer;
boost::shared_ptr<i_sub_handler> m_pcontent_encoding_handler;
reciev_machine_state m_state;
@ -244,6 +242,10 @@ using namespace std;
std::string m_chunked_cache;
critical_section m_lock;
protected:
uint64_t m_len_in_summary;
uint64_t m_len_in_remain;
public:
void set_host_name(const std::string& name)
{
@ -865,13 +867,13 @@ using namespace std;
return true;
}
};
// class http_simple_client
/************************************************************************/
/* */
/************************************************************************/
//inline
template<class t_transport>
bool invoke_request(const std::string& url, t_transport& tr, unsigned int timeout, const http_response_info** ppresponse_info, const std::string& method = "GET", const std::string& body = std::string(), const fields_list& additional_params = fields_list())
{
@ -895,6 +897,121 @@ using namespace std;
return tr.invoke(u_c.uri, method, body, ppresponse_info, additional_params);
}
}
}
}
struct idle_handler_base
{
virtual bool do_call(const std::string& piece_of_data, uint64_t total_bytes, uint64_t received_bytes) = 0;
};
template <typename callback_t>
struct idle_handler : public idle_handler_base
{
callback_t m_cb;
idle_handler(callback_t cb) : m_cb(cb) {}
virtual bool do_call(const std::string& piece_of_data, uint64_t total_bytes, uint64_t received_bytes)
{
return m_cb(piece_of_data, total_bytes, received_bytes);
}
};
class interruptible_http_client : public http_simple_client
{
std::shared_ptr<idle_handler_base> m_pcb;
virtual bool handle_target_data(std::string& piece_of_transfer)
{
bool r = m_pcb->do_call(piece_of_transfer, m_len_in_summary, m_len_in_summary - m_len_in_remain);
piece_of_transfer.clear();
return r;
}
public:
template<typename callback_t>
bool invoke_cb(callback_t cb, const std::string& url, uint64_t timeout, const std::string& method = "GET", const std::string& body = std::string(), const fields_list& additional_params = fields_list())
{
m_pcb.reset(new idle_handler<callback_t>(cb));
return invoke_request(url, *this, timeout, nullptr, method, body, additional_params);
}
template<typename callback_t>
bool download(callback_t cb, const std::string& path_for_file, const std::string& url, uint64_t timeout, const std::string& method = "GET", const std::string& body = std::string(), const fields_list& additional_params = fields_list())
{
std::ofstream fs;
fs.open(path_for_file, std::ios::binary | std::ios::out | std::ios::trunc);
if (!fs.is_open())
{
LOG_ERROR("Fsiled to open " << path_for_file);
return false;
}
auto local_cb = [&](const std::string& piece_of_data, uint64_t total_bytes, uint64_t received_bytes)
{
fs.write(piece_of_data.data(), piece_of_data.size());
return cb(total_bytes, received_bytes);
};
bool r = this->invoke_cb(local_cb, url, timeout, method, body, additional_params);
fs.close();
return r;
}
//
template<typename callback_t>
bool download_and_unzip(callback_t cb, const std::string& path_for_file, const std::string& url, uint64_t timeout, const std::string& method = "GET", const std::string& body = std::string(), uint64_t fails_count = 1000, const fields_list& additional_params = fields_list())
{
std::ofstream fs;
fs.open(path_for_file, std::ios::binary | std::ios::out | std::ios::trunc);
if (!fs.is_open())
{
LOG_ERROR("Fsiled to open " << path_for_file);
return false;
}
std::string buff;
gzip_decoder_lambda zip_decoder;
uint64_t state_total_bytes = 0;
uint64_t state_received_bytes_base = 0;
uint64_t state_received_bytes_current = 0;
bool stopped = false;
auto local_cb = [&](const std::string& piece_of_data, uint64_t total_bytes, uint64_t received_bytes)
{
//remember total_bytes only for first attempt, where fetched full lenght of the file
if (!state_total_bytes)
state_total_bytes = total_bytes;
buff += piece_of_data;
return zip_decoder.update_in(buff, [&](const std::string& unpacked_buff)
{
state_received_bytes_current = received_bytes;
fs.write(unpacked_buff.data(), unpacked_buff.size());
stopped = !cb(unpacked_buff, state_total_bytes, state_received_bytes_base + received_bytes);
return !stopped;
});
};
uint64_t current_err_count = 0;
bool r = false;
while (!r || current_err_count > fails_count)
{
LOG_PRINT_L0("Attempt to invoke http: " << url << " (offset:" << state_received_bytes_base << ")");
fields_list additional_params_local = additional_params;
additional_params_local.push_back(std::make_pair<std::string, std::string>("Range", std::string("bytes=") + std::to_string(state_received_bytes_base) + "-"));
r = this->invoke_cb(local_cb, url, timeout, method, body, additional_params_local);
if (!r)
{
if (stopped)
break;
current_err_count++;
state_received_bytes_base += state_received_bytes_current;
state_received_bytes_current = 0;
boost::this_thread::sleep_for(boost::chrono::milliseconds(500));
}
}
fs.close();
return r;
}
};
} // namespace http
} // namespace net_utils
} // namespace epee