From 4db26f48a5f5083846cc7f505e0d1b613ab33540 Mon Sep 17 00:00:00 2001 From: cryptozoidberg Date: Sat, 18 May 2019 19:41:17 +0200 Subject: [PATCH] fixed latest boost compatibility --- .../epee/include/net/abstract_tcp_server2.h | 409 ++--- .../epee/include/net/abstract_tcp_server2.inl | 1489 ++++++++--------- 2 files changed, 930 insertions(+), 968 deletions(-) diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index 4c274c0e..997a711f 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -1,6 +1,6 @@ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // * Redistributions of source code must retain the above copyright @@ -11,7 +11,7 @@ // * Neither the name of the Andrey N. Sabelnikov nor the // names of its contributors may be used to endorse or promote products // derived from this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE @@ -22,13 +22,10 @@ // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - - - -#ifndef _ABSTRACT_TCP_SERVER2_H_ -#define _ABSTRACT_TCP_SERVER2_H_ +// +#ifndef _ABSTRACT_TCP_SERVER2_H_ +#define _ABSTRACT_TCP_SERVER2_H_ #include #include @@ -47,247 +44,271 @@ #include "net_utils_base.h" #include "syncobj.h" -#undef LOG_DEFAULT_CHANNEL +#undef LOG_DEFAULT_CHANNEL #define LOG_DEFAULT_CHANNEL "net_server" #define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000 -namespace epee -{ -namespace net_utils -{ +namespace epee { +namespace net_utils { + +struct i_connection_filter { + virtual bool is_remote_ip_allowed(uint32_t adress) = 0; - struct i_connection_filter - { - virtual bool is_remote_ip_allowed(uint32_t adress)=0; protected: - virtual ~i_connection_filter(){} - }; - - /************************************************************************/ - /* */ - /************************************************************************/ - /// Represents a single connection from a client. - template - class connection - : public boost::enable_shared_from_this >, - private boost::noncopyable, - public i_service_endpoint + virtual ~i_connection_filter() { + } +}; + +/************************************************************************/ +/* */ +/************************************************************************/ +/// Represents a single connection from a client. +template +class connection + : public boost::enable_shared_from_this>, + private boost::noncopyable, + public i_service_endpoint { public: - typedef typename t_protocol_handler::connection_context t_connection_context; - /// Construct a connection with the given io_service. - explicit connection(boost::asio::io_service& io_service, - typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter * &pfilter); + typedef typename t_protocol_handler::connection_context t_connection_context; + /// Construct a connection with the given io_service. + explicit connection(boost::asio::io_service& io_service, + typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter*& pfilter); - virtual ~connection(); - /// Get the socket associated with the connection. - boost::asio::ip::tcp::socket& socket(); + virtual ~connection(); + /// Get the socket associated with the connection. + boost::asio::ip::tcp::socket& socket(); - /// Start the first asynchronous operation for the connection. - bool start(bool is_income, bool is_multithreaded); + /// Start the first asynchronous operation for the connection. + bool start(bool is_income, bool is_multithreaded); - void get_context(t_connection_context& context_){context_ = context;} + void get_context(t_connection_context& context_) + { + context_ = context; + } + + void call_back_starter(); + bool is_shutdown() + { + return m_was_shutdown; + } + bool cancel(); - void call_back_starter(); - bool is_shutdown(){return m_was_shutdown;} - bool cancel(); private: - //----------------- i_service_endpoint --------------------- - virtual bool do_send(const void* ptr, size_t cb); - virtual bool close(); - virtual bool call_run_once_service_io(); - virtual bool request_callback(); - virtual boost::asio::io_service& get_io_service(); - virtual bool add_ref(); - virtual bool release(); - //------------------------------------------------------ - boost::shared_ptr > safe_shared_from_this(); - bool shutdown(); - /// Handle completion of a read operation. - void handle_read(const boost::system::error_code& e, - std::size_t bytes_transferred); + //----------------- i_service_endpoint --------------------- + virtual bool do_send(const void* ptr, size_t cb); + virtual bool close(); + virtual bool call_run_once_service_io(); + virtual bool request_callback(); + virtual boost::asio::io_service& get_io_service(); + virtual bool add_ref(); + virtual bool release(); + //------------------------------------------------------ + boost::shared_ptr> safe_shared_from_this(); + bool shutdown(); + /// Handle completion of a read operation. + void handle_read(const boost::system::error_code& e, + std::size_t bytes_transferred); - /// Handle completion of a write operation. - void handle_write(const boost::system::error_code& e, size_t cb); + /// Handle completion of a write operation. + void handle_write(const boost::system::error_code& e, size_t cb); - /// Strand to ensure the connection's handlers are not called concurrently. - boost::asio::io_service::strand strand_; + /// Strand to ensure the connection's handlers are not called concurrently. + boost::asio::io_service::strand strand_; - /// Socket for the connection. - boost::asio::ip::tcp::socket socket_; + /// Socket for the connection. + boost::asio::ip::tcp::socket socket_; - /// Buffer for incoming data. - boost::array buffer_; + /// Buffer for incoming data. + boost::array buffer_; - t_connection_context context; - volatile uint32_t m_want_close_connection; - std::atomic m_was_shutdown; - critical_section m_send_que_lock; - std::list m_send_que; - volatile uint32_t& m_ref_sockets_count; - i_connection_filter* &m_pfilter; - volatile bool m_is_multithreaded; + boost::asio::io_service& m_rio_service; + t_connection_context context; + volatile uint32_t m_want_close_connection; + std::atomic m_was_shutdown; + critical_section m_send_que_lock; + std::list m_send_que; + volatile uint32_t& m_ref_sockets_count; + i_connection_filter*& m_pfilter; + volatile bool m_is_multithreaded; - //this should be the last one, because it could be wait on destructor, while other activities possible on other threads - t_protocol_handler m_protocol_handler; - //typename t_protocol_handler::config_type m_dummy_config; - std::list > > m_self_refs; // add_ref/release support - critical_section m_self_refs_lock; - }; + //this should be the last one, because it could be wait on destructor, while other activities possible on other threads + t_protocol_handler m_protocol_handler; + //typename t_protocol_handler::config_type m_dummy_config; + std::list>> m_self_refs; // add_ref/release support + critical_section m_self_refs_lock; +}; - - /************************************************************************/ - /* */ - /************************************************************************/ - template - class boosted_tcp_server - : private boost::noncopyable - { +/************************************************************************/ +/* */ +/************************************************************************/ +template +class boosted_tcp_server + : private boost::noncopyable { public: - typedef boost::shared_ptr > connection_ptr; - typedef typename t_protocol_handler::connection_context t_connection_context; - /// Construct the server to listen on the specified TCP address and port, and - /// serve up files from the given directory. - boosted_tcp_server(); - explicit boosted_tcp_server(boost::asio::io_service& external_io_service); - ~boosted_tcp_server(); + typedef boost::shared_ptr> connection_ptr; + typedef typename t_protocol_handler::connection_context t_connection_context; + /// Construct the server to listen on the specified TCP address and port, and + /// serve up files from the given directory. + boosted_tcp_server(); + explicit boosted_tcp_server(boost::asio::io_service& external_io_service); + ~boosted_tcp_server(); - bool init_server(uint32_t port, const std::string address = "0.0.0.0"); - bool init_server(const std::string port, const std::string& address = "0.0.0.0"); + bool init_server(uint32_t port, const std::string address = "0.0.0.0"); + bool init_server(const std::string port, const std::string& address = "0.0.0.0"); - /// Run the server's io_service loop. - bool run_server(size_t threads_count, bool wait = true); + /// Run the server's io_service loop. + bool run_server(size_t threads_count, bool wait = true); - /// wait for service workers stop - bool timed_wait_server_stop(uint64_t wait_mseconds); + /// wait for service workers stop + bool timed_wait_server_stop(uint64_t wait_mseconds); - /// Stop the server. - void send_stop_signal(); + /// Stop the server. + void send_stop_signal(); - bool is_stop_signal_sent(); + bool is_stop_signal_sent(); - void set_threads_prefix(const std::string& prefix_name); + void set_threads_prefix(const std::string& prefix_name); - bool deinit_server(){return true;} + bool deinit_server() + { + return true; + } - size_t get_threads_count(){return m_threads_count;} + size_t get_threads_count() + { + return m_threads_count; + } - void set_connection_filter(i_connection_filter* pfilter); + void set_connection_filter(i_connection_filter* pfilter); - bool connect(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_connection_context& cn, const std::string& bind_ip = "0.0.0.0"); - template - bool connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_callback cb, const std::string& bind_ip = "0.0.0.0"); + bool connect(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_connection_context& cn, const std::string& bind_ip = "0.0.0.0"); + template + bool connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_callback cb, const std::string& bind_ip = "0.0.0.0"); - typename t_protocol_handler::config_type& get_config_object(){return m_config;} + typename t_protocol_handler::config_type& get_config_object() + { + return m_config; + } - int get_binded_port(){return m_port;} + int get_binded_port() + { + return m_port; + } - boost::asio::io_service& get_io_service(){return io_service_;} + boost::asio::io_service& get_io_service() + { + return io_service_; + } - struct idle_callback_conext_base + struct idle_callback_conext_base { + virtual ~idle_callback_conext_base() { - virtual ~idle_callback_conext_base(){} + } - virtual bool call_handler(){return true;} - - idle_callback_conext_base(boost::asio::io_service& io_serice): - m_timer(io_serice) - {} - boost::asio::deadline_timer m_timer; - uint64_t m_period; - }; - - template - struct idle_callback_conext: public idle_callback_conext_base + virtual bool call_handler() { - idle_callback_conext(boost::asio::io_service& io_serice, t_handler& h, uint64_t period): - idle_callback_conext_base(io_serice), - m_handler(h) - {this->m_period = period;} - - t_handler m_handler; - virtual bool call_handler() - { - return m_handler(); - } - }; - - template - bool add_idle_handler(t_handler t_callback, uint64_t timeout_ms) - { - boost::shared_ptr ptr(new idle_callback_conext(io_service_, t_callback, timeout_ms)); - //needed call handler here ?... - ptr->m_timer.expires_from_now(boost::posix_time::milliseconds(ptr->m_period)); - ptr->m_timer.async_wait(boost::bind(&boosted_tcp_server::global_timer_handler, this, ptr)); - return true; - } - - bool global_timer_handler(/*const boost::system::error_code& err, */boost::shared_ptr ptr) - { - //if handler return false - he don't want to be called anymore - try{ - if (!ptr->call_handler()) - return true; - } - catch (...) - { - return true; - } - - - ptr->m_timer.expires_from_now(boost::posix_time::milliseconds(ptr->m_period)); - ptr->m_timer.async_wait(boost::bind(&boosted_tcp_server::global_timer_handler, this, ptr)); return true; } - template - bool async_call(t_handler t_callback) + idle_callback_conext_base(boost::asio::io_service& io_serice) + : m_timer(io_serice) { - io_service_.post(t_callback); + } + boost::asio::deadline_timer m_timer; + uint64_t m_period; + }; + + template + struct idle_callback_conext : public idle_callback_conext_base { + idle_callback_conext(boost::asio::io_service& io_serice, t_handler& h, uint64_t period) + : idle_callback_conext_base(io_serice), + m_handler(h) + { + this->m_period = period; + } + + t_handler m_handler; + virtual bool call_handler() + { + return m_handler(); + } + }; + + template + bool add_idle_handler(t_handler t_callback, uint64_t timeout_ms) + { + boost::shared_ptr ptr(new idle_callback_conext(io_service_, t_callback, timeout_ms)); + //needed call handler here ?... + ptr->m_timer.expires_from_now(boost::posix_time::milliseconds(ptr->m_period)); + ptr->m_timer.async_wait(boost::bind(&boosted_tcp_server::global_timer_handler, this, ptr)); + return true; + } + + bool global_timer_handler(/*const boost::system::error_code& err, */ boost::shared_ptr ptr) + { + //if handler return false - he don't want to be called anymore + try { + if(!ptr->call_handler()) + return true; + } + catch(...) { return true; } + ptr->m_timer.expires_from_now(boost::posix_time::milliseconds(ptr->m_period)); + ptr->m_timer.async_wait(boost::bind(&boosted_tcp_server::global_timer_handler, this, ptr)); + return true; + } + + template + bool async_call(t_handler t_callback) + { + io_service_.post(t_callback); + return true; + } + protected: - typename t_protocol_handler::config_type m_config; + typename t_protocol_handler::config_type m_config; private: - /// Run the server's io_service loop. - bool worker_thread(); - /// Handle completion of an asynchronous accept operation. - void handle_accept(const boost::system::error_code& e); + /// Run the server's io_service loop. + bool worker_thread(); + /// Handle completion of an asynchronous accept operation. + void handle_accept(const boost::system::error_code& e); - bool is_thread_worker(); + bool is_thread_worker(); - /// The io_service used to perform asynchronous operations. - std::unique_ptr m_io_service_local_instance; - boost::asio::io_service& io_service_; + /// The io_service used to perform asynchronous operations. + std::unique_ptr m_io_service_local_instance; + boost::asio::io_service& io_service_; - /// Acceptor used to listen for incoming connections. - boost::asio::ip::tcp::acceptor acceptor_; + /// Acceptor used to listen for incoming connections. + boost::asio::ip::tcp::acceptor acceptor_; - /// The next connection to be accepted. - connection_ptr new_connection_; - //std::mutex connections_mutex; - //std::deque connections_; - std::atomic m_stop_signal_sent; - uint32_t m_port; - volatile uint32_t m_sockets_count; - std::string m_address; - std::string m_thread_name_prefix; - size_t m_threads_count; - i_connection_filter* m_pfilter; - std::vector > m_threads; - boost::thread::id m_main_thread_id; - critical_section m_threads_lock; - volatile uint32_t m_thread_index; - }; -} -} + /// The next connection to be accepted. + connection_ptr new_connection_; + //std::mutex connections_mutex; + //std::deque connections_; + std::atomic m_stop_signal_sent; + uint32_t m_port; + volatile uint32_t m_sockets_count; + std::string m_address; + std::string m_thread_name_prefix; + size_t m_threads_count; + i_connection_filter* m_pfilter; + std::vector> m_threads; + boost::thread::id m_main_thread_id; + critical_section m_threads_lock; + volatile uint32_t m_thread_index; +}; +} // namespace net_utils +} // namespace epee #include "abstract_tcp_server2.inl" -#undef LOG_DEFAULT_CHANNEL +#undef LOG_DEFAULT_CHANNEL #define LOG_DEFAULT_CHANNEL NULL #endif \ No newline at end of file diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index c59c60e0..38a62b7c 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -1,6 +1,6 @@ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. -// +// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // * Redistributions of source code must retain the above copyright @@ -11,7 +11,7 @@ // * Neither the name of the Andrey N. Sabelnikov nor the // names of its contributors may be used to endorse or promote products // derived from this software without specific prior written permission. -// +// // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE @@ -22,9 +22,7 @@ // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - - +// #include "net_utils_base.h" #include @@ -38,809 +36,752 @@ #include "warnings.h" PUSH_WARNINGS -namespace epee -{ -namespace net_utils -{ - /************************************************************************/ - /* */ - /************************************************************************/ +namespace epee { +namespace net_utils { +/************************************************************************/ +/* */ +/************************************************************************/ DISABLE_VS_WARNINGS(4355) - template - connection::connection(boost::asio::io_service& io_service, - typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter* &pfilter) - : strand_(io_service), - socket_(io_service), - m_protocol_handler(this, config, context), - m_want_close_connection(0), - m_was_shutdown(0), - m_ref_sockets_count(sock_count), - m_pfilter(pfilter) - { - boost::interprocess::ipcdetail::atomic_inc32(&m_ref_sockets_count); - } +template +connection::connection(boost::asio::io_service& io_service, + typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter*& pfilter) + : m_rio_service(io_service), + strand_(io_service), + socket_(io_service), + m_protocol_handler(this, config, context), + m_want_close_connection(0), + m_was_shutdown(0), + m_ref_sockets_count(sock_count), + m_pfilter(pfilter) +{ + boost::interprocess::ipcdetail::atomic_inc32(&m_ref_sockets_count); +} DISABLE_VS_WARNINGS(4355) - //--------------------------------------------------------------------------------- - template - connection::~connection() - { - if(!m_was_shutdown) - { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown."); - shutdown(); - } - - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed"); - boost::interprocess::ipcdetail::atomic_dec32(&m_ref_sockets_count); - VALIDATE_MUTEX_IS_FREE(m_send_que_lock); - VALIDATE_MUTEX_IS_FREE(m_self_refs_lock); - } - //--------------------------------------------------------------------------------- - template - boost::asio::ip::tcp::socket& connection::socket() - { - return socket_; - } - //--------------------------------------------------------------------------------- - template - boost::shared_ptr > connection::safe_shared_from_this() - { - try - { - return connection::shared_from_this(); - } - catch (const boost::bad_weak_ptr&) - { - // It happens when the connection is being deleted - return boost::shared_ptr >(); - } - } - //--------------------------------------------------------------------------------- - template - bool connection::start(bool is_income, bool is_multithreaded) - { - TRY_ENTRY(); - - // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted - auto self = safe_shared_from_this(); - if(!self) - { - LOG_PRINT_RED("Failed to start conntection, failed to call safe_shared_from_this", LOG_LEVEL_2); - return false; - } - - m_is_multithreaded = is_multithreaded; - - boost::system::error_code ec; - auto remote_ep = socket_.remote_endpoint(ec); - CHECK_AND_NO_ASSERT_MES(!m_was_shutdown, false, "was shutdown on start connection"); - - CHECK_AND_NO_ASSERT_MES_LEVEL(!ec, false, "Failed to get remote endpoint(" << (is_income?"INT":"OUT") << "): " << ec.message() << ':' << ec.value(), LOG_LEVEL_2); - - auto local_ep = socket_.local_endpoint(ec); - CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get local endpoint: " << ec.message() << ':' << ec.value()); - - context = boost::value_initialized(); - long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong()); - - context.set_details(boost::uuids::random_generator()(), ip_, remote_ep.port(), is_income); - context.m_last_send = context.m_last_recv = time(NULL); - - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] new connection, remote end_point: " << print_connection_context_short(context) << - " local end_point: " << local_ep.address().to_string() << ':' << local_ep.port() << - ", total sockets objects " << m_ref_sockets_count); - - if(is_income && m_pfilter && !m_pfilter->is_remote_ip_allowed(context.m_remote_ip)) - { - LOG_PRINT_L0("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection"); - close(); - return false; - } - - m_protocol_handler.after_init_connection(); - - socket_.async_read_some(boost::asio::buffer(buffer_), - strand_.wrap( - boost::bind(&connection::handle_read, self, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); - - - return true; - - CATCH_ENTRY_L0("connection::start()", false); - } - //--------------------------------------------------------------------------------- - template - bool connection::request_callback() - { - TRY_ENTRY(); - LOG_PRINT_L2("[" << print_connection_context_short(context) << "] request_callback"); - // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted - auto self = safe_shared_from_this(); - if(!self) - return false; - - strand_.post(boost::bind(&connection::call_back_starter, self)); - CATCH_ENTRY_L0("connection::request_callback()", false); - return true; - } - //--------------------------------------------------------------------------------- - template - boost::asio::io_service& connection::get_io_service() - { - return socket_.get_io_service(); - } - //--------------------------------------------------------------------------------- - template - bool connection::add_ref() - { - TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] add_ref"); - CRITICAL_REGION_LOCAL(m_self_refs_lock); - - // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted - auto self = safe_shared_from_this(); - if(!self) - return false; - if(m_was_shutdown) - return false; - m_self_refs.push_back(self); - return true; - CATCH_ENTRY_L0("connection::add_ref()", false); - } - //--------------------------------------------------------------------------------- - template - bool connection::release() - { - TRY_ENTRY(); - boost::shared_ptr > back_connection_copy; - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] release"); - CRITICAL_REGION_BEGIN(m_self_refs_lock); - CHECK_AND_ASSERT_MES(m_self_refs.size(), false, "[sock " << socket_.native_handle() << "] m_self_refs empty at connection::release() call"); - //erasing from container without additional copy can cause start deleting object, including m_self_refs - back_connection_copy = m_self_refs.back(); - m_self_refs.pop_back(); - CRITICAL_REGION_END(); - return true; - CATCH_ENTRY_L0("connection::release()", false); - } - //--------------------------------------------------------------------------------- - template - void connection::call_back_starter() - { - TRY_ENTRY(); - LOG_PRINT_L2("[" << print_connection_context_short(context) << "] fired_callback"); - m_protocol_handler.handle_qued_callback(); - CATCH_ENTRY_L0("connection::call_back_starter()", void()); - } - //--------------------------------------------------------------------------------- - template - void connection::handle_read(const boost::system::error_code& e, - std::size_t bytes_transferred) - { - TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Assync read calledback."); - - if (!e) - { - LOG_PRINT("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred, LOG_LEVEL_4); - context.m_last_recv = time(NULL); - context.m_recv_cnt += bytes_transferred; - bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred); - if(!recv_res) - { - LOG_PRINT("[sock " << socket_.native_handle() << "] protocol_want_close", LOG_LEVEL_4); - - //some error in protocol, protocol handler ask to close connection - boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); - bool do_shutdown = false; - CRITICAL_REGION_BEGIN(m_send_que_lock); - if(!m_send_que.size()) - do_shutdown = true; - CRITICAL_REGION_END(); - if(do_shutdown) - shutdown(); - }else - { - socket_.async_read_some(boost::asio::buffer(buffer_), - strand_.wrap( - boost::bind(&connection::handle_read, connection::shared_from_this(), - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred))); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "]Assync read requested."); - } - }else - { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); - if(e.value() != 2) - { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); - shutdown(); - } - } - // If an error occurs then no new asynchronous operations are started. This - // means that all shared_ptr references to the connection object will - // disappear and the object will be destroyed automatically after this - // handler returns. The connection class's destructor closes the socket. - CATCH_ENTRY_L0("connection::handle_read", void()); - } - //--------------------------------------------------------------------------------- - template - bool connection::call_run_once_service_io() - { - TRY_ENTRY(); - if(!m_is_multithreaded) - { - //single thread model, we can wait in blocked call - size_t cnt = socket_.get_io_service().run_one(); - if(!cnt)//service is going to quit - return false; - }else - { - //multi thread model, we can't(!) wait in blocked call - //so we make non blocking call and releasing CPU by calling sleep(0); - //if no handlers were called - //TODO: Maybe we need to have have critical section + event + callback to upper protocol to - //ask it inside(!) critical region if we still able to go in event wait... - size_t cnt = socket_.get_io_service().poll_one(); - if(!cnt) - misc_utils::sleep_no_w(0); - } - - return true; - CATCH_ENTRY_L0("connection::call_run_once_service_io", false); - } - //--------------------------------------------------------------------------------- - template - bool connection::do_send(const void* ptr, size_t cb) - { - TRY_ENTRY(); - // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted - auto self = safe_shared_from_this(); - if(!self) - return false; - if(m_was_shutdown) - return false; - - LOG_PRINT("[sock " << socket_.native_handle() << "] SEND " << cb, LOG_LEVEL_4); - context.m_last_send = time(NULL); - context.m_send_cnt += cb; - //some data should be wrote to stream - //request complete - - CRITICAL_REGION_LOCAL_VAR(m_send_que_lock, send_guard); - if(m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) - { - send_guard.unlock();//manual unlock - LOG_ERROR("send to [" << print_connection_context_short(context) << ", (" << (void*)this << ")] que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); - close(); - shutdown(); - return false; - } - - m_send_que.resize(m_send_que.size()+1); - m_send_que.back().assign((const char*)ptr, cb); - - if(m_send_que.size() > 1) - { - //active operation should be in progress, nothing to do, just wait last operation callback - }else - { - //no active operation - if(m_send_que.size()!=1) - { - LOG_ERROR("Looks like no active operations, but send que size != 1!!"); - return false; - } - - boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), - //strand_.wrap( - boost::bind(&connection::handle_write, self, _1, _2) - //) - ); - - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Assync send requested " << m_send_que.front().size()); - } - - return true; - - CATCH_ENTRY_L0("connection::do_send", false); - } - //--------------------------------------------------------------------------------- - template - bool connection::shutdown() - { - if (m_was_shutdown) - return true; - // Initiate graceful connection closure. - boost::system::error_code ignored_ec; - socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); - m_was_shutdown = true; - m_protocol_handler.release_protocol(); - return true; - } - //--------------------------------------------------------------------------------- - template - bool connection::close() - { - TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Que Shutdown called."); - size_t send_que_size = 0; - CRITICAL_REGION_BEGIN(m_send_que_lock); - send_que_size = m_send_que.size(); - CRITICAL_REGION_END(); - boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); - if(!send_que_size) - { - shutdown(); - } - - return true; - CATCH_ENTRY_L0("connection::close", false); - } - //--------------------------------------------------------------------------------- - template - bool connection::cancel() - { - return close(); - } - //--------------------------------------------------------------------------------- - template - void connection::handle_write(const boost::system::error_code& e, size_t cb) - { - TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Assync send calledback " << cb); - - if (e) - { - LOG_PRINT_L0("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); - shutdown(); - return; - } - - bool do_shutdown = false; - CRITICAL_REGION_BEGIN(m_send_que_lock); - if(m_send_que.empty()) - { - LOG_ERROR("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!"); - return; - } - - m_send_que.pop_front(); - if(m_send_que.empty()) - { - if(boost::interprocess::ipcdetail::atomic_read32(&m_want_close_connection)) - { - do_shutdown = true; - } - }else - { - //have more data to send - boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), - //strand_.wrap( - boost::bind(&connection::handle_write, connection::shared_from_this(), _1, _2)); - //); - } - CRITICAL_REGION_END(); - - if(do_shutdown) - { - shutdown(); - } - CATCH_ENTRY_L0("connection::handle_write", void()); - } - /************************************************************************/ - /* */ - /************************************************************************/ - template - boosted_tcp_server::boosted_tcp_server(): - m_io_service_local_instance(new boost::asio::io_service()), - io_service_(*m_io_service_local_instance.get()), - acceptor_(io_service_), - new_connection_(new connection(io_service_, m_config, m_sockets_count, m_pfilter)), - m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) - { - m_thread_name_prefix = "NET"; +//--------------------------------------------------------------------------------- +template +connection::~connection() +{ + if(!m_was_shutdown) { + LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown."); + shutdown(); } - template - boosted_tcp_server::boosted_tcp_server(boost::asio::io_service& extarnal_io_service): - io_service_(extarnal_io_service), - acceptor_(io_service_), - new_connection_(new connection(io_service_, m_config, m_sockets_count, m_pfilter)), - m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) - { - m_thread_name_prefix = "NET"; + LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed"); + boost::interprocess::ipcdetail::atomic_dec32(&m_ref_sockets_count); + VALIDATE_MUTEX_IS_FREE(m_send_que_lock); + VALIDATE_MUTEX_IS_FREE(m_self_refs_lock); +} +//--------------------------------------------------------------------------------- +template +boost::asio::ip::tcp::socket& connection::socket() +{ + return socket_; +} +//--------------------------------------------------------------------------------- +template +boost::shared_ptr> connection::safe_shared_from_this() +{ + try { + return connection::shared_from_this(); } - //--------------------------------------------------------------------------------- - template - boosted_tcp_server::~boosted_tcp_server() - { - this->send_stop_signal(); - timed_wait_server_stop(10000); + catch(const boost::bad_weak_ptr&) { + // It happens when the connection is being deleted + return boost::shared_ptr>(); } - //--------------------------------------------------------------------------------- - template - bool boosted_tcp_server::init_server(uint32_t port, const std::string address) - { - TRY_ENTRY(); - m_stop_signal_sent = false; - m_port = port; - m_address = address; - // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR). - boost::asio::ip::tcp::resolver resolver(io_service_); - boost::asio::ip::tcp::resolver::query query(address, boost::lexical_cast(port)); - boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); - acceptor_.open(endpoint.protocol()); - acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); - acceptor_.bind(endpoint); - acceptor_.listen(); - boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint(); - m_port = binded_endpoint.port(); - acceptor_.async_accept(new_connection_->socket(), - boost::bind(&boosted_tcp_server::handle_accept, this, - boost::asio::placeholders::error)); +} +//--------------------------------------------------------------------------------- +template +bool connection::start(bool is_income, bool is_multithreaded) +{ + TRY_ENTRY(); - return true; - CATCH_ENTRY_L0("boosted_tcp_server::init_server", false); - } - //----------------------------------------------------------------------------- -PUSH_WARNINGS -DISABLE_GCC_WARNING(maybe-uninitialized) - template - bool boosted_tcp_server::init_server(const std::string port, const std::string& address) - { - uint32_t p = 0; - - if (port.size() && !string_tools::get_xtype_from_string(p, port)) { - LOG_ERROR("Failed to convert port no = " << port); - return false; - } - return this->init_server(p, address); - } -POP_WARNINGS - //--------------------------------------------------------------------------------- - template - bool boosted_tcp_server::worker_thread() - { - TRY_ENTRY(); - uint32_t local_thr_index = boost::interprocess::ipcdetail::atomic_inc32(&m_thread_index); - std::string thread_name = std::string("[") + m_thread_name_prefix; - thread_name += boost::to_string(local_thr_index) + "]"; - log_space::log_singletone::set_thread_log_prefix(thread_name); - while(!m_stop_signal_sent) - { - try - { - io_service_.run(); - } - catch(const std::exception& ex) - { - LOG_ERROR("Exception at server worker thread, what=" << ex.what()); - } - catch(...) - { - LOG_ERROR("Exception at server worker thread, unknown execption"); - } - } - LOG_PRINT_L4("Worker thread finished"); - return true; - CATCH_ENTRY_L0("boosted_tcp_server::worker_thread", false); - } - //--------------------------------------------------------------------------------- - template - void boosted_tcp_server::set_threads_prefix(const std::string& prefix_name) - { - m_thread_name_prefix = prefix_name; - } - //--------------------------------------------------------------------------------- - template - void boosted_tcp_server::set_connection_filter(i_connection_filter* pfilter) - { - m_pfilter = pfilter; - } - //--------------------------------------------------------------------------------- - template - bool boosted_tcp_server::run_server(size_t threads_count, bool wait) - { - TRY_ENTRY(); - m_threads_count = threads_count; - m_main_thread_id = boost::this_thread::get_id(); - log_space::log_singletone::set_thread_log_prefix("[SRV_MAIN]"); - while(!m_stop_signal_sent) - { - - // Create a pool of threads to run all of the io_services. - CRITICAL_REGION_BEGIN(m_threads_lock); - for (std::size_t i = 0; i < threads_count; ++i) - { - boost::shared_ptr thread(new boost::thread( - boost::bind(&boosted_tcp_server::worker_thread, this))); - m_threads.push_back(thread); - } - CRITICAL_REGION_END(); - // Wait for all threads in the pool to exit. - if(wait) - { - for (std::size_t i = 0; i < m_threads.size(); ++i) - m_threads[i]->join(); - m_threads.clear(); - - }else - { - return true; - } - - if(wait && !m_stop_signal_sent) - { - //some problems with the listening socket ?.. - LOG_PRINT_L0("Net service stopped without stop request, restarting..."); - if(!this->init_server(m_port, m_address)) - { - LOG_PRINT_L0("Reiniting service failed, exit."); - return false; - }else - { - LOG_PRINT_L0("Reiniting OK."); - } - } - } - return true; - CATCH_ENTRY_L0("boosted_tcp_server::run_server", false); - } - //--------------------------------------------------------------------------------- - template - bool boosted_tcp_server::is_thread_worker() - { - TRY_ENTRY(); - CRITICAL_REGION_LOCAL(m_threads_lock); - BOOST_FOREACH(boost::shared_ptr& thp, m_threads) - { - if(thp->get_id() == boost::this_thread::get_id()) - return true; - } - if(m_threads_count == 1 && boost::this_thread::get_id() == m_main_thread_id) - return true; + // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted + auto self = safe_shared_from_this(); + if(!self) { + LOG_PRINT_RED("Failed to start conntection, failed to call safe_shared_from_this", LOG_LEVEL_2); return false; - CATCH_ENTRY_L0("boosted_tcp_server::is_thread_worker", false); } - //--------------------------------------------------------------------------------- - template - bool boosted_tcp_server::timed_wait_server_stop(uint64_t wait_mseconds) - { - TRY_ENTRY(); - boost::chrono::milliseconds ms(wait_mseconds); - for (std::size_t i = 0; i < m_threads.size(); ++i) - { - if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms)) - { - LOG_PRINT_L0("Interrupting thread " << m_threads[i]->native_handle()); - m_threads[i]->interrupt(); - } + + m_is_multithreaded = is_multithreaded; + + boost::system::error_code ec; + auto remote_ep = socket_.remote_endpoint(ec); + CHECK_AND_NO_ASSERT_MES(!m_was_shutdown, false, "was shutdown on start connection"); + + CHECK_AND_NO_ASSERT_MES_LEVEL(!ec, false, "Failed to get remote endpoint(" << (is_income ? "INT" : "OUT") << "): " << ec.message() << ':' << ec.value(), LOG_LEVEL_2); + + auto local_ep = socket_.local_endpoint(ec); + CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get local endpoint: " << ec.message() << ':' << ec.value()); + + context = boost::value_initialized(); + long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong()); + + context.set_details(boost::uuids::random_generator()(), ip_, remote_ep.port(), is_income); + context.m_last_send = context.m_last_recv = time(NULL); + + LOG_PRINT_L3("[sock " << socket_.native_handle() << "] new connection, remote end_point: " << print_connection_context_short(context) << " local end_point: " << local_ep.address().to_string() << ':' << local_ep.port() << ", total sockets objects " << m_ref_sockets_count); + + if(is_income && m_pfilter && !m_pfilter->is_remote_ip_allowed(context.m_remote_ip)) { + LOG_PRINT_L0("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection"); + close(); + return false; + } + + m_protocol_handler.after_init_connection(); + + socket_.async_read_some(boost::asio::buffer(buffer_), + strand_.wrap( + boost::bind(&connection::handle_read, self, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred))); + + return true; + + CATCH_ENTRY_L0("connection::start()", false); +} +//--------------------------------------------------------------------------------- +template +bool connection::request_callback() +{ + TRY_ENTRY(); + LOG_PRINT_L2("[" << print_connection_context_short(context) << "] request_callback"); + // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted + auto self = safe_shared_from_this(); + if(!self) + return false; + + strand_.post(boost::bind(&connection::call_back_starter, self)); + CATCH_ENTRY_L0("connection::request_callback()", false); + return true; +} +//--------------------------------------------------------------------------------- +template +boost::asio::io_service& connection::get_io_service() +{ + return m_rio_service; +} +//--------------------------------------------------------------------------------- +template +bool connection::add_ref() +{ + TRY_ENTRY(); + LOG_PRINT_L4("[sock " << socket_.native_handle() << "] add_ref"); + CRITICAL_REGION_LOCAL(m_self_refs_lock); + + // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted + auto self = safe_shared_from_this(); + if(!self) + return false; + if(m_was_shutdown) + return false; + m_self_refs.push_back(self); + return true; + CATCH_ENTRY_L0("connection::add_ref()", false); +} +//--------------------------------------------------------------------------------- +template +bool connection::release() +{ + TRY_ENTRY(); + boost::shared_ptr> back_connection_copy; + LOG_PRINT_L4("[sock " << socket_.native_handle() << "] release"); + CRITICAL_REGION_BEGIN(m_self_refs_lock); + CHECK_AND_ASSERT_MES(m_self_refs.size(), false, "[sock " << socket_.native_handle() << "] m_self_refs empty at connection::release() call"); + //erasing from container without additional copy can cause start deleting object, including m_self_refs + back_connection_copy = m_self_refs.back(); + m_self_refs.pop_back(); + CRITICAL_REGION_END(); + return true; + CATCH_ENTRY_L0("connection::release()", false); +} +//--------------------------------------------------------------------------------- +template +void connection::call_back_starter() +{ + TRY_ENTRY(); + LOG_PRINT_L2("[" << print_connection_context_short(context) << "] fired_callback"); + m_protocol_handler.handle_qued_callback(); + CATCH_ENTRY_L0("connection::call_back_starter()", void()); +} +//--------------------------------------------------------------------------------- +template +void connection::handle_read(const boost::system::error_code& e, + std::size_t bytes_transferred) +{ + TRY_ENTRY(); + LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Assync read calledback."); + + if(!e) { + LOG_PRINT("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred, LOG_LEVEL_4); + context.m_last_recv = time(NULL); + context.m_recv_cnt += bytes_transferred; + bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred); + if(!recv_res) { + LOG_PRINT("[sock " << socket_.native_handle() << "] protocol_want_close", LOG_LEVEL_4); + + //some error in protocol, protocol handler ask to close connection + boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); + bool do_shutdown = false; + CRITICAL_REGION_BEGIN(m_send_que_lock); + if(!m_send_que.size()) + do_shutdown = true; + CRITICAL_REGION_END(); + if(do_shutdown) + shutdown(); } - return true; - CATCH_ENTRY_L0("boosted_tcp_server::timed_wait_server_stop", false); - } - //--------------------------------------------------------------------------------- - template - void boosted_tcp_server::send_stop_signal() - { - m_stop_signal_sent = true; - TRY_ENTRY(); - m_config.on_send_stop_signal(); - - io_service_.stop(); - CATCH_ENTRY_L0("boosted_tcp_server::send_stop_signal()", void()); - } - //--------------------------------------------------------------------------------- - template - bool boosted_tcp_server::is_stop_signal_sent() - { - return m_stop_signal_sent; - } - //--------------------------------------------------------------------------------- - template - void boosted_tcp_server::handle_accept(const boost::system::error_code& e) - { - TRY_ENTRY(); - if (!e) - { - connection_ptr conn(std::move(new_connection_)); - - new_connection_.reset(new connection(io_service_, m_config, m_sockets_count, m_pfilter)); - acceptor_.async_accept(new_connection_->socket(), - boost::bind(&boosted_tcp_server::handle_accept, this, - boost::asio::placeholders::error)); - - conn->start(true, 1 < m_threads_count); - }else - { - LOG_ERROR("Some problems at accept: " << e.message() << ", connections_count = " << m_sockets_count); + else { + socket_.async_read_some(boost::asio::buffer(buffer_), + strand_.wrap( + boost::bind(&connection::handle_read, connection::shared_from_this(), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred))); + LOG_PRINT_L4("[sock " << socket_.native_handle() << "]Assync read requested."); } - CATCH_ENTRY_L0("boosted_tcp_server::handle_accept", void()); } - //--------------------------------------------------------------------------------- - template - bool boosted_tcp_server::connect(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip) - { - TRY_ENTRY(); + else { + LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); + if(e.value() != 2) { + LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); + shutdown(); + } + } + // If an error occurs then no new asynchronous operations are started. This + // means that all shared_ptr references to the connection object will + // disappear and the object will be destroyed automatically after this + // handler returns. The connection class's destructor closes the socket. + CATCH_ENTRY_L0("connection::handle_read", void()); +} +//--------------------------------------------------------------------------------- +template +bool connection::call_run_once_service_io() +{ + TRY_ENTRY(); + if(!m_is_multithreaded) { + //single thread model, we can wait in blocked call + size_t cnt = m_rio_service.run_one(); - connection_ptr new_connection_l(new connection(io_service_, m_config, m_sockets_count, m_pfilter) ); - boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); - - ////////////////////////////////////////////////////////////////////////// - boost::asio::ip::tcp::resolver resolver(io_service_); - boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port); - boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query); - boost::asio::ip::tcp::resolver::iterator end; - if(iterator == end) - { - LOG_ERROR("Failed to resolve " << adr); + if(!cnt) //service is going to quit + return false; + } + else { + //multi thread model, we can't(!) wait in blocked call + //so we make non blocking call and releasing CPU by calling sleep(0); + //if no handlers were called + //TODO: Maybe we need to have have critical section + event + callback to upper protocol to + //ask it inside(!) critical region if we still able to go in event wait... + size_t cnt = m_rio_service.poll_one(); + if(!cnt) + misc_utils::sleep_no_w(0); + } + + return true; + CATCH_ENTRY_L0("connection::call_run_once_service_io", false); +} +//--------------------------------------------------------------------------------- +template +bool connection::do_send(const void* ptr, size_t cb) +{ + TRY_ENTRY(); + // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted + auto self = safe_shared_from_this(); + if(!self) + return false; + if(m_was_shutdown) + return false; + + LOG_PRINT("[sock " << socket_.native_handle() << "] SEND " << cb, LOG_LEVEL_4); + context.m_last_send = time(NULL); + context.m_send_cnt += cb; + //some data should be wrote to stream + //request complete + + CRITICAL_REGION_LOCAL_VAR(m_send_que_lock, send_guard); + if(m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) { + send_guard.unlock(); //manual unlock + LOG_ERROR("send to [" << print_connection_context_short(context) << ", (" << (void*)this << ")] que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); + close(); + shutdown(); + return false; + } + + m_send_que.resize(m_send_que.size() + 1); + m_send_que.back().assign((const char*)ptr, cb); + + if(m_send_que.size() > 1) { + //active operation should be in progress, nothing to do, just wait last operation callback + } + else { + //no active operation + if(m_send_que.size() != 1) { + LOG_ERROR("Looks like no active operations, but send que size != 1!!"); return false; } - ////////////////////////////////////////////////////////////////////////// + boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), + //strand_.wrap( + boost::bind(&connection::handle_write, self, _1, _2) + //) + ); - //boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port); - boost::asio::ip::tcp::endpoint remote_endpoint(*iterator); - - sock_.open(remote_endpoint.protocol()); - if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" ) - { - boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(adr.c_str()), 0); - sock_.bind(local_endpoint); + LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Assync send requested " << m_send_que.front().size()); + } + + return true; + + CATCH_ENTRY_L0("connection::do_send", false); +} +//--------------------------------------------------------------------------------- +template +bool connection::shutdown() +{ + if(m_was_shutdown) + return true; + // Initiate graceful connection closure. + boost::system::error_code ignored_ec; + socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); + m_was_shutdown = true; + m_protocol_handler.release_protocol(); + return true; +} +//--------------------------------------------------------------------------------- +template +bool connection::close() +{ + TRY_ENTRY(); + LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Que Shutdown called."); + size_t send_que_size = 0; + CRITICAL_REGION_BEGIN(m_send_que_lock); + send_que_size = m_send_que.size(); + CRITICAL_REGION_END(); + boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); + if(!send_que_size) { + shutdown(); + } + + return true; + CATCH_ENTRY_L0("connection::close", false); +} +//--------------------------------------------------------------------------------- +template +bool connection::cancel() +{ + return close(); +} +//--------------------------------------------------------------------------------- +template +void connection::handle_write(const boost::system::error_code& e, size_t cb) +{ + TRY_ENTRY(); + LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Assync send calledback " << cb); + + if(e) { + LOG_PRINT_L0("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); + shutdown(); + return; + } + + bool do_shutdown = false; + CRITICAL_REGION_BEGIN(m_send_que_lock); + if(m_send_que.empty()) { + LOG_ERROR("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!"); + return; + } + + m_send_que.pop_front(); + if(m_send_que.empty()) { + if(boost::interprocess::ipcdetail::atomic_read32(&m_want_close_connection)) { + do_shutdown = true; + } + } + else { + //have more data to send + boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), + //strand_.wrap( + boost::bind(&connection::handle_write, connection::shared_from_this(), _1, _2)); + //); + } + CRITICAL_REGION_END(); + + if(do_shutdown) { + shutdown(); + } + CATCH_ENTRY_L0("connection::handle_write", void()); +} +/************************************************************************/ +/* */ +/************************************************************************/ +template +boosted_tcp_server::boosted_tcp_server() + : m_io_service_local_instance(new boost::asio::io_service()), + io_service_(*m_io_service_local_instance.get()), + acceptor_(io_service_), + new_connection_(new connection(io_service_, m_config, m_sockets_count, m_pfilter)), + m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) +{ + m_thread_name_prefix = "NET"; +} + +template +boosted_tcp_server::boosted_tcp_server(boost::asio::io_service& extarnal_io_service) + : io_service_(extarnal_io_service), + acceptor_(io_service_), + new_connection_(new connection(io_service_, m_config, m_sockets_count, m_pfilter)), + m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) +{ + m_thread_name_prefix = "NET"; +} +//--------------------------------------------------------------------------------- +template +boosted_tcp_server::~boosted_tcp_server() +{ + this->send_stop_signal(); + timed_wait_server_stop(10000); +} +//--------------------------------------------------------------------------------- +template +bool boosted_tcp_server::init_server(uint32_t port, const std::string address) +{ + TRY_ENTRY(); + m_stop_signal_sent = false; + m_port = port; + m_address = address; + // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR). + boost::asio::ip::tcp::resolver resolver(io_service_); + boost::asio::ip::tcp::resolver::query query(address, boost::lexical_cast(port)); + boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); + acceptor_.open(endpoint.protocol()); + acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + acceptor_.bind(endpoint); + acceptor_.listen(); + boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint(); + m_port = binded_endpoint.port(); + acceptor_.async_accept(new_connection_->socket(), + boost::bind(&boosted_tcp_server::handle_accept, this, + boost::asio::placeholders::error)); + + return true; + CATCH_ENTRY_L0("boosted_tcp_server::init_server", false); +} +//----------------------------------------------------------------------------- +PUSH_WARNINGS +DISABLE_GCC_WARNING(maybe - uninitialized) +template +bool boosted_tcp_server::init_server(const std::string port, const std::string& address) +{ + uint32_t p = 0; + + if(port.size() && !string_tools::get_xtype_from_string(p, port)) { + LOG_ERROR("Failed to convert port no = " << port); + return false; + } + return this->init_server(p, address); +} +POP_WARNINGS +//--------------------------------------------------------------------------------- +template +bool boosted_tcp_server::worker_thread() +{ + TRY_ENTRY(); + uint32_t local_thr_index = boost::interprocess::ipcdetail::atomic_inc32(&m_thread_index); + std::string thread_name = std::string("[") + m_thread_name_prefix; + thread_name += boost::to_string(local_thr_index) + "]"; + log_space::log_singletone::set_thread_log_prefix(thread_name); + while(!m_stop_signal_sent) { + try { + io_service_.run(); + } + catch(const std::exception& ex) { + LOG_ERROR("Exception at server worker thread, what=" << ex.what()); + } + catch(...) { + LOG_ERROR("Exception at server worker thread, unknown execption"); + } + } + LOG_PRINT_L4("Worker thread finished"); + return true; + CATCH_ENTRY_L0("boosted_tcp_server::worker_thread", false); +} +//--------------------------------------------------------------------------------- +template +void boosted_tcp_server::set_threads_prefix(const std::string& prefix_name) +{ + m_thread_name_prefix = prefix_name; +} +//--------------------------------------------------------------------------------- +template +void boosted_tcp_server::set_connection_filter(i_connection_filter* pfilter) +{ + m_pfilter = pfilter; +} +//--------------------------------------------------------------------------------- +template +bool boosted_tcp_server::run_server(size_t threads_count, bool wait) +{ + TRY_ENTRY(); + m_threads_count = threads_count; + m_main_thread_id = boost::this_thread::get_id(); + log_space::log_singletone::set_thread_log_prefix("[SRV_MAIN]"); + while(!m_stop_signal_sent) { + // Create a pool of threads to run all of the io_services. + CRITICAL_REGION_BEGIN(m_threads_lock); + for(std::size_t i = 0; i < threads_count; ++i) { + boost::shared_ptr thread(new boost::thread( + boost::bind(&boosted_tcp_server::worker_thread, this))); + m_threads.push_back(thread); + } + CRITICAL_REGION_END(); + // Wait for all threads in the pool to exit. + if(wait) { + for(std::size_t i = 0; i < m_threads.size(); ++i) + m_threads[i]->join(); + m_threads.clear(); + } + else { + return true; } - /* + if(wait && !m_stop_signal_sent) { + //some problems with the listening socket ?.. + LOG_PRINT_L0("Net service stopped without stop request, restarting..."); + if(!this->init_server(m_port, m_address)) { + LOG_PRINT_L0("Reiniting service failed, exit."); + return false; + } + else { + LOG_PRINT_L0("Reiniting OK."); + } + } + } + return true; + CATCH_ENTRY_L0("boosted_tcp_server::run_server", false); +} +//--------------------------------------------------------------------------------- +template +bool boosted_tcp_server::is_thread_worker() +{ + TRY_ENTRY(); + CRITICAL_REGION_LOCAL(m_threads_lock); + BOOST_FOREACH(boost::shared_ptr& thp, m_threads) { + if(thp->get_id() == boost::this_thread::get_id()) + return true; + } + if(m_threads_count == 1 && boost::this_thread::get_id() == m_main_thread_id) + return true; + return false; + CATCH_ENTRY_L0("boosted_tcp_server::is_thread_worker", false); +} +//--------------------------------------------------------------------------------- +template +bool boosted_tcp_server::timed_wait_server_stop(uint64_t wait_mseconds) +{ + TRY_ENTRY(); + boost::chrono::milliseconds ms(wait_mseconds); + for(std::size_t i = 0; i < m_threads.size(); ++i) { + if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms)) { + LOG_PRINT_L0("Interrupting thread " << m_threads[i]->native_handle()); + m_threads[i]->interrupt(); + } + } + return true; + CATCH_ENTRY_L0("boosted_tcp_server::timed_wait_server_stop", false); +} +//--------------------------------------------------------------------------------- +template +void boosted_tcp_server::send_stop_signal() +{ + m_stop_signal_sent = true; + TRY_ENTRY(); + m_config.on_send_stop_signal(); + + io_service_.stop(); + CATCH_ENTRY_L0("boosted_tcp_server::send_stop_signal()", void()); +} +//--------------------------------------------------------------------------------- +template +bool boosted_tcp_server::is_stop_signal_sent() +{ + return m_stop_signal_sent; +} +//--------------------------------------------------------------------------------- +template +void boosted_tcp_server::handle_accept(const boost::system::error_code& e) +{ + TRY_ENTRY(); + if(!e) { + connection_ptr conn(std::move(new_connection_)); + + new_connection_.reset(new connection(io_service_, m_config, m_sockets_count, m_pfilter)); + acceptor_.async_accept(new_connection_->socket(), + boost::bind(&boosted_tcp_server::handle_accept, this, + boost::asio::placeholders::error)); + + conn->start(true, 1 < m_threads_count); + } + else { + LOG_ERROR("Some problems at accept: " << e.message() << ", connections_count = " << m_sockets_count); + } + CATCH_ENTRY_L0("boosted_tcp_server::handle_accept", void()); +} +//--------------------------------------------------------------------------------- +template +bool boosted_tcp_server::connect(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip) +{ + TRY_ENTRY(); + + connection_ptr new_connection_l(new connection(io_service_, m_config, m_sockets_count, m_pfilter)); + boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); + + ////////////////////////////////////////////////////////////////////////// + boost::asio::ip::tcp::resolver resolver(io_service_); + boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port); + boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query); + boost::asio::ip::tcp::resolver::iterator end; + if(iterator == end) { + LOG_ERROR("Failed to resolve " << adr); + return false; + } + ////////////////////////////////////////////////////////////////////////// + + //boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port); + boost::asio::ip::tcp::endpoint remote_endpoint(*iterator); + + sock_.open(remote_endpoint.protocol()); + if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "") { + boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(adr.c_str()), 0); + sock_.bind(local_endpoint); + } + + /* NOTICE: be careful to make sync connection from event handler: in case if all threads suddenly do sync connect, there will be no thread to dispatch events from io service. */ - boost::system::error_code ec = boost::asio::error::would_block; + boost::system::error_code ec = boost::asio::error::would_block; - //have another free thread(s), work in wait mode, without event handling - struct local_async_context - { - boost::system::error_code ec; - boost::mutex connect_mut; - boost::condition_variable cond; - }; + //have another free thread(s), work in wait mode, without event handling + struct local_async_context { + boost::system::error_code ec; + boost::mutex connect_mut; + boost::condition_variable cond; + }; - boost::shared_ptr local_shared_context(new local_async_context()); - local_shared_context->ec = boost::asio::error::would_block; - boost::unique_lock lock(local_shared_context->connect_mut); - auto connect_callback = [](boost::system::error_code ec_, boost::shared_ptr shared_context) - { - CRITICAL_SECTION_LOCK(shared_context->connect_mut); - shared_context->ec = ec_; - CRITICAL_SECTION_UNLOCK(shared_context->connect_mut); - shared_context->cond.notify_one(); - }; + boost::shared_ptr local_shared_context(new local_async_context()); + local_shared_context->ec = boost::asio::error::would_block; + boost::unique_lock lock(local_shared_context->connect_mut); + auto connect_callback = [](boost::system::error_code ec_, boost::shared_ptr shared_context) { + CRITICAL_SECTION_LOCK(shared_context->connect_mut); + shared_context->ec = ec_; + CRITICAL_SECTION_UNLOCK(shared_context->connect_mut); + shared_context->cond.notify_one(); + }; - sock_.async_connect(remote_endpoint, boost::bind(connect_callback, _1, local_shared_context)); - while(local_shared_context->ec == boost::asio::error::would_block) - { - bool r = false; - try{ - r = local_shared_context->cond.timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(conn_timeout)); - } - catch (...) - { - //timeout - sock_.close(); - LOG_PRINT_L3("timed_wait throwed, " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); - return false; - } - if(local_shared_context->ec == boost::asio::error::would_block && !r) - { - //timeout - sock_.close(); - LOG_PRINT_L3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); - return false; - } + sock_.async_connect(remote_endpoint, boost::bind(connect_callback, _1, local_shared_context)); + while(local_shared_context->ec == boost::asio::error::would_block) { + bool r = false; + try { + r = local_shared_context->cond.timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(conn_timeout)); } - ec = local_shared_context->ec; - - if (ec || !sock_.is_open()) - { - LOG_PRINT("Some problems at connect, message: " << ec.message(), LOG_LEVEL_3); + catch(...) { + //timeout + sock_.close(); + LOG_PRINT_L3("timed_wait throwed, " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); return false; } - - LOG_PRINT_L3("Connected success to " << adr << ':' << port); - - bool r = new_connection_l->start(false, 1 < m_threads_count); - if (r) - { - new_connection_l->get_context(conn_context); - //new_connection_l.reset(new connection(io_service_, m_config, m_sockets_count, m_pfilter)); + if(local_shared_context->ec == boost::asio::error::would_block && !r) { + //timeout + sock_.close(); + LOG_PRINT_L3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); + return false; } - - return r; - - CATCH_ENTRY_L0("boosted_tcp_server::connect", false); } - //--------------------------------------------------------------------------------- - template template - bool boosted_tcp_server::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback cb, const std::string& bind_ip) - { - TRY_ENTRY(); - connection_ptr new_connection_l(new connection(io_service_, m_config, m_sockets_count, m_pfilter) ); - boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); + ec = local_shared_context->ec; - ////////////////////////////////////////////////////////////////////////// - boost::asio::ip::tcp::resolver resolver(io_service_); - boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port); - boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query); - boost::asio::ip::tcp::resolver::iterator end; - if(iterator == end) - { - LOG_ERROR("Failed to resolve " << adr); - return false; + if(ec || !sock_.is_open()) { + LOG_PRINT("Some problems at connect, message: " << ec.message(), LOG_LEVEL_3); + return false; + } + + LOG_PRINT_L3("Connected success to " << adr << ':' << port); + + bool r = new_connection_l->start(false, 1 < m_threads_count); + if(r) { + new_connection_l->get_context(conn_context); + //new_connection_l.reset(new connection(io_service_, m_config, m_sockets_count, m_pfilter)); + } + + return r; + + CATCH_ENTRY_L0("boosted_tcp_server::connect", false); +} +//--------------------------------------------------------------------------------- +template +template +bool boosted_tcp_server::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback cb, const std::string& bind_ip) +{ + TRY_ENTRY(); + connection_ptr new_connection_l(new connection(io_service_, m_config, m_sockets_count, m_pfilter)); + boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); + + ////////////////////////////////////////////////////////////////////////// + boost::asio::ip::tcp::resolver resolver(io_service_); + boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port); + boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query); + boost::asio::ip::tcp::resolver::iterator end; + if(iterator == end) { + LOG_ERROR("Failed to resolve " << adr); + return false; + } + ////////////////////////////////////////////////////////////////////////// + boost::asio::ip::tcp::endpoint remote_endpoint(*iterator); + + sock_.open(remote_endpoint.protocol()); + if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "") { + boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(adr.c_str()), 0); + sock_.bind(local_endpoint); + } + + boost::shared_ptr sh_deadline(new boost::asio::deadline_timer(io_service_)); + //start deadline + sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout)); + sh_deadline->async_wait([=](const boost::system::error_code& error) { + if(error != boost::asio::error::operation_aborted) { + LOG_PRINT_L3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); + new_connection_l->socket().close(); } - ////////////////////////////////////////////////////////////////////////// - boost::asio::ip::tcp::endpoint remote_endpoint(*iterator); - - sock_.open(remote_endpoint.protocol()); - if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" ) - { - boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(adr.c_str()), 0); - sock_.bind(local_endpoint); - } - - boost::shared_ptr sh_deadline(new boost::asio::deadline_timer(io_service_)); - //start deadline - sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout)); - sh_deadline->async_wait([=](const boost::system::error_code& error) - { - if(error != boost::asio::error::operation_aborted) - { - LOG_PRINT_L3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); - new_connection_l->socket().close(); - } - }); - //start async connect - sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_) - { - t_connection_context conn_context = AUTO_VAL_INIT(conn_context); - boost::system::error_code ignored_ec; - boost::asio::ip::tcp::socket::endpoint_type lep = new_connection_l->socket().local_endpoint(ignored_ec); - if(!ec_) - {//success - if(!sh_deadline->cancel()) - { - cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation - }else if(new_connection_l->is_shutdown()) - { - //if deadline timer started and finished callback right after async_connect callback is started and before deadline timer sh_deadline->cancel() is called - cb(conn_context, boost::asio::error::operation_aborted); - } - else - { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << - " from " << lep.address().to_string() << ':' << lep.port()); - bool r = new_connection_l->start(false, 1 < m_threads_count); - if (r) - { - new_connection_l->get_context(conn_context); - cb(conn_context, ec_); - } - else - { - cb(conn_context, boost::asio::error::fault); - } - } - }else - { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << - " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value()); + }); + //start async connect + sock_.async_connect(remote_endpoint, [=](const boost::system::error_code& ec_) { + t_connection_context conn_context = AUTO_VAL_INIT(conn_context); + boost::system::error_code ignored_ec; + boost::asio::ip::tcp::socket::endpoint_type lep = new_connection_l->socket().local_endpoint(ignored_ec); + if(!ec_) { //success + if(!sh_deadline->cancel()) { + cb(conn_context, boost::asio::error::operation_aborted); //this mean that deadline timer already queued callback with cancel operation, rare situation + } + else if(new_connection_l->is_shutdown()) { + //if deadline timer started and finished callback right after async_connect callback is started and before deadline timer sh_deadline->cancel() is called + cb(conn_context, boost::asio::error::operation_aborted); + } + else { + LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port()); + bool r = new_connection_l->start(false, 1 < m_threads_count); + if(r) { + new_connection_l->get_context(conn_context); cb(conn_context, ec_); } - }); - return true; - CATCH_ENTRY_L0("boosted_tcp_server::connect_async", false); - } -} + else { + cb(conn_context, boost::asio::error::fault); + } + } + } + else { + LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value()); + cb(conn_context, ec_); + } + }); + return true; + CATCH_ENTRY_L0("boosted_tcp_server::connect_async", false); } +} // namespace net_utils +} // namespace epee POP_WARNINGS