diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index 3e6ea217..59433611 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -72,11 +72,6 @@ namespace net_utils virtual ~i_connection_filter(){} }; - enum t_server_role { // type of the server, e.g. so that we will know how to limit it - NET = 0, // default (not used? used for misc connections maybe?) TODO - RPC = 1, // the rpc commands - P2P = 2 // to other p2p node - }; /************************************************************************/ /* */ @@ -97,7 +92,8 @@ namespace net_utils typename t_protocol_handler::config_type& config, std::atomic &ref_sock_count, // the ++/-- counter std::atomic &sock_number, // the only increasing ++ number generator - i_connection_filter * &pfilter); + i_connection_filter * &pfilter + ,t_connection_type connection_type); virtual ~connection(); /// Get the socket associated with the connection. @@ -109,6 +105,12 @@ namespace net_utils void get_context(t_connection_context& context_){context_ = context;} void call_back_starter(); + + void save_dbg_log(); + + + bool speed_limit_is_enabled() const; ///< tells us should we be sleeping here (e.g. do not sleep on RPC connections) + private: //----------------- i_service_endpoint --------------------- virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint) @@ -144,7 +146,7 @@ namespace net_utils critical_section m_self_refs_lock; critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk() - t_server_role m_connection_type; + t_connection_type m_connection_type; // for calculate speed (last 60 sec) network_throttle m_throttle_speed_in; @@ -153,7 +155,7 @@ namespace net_utils std::mutex m_throttle_speed_out_mutex; public: - void setRPcStation(); + void setRpcStation(); }; @@ -169,12 +171,14 @@ namespace net_utils typedef typename t_protocol_handler::connection_context t_connection_context; /// Construct the server to listen on the specified TCP address and port, and /// serve up files from the given directory. - boosted_tcp_server(); - explicit boosted_tcp_server(boost::asio::io_service& external_io_service, t_server_role s_type); + + boosted_tcp_server(t_connection_type connection_type = e_connection_type_NET); + explicit boosted_tcp_server(boost::asio::io_service& external_io_service, t_connection_type connection_type = e_connection_type_NET); ~boosted_tcp_server(); - std::map server_type_map; - void create_server_type_map(); + std::map server_type_map; + void create_server_type_map(); + bool init_server(uint32_t port, const std::string address = "0.0.0.0"); bool init_server(const std::string port, const std::string& address = "0.0.0.0"); @@ -292,12 +296,16 @@ namespace net_utils boost::thread::id m_main_thread_id; critical_section m_threads_lock; volatile uint32_t m_thread_index; // TODO change to std::atomic - t_server_role type; void detach_threads(); + t_connection_type m_connection_type; + /// The next connection to be accepted connection_ptr new_connection_; + }; // class <>boosted_tcp_server + + } // namespace } // namespace diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index 31836fe9..612e2b41 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -71,16 +71,17 @@ PRAGMA_WARNING_DISABLE_VS(4355) std::atomic &ref_sock_count, // the ++/-- counter std::atomic &sock_number, // the only increasing ++ number generator i_connection_filter* &pfilter + ,t_connection_type connection_type ) : connection_basic(io_service, ref_sock_count, sock_number), m_protocol_handler(this, config, context), m_pfilter( pfilter ), - m_connection_type(NET), + m_connection_type( connection_type ), m_throttle_speed_in("speed_in", "throttle_speed_in"), m_throttle_speed_out("speed_out", "throttle_speed_out") { - _info_c("net/sleepRPC", "connection constructor set m_connection_type="< " << socket_.remote_endpoint().address().to_string() << ":" << socket_.remote_endpoint().port() + ); + } + //--------------------------------------------------------------------------------- + template void connection::handle_read(const boost::system::error_code& e, std::size_t bytes_transferred) { @@ -253,25 +264,30 @@ PRAGMA_WARNING_DISABLE_VS(4355) context.m_current_speed_down = m_throttle_speed_in.get_current_speed(); } - { + { CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in ); epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024); } - double delay=0; // will be calculated - do - { - { //_scope_dbg1("CRITICAL_REGION_LOCAL"); - CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in ); - delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global - } - - delay *= 0.5; - if (delay > 0) { - long int ms = (long int)(delay * 100); - epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms); - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - } - } while(delay > 0); + + double delay=0; // will be calculated - how much we should sleep to obey speed limit etc + + + if (speed_limit_is_enabled()) { + do // keep sleeping if we should sleep + { + { //_scope_dbg1("CRITICAL_REGION_LOCAL"); + CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in ); + delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global throttle + } + + delay *= 0.5; + if (delay > 0) { + long int ms = (long int)(delay * 100); + epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms); + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + } + } while(delay > 0); + } // any form of sleeping //_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred); logger_handle_net_read(bytes_transferred); @@ -356,7 +372,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); const t_safe chunksize_max = chunksize_good * 2 ; - const bool allow_split = (m_connection_type == RPC) ? false : true; // TODO config + const bool allow_split = (m_connection_type == e_connection_type_RPC) ? false : true; // do not split RPC data ASRT(! (chunksize_max<0) ); // make sure it is unsigned before removin sign with cast: long long unsigned int chunksize_max_unsigned = static_cast( chunksize_max ) ; @@ -447,7 +463,10 @@ PRAGMA_WARNING_DISABLE_VS(4355) //some data should be wrote to stream //request complete - sleep_before_packet(cb, 1, 1); + if (speed_limit_is_enabled()) { + sleep_before_packet(cb, 1, 1); + } + epee::critical_region_t send_guard(m_send_que_lock); // *** critical *** long int retry=0; const long int retry_limit = 5*4; @@ -496,7 +515,8 @@ PRAGMA_WARNING_DISABLE_VS(4355) auto size_now = m_send_que.front().size(); _dbg1_c("net/out/size", "do_send() NOW SENSD: packet="<::close", false); } + //--------------------------------------------------------------------------------- template void connection::handle_write(const boost::system::error_code& e, size_t cb) @@ -559,7 +580,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) return; } logger_handle_net_write(cb); - sleep_before_packet(cb, 1, 1); + + if (speed_limit_is_enabled()) { + sleep_before_packet(cb, 1, 1); + } + bool do_shutdown = false; CRITICAL_REGION_BEGIN(m_send_que_lock); if(m_send_que.empty()) @@ -580,7 +605,8 @@ PRAGMA_WARNING_DISABLE_VS(4355) //have more data to send auto size_now = m_send_que.front().size(); _dbg1_c("net/out/size", "handle_write() NOW SENDS: packet="<::handle_write", void()); } + //--------------------------------------------------------------------------------- template - void connection::setRPcStation() + void connection::setRpcStation() { - m_connection_type = RPC; - _fact_c("net/sleepRPC", "set m_connection_type = RPC "); + m_connection_type = e_connection_type_RPC; + _fact_c("net/sleepRPC", "set m_connection_type = RPC "); + _info_c("net/kind", "set m_connection_type = RPC "); } + + + template + bool connection::speed_limit_is_enabled() const { + return m_connection_type != e_connection_type_RPC ; + } + /************************************************************************/ /* */ /************************************************************************/ + template - boosted_tcp_server::boosted_tcp_server(): + boosted_tcp_server::boosted_tcp_server( t_connection_type connection_type ) : m_io_service_local_instance(new boost::asio::io_service()), io_service_(*m_io_service_local_instance.get()), acceptor_(io_service_), m_stop_signal_sent(false), m_port(0), m_sock_count(0), m_sock_number(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0), - new_connection_(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)) + m_connection_type( connection_type ), + new_connection_(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type)) { create_server_type_map(); m_thread_name_prefix = "NET"; - type = NET; } template - boosted_tcp_server::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_server_role s_type): + boosted_tcp_server::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : io_service_(extarnal_io_service), acceptor_(io_service_), m_stop_signal_sent(false), m_port(0), m_sock_count(0), m_sock_number(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0), - type(NET), - new_connection_(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)) + m_connection_type(connection_type), + new_connection_(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, connection_type)) { - create_server_type_map(); + create_server_type_map(); m_thread_name_prefix = "NET"; } //--------------------------------------------------------------------------------- @@ -646,9 +682,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) template void boosted_tcp_server::create_server_type_map() { - server_type_map["NET"] = t_server_role::NET; - server_type_map["RPC"] = t_server_role::RPC; - server_type_map["P2P"] = t_server_role::P2P; + server_type_map["NET"] = e_connection_type_NET; + server_type_map["RPC"] = e_connection_type_RPC; + server_type_map["P2P"] = e_connection_type_P2P; } //--------------------------------------------------------------------------------- template @@ -668,6 +704,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) acceptor_.listen(); boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint(); m_port = binded_endpoint.port(); + _fact_c("net/RPClog", "start accept"); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server::handle_accept, this, boost::asio::placeholders::error)); @@ -724,9 +761,11 @@ POP_WARNINGS void boosted_tcp_server::set_threads_prefix(const std::string& prefix_name) { m_thread_name_prefix = prefix_name; - type = server_type_map[m_thread_name_prefix]; - _note("Set server type to: " << type); - _note("Set server type to: " << m_thread_name_prefix); + auto it = server_type_map.find(m_thread_name_prefix); + if (it==server_type_map.end()) throw std::runtime_error("Unknown prefix/server type:" + std::string(prefix_name)); + auto connection_type = it->second; // the value of type + _info_c("net/RPClog", "Set server type to: " << connection_type << " from name: " << m_thread_name_prefix); + _info_c("net/RPClog", "prefix_name = " << prefix_name); } //--------------------------------------------------------------------------------- template @@ -845,15 +884,17 @@ POP_WARNINGS template void boosted_tcp_server::handle_accept(const boost::system::error_code& e) { + _fact_c("net/RPClog", "handle_accept"); TRY_ENTRY(); if (!e) { - if (type == RPC) { - new_connection_->setRPcStation(); - _note("New server for RPC connections"); + if (m_connection_type == e_connection_type_RPC) { + _note_c("net/rpc", "New server for RPC connections"); + _fact_c("net/RPClog", "New server for RPC connections"); + new_connection_->setRpcStation(); // hopefully this is not needed actually } connection_ptr conn(std::move(new_connection_)); - new_connection_.reset(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)); + new_connection_.reset(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server::handle_accept, this, boost::asio::placeholders::error)); @@ -861,6 +902,7 @@ POP_WARNINGS bool r = conn->start(true, 1 < m_threads_count); if (!r) _erro("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); + conn->save_dbg_log(); }else { _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_sock_count); @@ -873,7 +915,7 @@ POP_WARNINGS { TRY_ENTRY(); - connection_ptr new_connection_l(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) ); + connection_ptr new_connection_l(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type) ); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); ////////////////////////////////////////////////////////////////////////// @@ -953,6 +995,8 @@ POP_WARNINGS { _erro("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); } + + new_connection_l->save_dbg_log(); return r; @@ -963,7 +1007,7 @@ POP_WARNINGS bool boosted_tcp_server::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback cb, const std::string& bind_ip) { TRY_ENTRY(); - connection_ptr new_connection_l(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) ); + connection_ptr new_connection_l(new connection(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type) ); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); ////////////////////////////////////////////////////////////////////////// diff --git a/contrib/epee/include/net/http_server_impl_base.h b/contrib/epee/include/net/http_server_impl_base.h index 97a7ebc5..10f74b9a 100644 --- a/contrib/epee/include/net/http_server_impl_base.h +++ b/contrib/epee/include/net/http_server_impl_base.h @@ -45,7 +45,7 @@ namespace epee public: http_server_impl_base() - : m_net_server() + : m_net_server(epee::net_utils::e_connection_type_RPC) {} explicit http_server_impl_base(boost::asio::io_service& external_io_service) @@ -75,6 +75,7 @@ namespace epee { //go to loop LOG_PRINT("Run net_service loop( " << threads_count << " threads)...", LOG_LEVEL_0); + _fact_c("net/RPClog", "Run net_service loop( " << threads_count << " threads)..."); if(!m_net_server.run_server(threads_count, wait)) { LOG_ERROR("Failed to run net tcp server!"); diff --git a/src/p2p/connection_basic.cpp b/src/p2p/connection_basic.cpp index ed15c098..0454f30e 100644 --- a/src/p2p/connection_basic.cpp +++ b/src/p2p/connection_basic.cpp @@ -94,6 +94,18 @@ namespace epee namespace net_utils { + std::string to_string(t_connection_type type) + { + if (type == e_connection_type_NET) + return std::string("NET"); + else if (type == e_connection_type_RPC) + return std::string("RPC"); + else if (type == e_connection_type_P2P) + return std::string("P2P"); + + return std::string("UNKNOWN"); + } + /* ============================================================================ */ diff --git a/src/p2p/connection_basic.hpp b/src/p2p/connection_basic.hpp index e9fdc3ad..d8101afe 100644 --- a/src/p2p/connection_basic.hpp +++ b/src/p2p/connection_basic.hpp @@ -75,6 +75,14 @@ namespace net_utils class connection_basic_pimpl; // PIMPL for this class + enum t_connection_type { // type of the connection (of this server), e.g. so that we will know how to limit it + e_connection_type_NET = 0, // default (not used?) + e_connection_type_RPC = 1, // the rpc commands (probably not rate limited, not chunked, etc) + e_connection_type_P2P = 2 // to other p2p node (probably limited) + }; + + std::string to_string(t_connection_type type); + class connection_basic { // not-templated base class for rapid developmet of some code parts public: std::unique_ptr< connection_basic_pimpl > mI; // my Implementation diff --git a/src/p2p/net_node.h b/src/p2p/net_node.h index d956b37f..f86a4f97 100644 --- a/src/p2p/net_node.h +++ b/src/p2p/net_node.h @@ -84,7 +84,8 @@ namespace nodetool :m_payload_handler(payload_handler), m_allow_local_ip(false), m_no_igd(false), - m_hide_my_port(false) + m_hide_my_port(false), + m_net_server( epee::net_utils::e_connection_type_P2P ) // this is a P2P connection of the main p2p node server, because this is class node_server<> { m_current_number_of_out_peers = 0; m_save_graph = false;