Merge pull request #2073
07c4276c
Don't issue a new timedsync while one is already in progress (Howard Chu)cf3a376c
Don't timeout a slow operation that's making progress (Howard Chu)340830de
Fix PR#2039 (Howard Chu)
This commit is contained in:
commit
f31b89012d
3 changed files with 55 additions and 11 deletions
|
@ -42,6 +42,10 @@
|
||||||
#undef MONERO_DEFAULT_LOG_CATEGORY
|
#undef MONERO_DEFAULT_LOG_CATEGORY
|
||||||
#define MONERO_DEFAULT_LOG_CATEGORY "net"
|
#define MONERO_DEFAULT_LOG_CATEGORY "net"
|
||||||
|
|
||||||
|
#ifndef MIN_BYTES_WANTED
|
||||||
|
#define MIN_BYTES_WANTED 512
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace epee
|
namespace epee
|
||||||
{
|
{
|
||||||
namespace levin
|
namespace levin
|
||||||
|
@ -139,26 +143,23 @@ public:
|
||||||
virtual bool is_timer_started() const=0;
|
virtual bool is_timer_started() const=0;
|
||||||
virtual void cancel()=0;
|
virtual void cancel()=0;
|
||||||
virtual bool cancel_timer()=0;
|
virtual bool cancel_timer()=0;
|
||||||
|
virtual void reset_timer()=0;
|
||||||
|
virtual void timeout_handler(const boost::system::error_code& error)=0;
|
||||||
};
|
};
|
||||||
template <class callback_t>
|
template <class callback_t>
|
||||||
struct anvoke_handler: invoke_response_handler_base
|
struct anvoke_handler: invoke_response_handler_base
|
||||||
{
|
{
|
||||||
anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
|
anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
|
||||||
:m_cb(cb), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
|
:m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
|
||||||
m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command)
|
m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command)
|
||||||
{
|
{
|
||||||
if(m_con.start_outer_call())
|
if(m_con.start_outer_call())
|
||||||
{
|
{
|
||||||
|
MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
|
||||||
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
|
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
|
||||||
m_timer.async_wait([&con, command, cb](const boost::system::error_code& ec)
|
m_timer.async_wait([this](const boost::system::error_code& ec)
|
||||||
{
|
{
|
||||||
if(ec == boost::asio::error::operation_aborted)
|
timeout_handler(ec);
|
||||||
return;
|
|
||||||
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command);
|
|
||||||
std::string fake;
|
|
||||||
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
|
|
||||||
con.close();
|
|
||||||
con.finish_outer_call();
|
|
||||||
});
|
});
|
||||||
m_timer_started = true;
|
m_timer_started = true;
|
||||||
}
|
}
|
||||||
|
@ -171,7 +172,18 @@ public:
|
||||||
bool m_timer_started;
|
bool m_timer_started;
|
||||||
bool m_cancel_timer_called;
|
bool m_cancel_timer_called;
|
||||||
bool m_timer_cancelled;
|
bool m_timer_cancelled;
|
||||||
|
uint64_t m_timeout;
|
||||||
int m_command;
|
int m_command;
|
||||||
|
virtual void timeout_handler(const boost::system::error_code& error)
|
||||||
|
{
|
||||||
|
if(error == boost::asio::error::operation_aborted)
|
||||||
|
return;
|
||||||
|
MINFO(m_con.get_context_ref() << "Timeout on invoke operation happened, command: " << m_command << " timeout: " << m_timeout);
|
||||||
|
std::string fake;
|
||||||
|
m_cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, m_con.get_context_ref());
|
||||||
|
m_con.close();
|
||||||
|
m_con.finish_outer_call();
|
||||||
|
}
|
||||||
virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context)
|
virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context)
|
||||||
{
|
{
|
||||||
if(!cancel_timer())
|
if(!cancel_timer())
|
||||||
|
@ -203,6 +215,18 @@ public:
|
||||||
}
|
}
|
||||||
return m_timer_cancelled;
|
return m_timer_cancelled;
|
||||||
}
|
}
|
||||||
|
virtual void reset_timer()
|
||||||
|
{
|
||||||
|
boost::system::error_code ignored_ec;
|
||||||
|
if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
|
||||||
|
{
|
||||||
|
m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
|
||||||
|
m_timer.async_wait([this](const boost::system::error_code& ec)
|
||||||
|
{
|
||||||
|
timeout_handler(ec);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
critical_section m_invoke_response_handlers_lock;
|
critical_section m_invoke_response_handlers_lock;
|
||||||
std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
|
std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
|
||||||
|
@ -342,6 +366,13 @@ public:
|
||||||
if(m_cache_in_buffer.size() < m_current_head.m_cb)
|
if(m_cache_in_buffer.size() < m_current_head.m_cb)
|
||||||
{
|
{
|
||||||
is_continue = false;
|
is_continue = false;
|
||||||
|
if(cb >= MIN_BYTES_WANTED && !m_invoke_response_handlers.empty())
|
||||||
|
{
|
||||||
|
//async call scenario
|
||||||
|
boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
|
||||||
|
response_handler->reset_timer();
|
||||||
|
MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
|
@ -595,9 +626,15 @@ public:
|
||||||
<< ", ver=" << head.m_protocol_version);
|
<< ", ver=" << head.m_protocol_version);
|
||||||
|
|
||||||
uint64_t ticks_start = misc_utils::get_tick_count();
|
uint64_t ticks_start = misc_utils::get_tick_count();
|
||||||
|
size_t prev_size = 0;
|
||||||
|
|
||||||
while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released)
|
while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released)
|
||||||
{
|
{
|
||||||
|
if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
|
||||||
|
{
|
||||||
|
prev_size = m_cache_in_buffer.size();
|
||||||
|
ticks_start = misc_utils::get_tick_count();
|
||||||
|
}
|
||||||
if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout)
|
if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout)
|
||||||
{
|
{
|
||||||
MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
|
MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
|
||||||
|
|
|
@ -156,6 +156,7 @@ namespace net_utils
|
||||||
const network_address m_remote_address;
|
const network_address m_remote_address;
|
||||||
const bool m_is_income;
|
const bool m_is_income;
|
||||||
const time_t m_started;
|
const time_t m_started;
|
||||||
|
bool m_in_timedsync;
|
||||||
time_t m_last_recv;
|
time_t m_last_recv;
|
||||||
time_t m_last_send;
|
time_t m_last_send;
|
||||||
uint64_t m_recv_cnt;
|
uint64_t m_recv_cnt;
|
||||||
|
@ -171,6 +172,7 @@ namespace net_utils
|
||||||
m_remote_address(remote_address),
|
m_remote_address(remote_address),
|
||||||
m_is_income(is_income),
|
m_is_income(is_income),
|
||||||
m_started(time(NULL)),
|
m_started(time(NULL)),
|
||||||
|
m_in_timedsync(false),
|
||||||
m_last_recv(last_recv),
|
m_last_recv(last_recv),
|
||||||
m_last_send(last_send),
|
m_last_send(last_send),
|
||||||
m_recv_cnt(recv_cnt),
|
m_recv_cnt(recv_cnt),
|
||||||
|
@ -183,6 +185,7 @@ namespace net_utils
|
||||||
m_remote_address(new ipv4_network_address(0,0)),
|
m_remote_address(new ipv4_network_address(0,0)),
|
||||||
m_is_income(false),
|
m_is_income(false),
|
||||||
m_started(time(NULL)),
|
m_started(time(NULL)),
|
||||||
|
m_in_timedsync(false),
|
||||||
m_last_recv(0),
|
m_last_recv(0),
|
||||||
m_last_send(0),
|
m_last_send(0),
|
||||||
m_recv_cnt(0),
|
m_recv_cnt(0),
|
||||||
|
|
|
@ -812,6 +812,7 @@ namespace nodetool
|
||||||
bool r = epee::net_utils::async_invoke_remote_command2<typename COMMAND_TIMED_SYNC::response>(context_.m_connection_id, COMMAND_TIMED_SYNC::ID, arg, m_net_server.get_config_object(),
|
bool r = epee::net_utils::async_invoke_remote_command2<typename COMMAND_TIMED_SYNC::response>(context_.m_connection_id, COMMAND_TIMED_SYNC::ID, arg, m_net_server.get_config_object(),
|
||||||
[this](int code, const typename COMMAND_TIMED_SYNC::response& rsp, p2p_connection_context& context)
|
[this](int code, const typename COMMAND_TIMED_SYNC::response& rsp, p2p_connection_context& context)
|
||||||
{
|
{
|
||||||
|
context.m_in_timedsync = false;
|
||||||
if(code < 0)
|
if(code < 0)
|
||||||
{
|
{
|
||||||
LOG_ERROR_CC(context, "COMMAND_TIMED_SYNC invoke failed. (" << code << ", " << epee::levin::get_err_descr(code) << ")");
|
LOG_ERROR_CC(context, "COMMAND_TIMED_SYNC invoke failed. (" << code << ", " << epee::levin::get_err_descr(code) << ")");
|
||||||
|
@ -1295,10 +1296,13 @@ namespace nodetool
|
||||||
MDEBUG("STARTED PEERLIST IDLE HANDSHAKE");
|
MDEBUG("STARTED PEERLIST IDLE HANDSHAKE");
|
||||||
typedef std::list<std::pair<epee::net_utils::connection_context_base, peerid_type> > local_connects_type;
|
typedef std::list<std::pair<epee::net_utils::connection_context_base, peerid_type> > local_connects_type;
|
||||||
local_connects_type cncts;
|
local_connects_type cncts;
|
||||||
m_net_server.get_config_object().foreach_connection([&](const p2p_connection_context& cntxt)
|
m_net_server.get_config_object().foreach_connection([&](p2p_connection_context& cntxt)
|
||||||
{
|
{
|
||||||
if(cntxt.peer_id)
|
if(cntxt.peer_id && !cntxt.m_in_timedsync)
|
||||||
|
{
|
||||||
|
cntxt.m_in_timedsync = true;
|
||||||
cncts.push_back(local_connects_type::value_type(cntxt, cntxt.peer_id));//do idle sync only with handshaked connections
|
cncts.push_back(local_connects_type::value_type(cntxt, cntxt.peer_id));//do idle sync only with handshaked connections
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue