mirror of
https://codeberg.org/anoncontributorxmr/monero.git
synced 2024-12-11 00:00:51 +00:00
f2939bdce8
close might end up dropping a ref, ending up removing the connection from m_connects, as the lock is recursive. This'd cause an out of bounds exception and kill the idle connection maker thread
877 lines
33 KiB
C++
877 lines
33 KiB
C++
// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
|
|
// All rights reserved.
|
|
//
|
|
// Redistribution and use in source and binary forms, with or without
|
|
// modification, are permitted provided that the following conditions are met:
|
|
// * Redistributions of source code must retain the above copyright
|
|
// notice, this list of conditions and the following disclaimer.
|
|
// * Redistributions in binary form must reproduce the above copyright
|
|
// notice, this list of conditions and the following disclaimer in the
|
|
// documentation and/or other materials provided with the distribution.
|
|
// * Neither the name of the Andrey N. Sabelnikov nor the
|
|
// names of its contributors may be used to endorse or promote products
|
|
// derived from this software without specific prior written permission.
|
|
//
|
|
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
|
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
|
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY
|
|
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
|
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
|
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
|
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
//
|
|
|
|
#pragma once
|
|
#include <boost/asio/deadline_timer.hpp>
|
|
#include <boost/uuid/uuid_generators.hpp>
|
|
#include <boost/unordered_map.hpp>
|
|
#include <boost/interprocess/detail/atomic.hpp>
|
|
#include <boost/smart_ptr/make_shared.hpp>
|
|
|
|
#include <atomic>
|
|
|
|
#include "levin_base.h"
|
|
#include "misc_language.h"
|
|
|
|
#include <random>
|
|
#include <chrono>
|
|
|
|
#undef MONERO_DEFAULT_LOG_CATEGORY
|
|
#define MONERO_DEFAULT_LOG_CATEGORY "net"
|
|
|
|
#ifndef MIN_BYTES_WANTED
|
|
#define MIN_BYTES_WANTED 512
|
|
#endif
|
|
|
|
namespace epee
|
|
{
|
|
namespace levin
|
|
{
|
|
|
|
/************************************************************************/
|
|
/* */
|
|
/************************************************************************/
|
|
template<class t_connection_context>
|
|
class async_protocol_handler;
|
|
|
|
template<class t_connection_context>
|
|
class async_protocol_handler_config
|
|
{
|
|
typedef boost::unordered_map<boost::uuids::uuid, async_protocol_handler<t_connection_context>* > connections_map;
|
|
critical_section m_connects_lock;
|
|
connections_map m_connects;
|
|
|
|
void add_connection(async_protocol_handler<t_connection_context>* pc);
|
|
void del_connection(async_protocol_handler<t_connection_context>* pc);
|
|
|
|
async_protocol_handler<t_connection_context>* find_connection(boost::uuids::uuid connection_id) const;
|
|
int find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph);
|
|
|
|
friend class async_protocol_handler<t_connection_context>;
|
|
|
|
public:
|
|
typedef t_connection_context connection_context;
|
|
levin_commands_handler<t_connection_context>* m_pcommands_handler;
|
|
uint64_t m_max_packet_size;
|
|
uint64_t m_invoke_timeout;
|
|
|
|
int invoke(int command, const std::string& in_buff, std::string& buff_out, boost::uuids::uuid connection_id);
|
|
template<class callback_t>
|
|
int invoke_async(int command, const std::string& in_buff, boost::uuids::uuid connection_id, callback_t cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
|
|
|
|
int notify(int command, const std::string& in_buff, boost::uuids::uuid connection_id);
|
|
bool close(boost::uuids::uuid connection_id);
|
|
bool update_connection_context(const t_connection_context& contxt);
|
|
bool request_callback(boost::uuids::uuid connection_id);
|
|
template<class callback_t>
|
|
bool foreach_connection(callback_t cb);
|
|
template<class callback_t>
|
|
bool for_connection(const boost::uuids::uuid &connection_id, callback_t cb);
|
|
size_t get_connections_count();
|
|
|
|
async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE)
|
|
{}
|
|
void del_out_connections(size_t count);
|
|
};
|
|
|
|
|
|
/************************************************************************/
|
|
/* */
|
|
/************************************************************************/
|
|
template<class t_connection_context = net_utils::connection_context_base>
|
|
class async_protocol_handler
|
|
{
|
|
public:
|
|
typedef t_connection_context connection_context;
|
|
typedef async_protocol_handler_config<t_connection_context> config_type;
|
|
|
|
enum stream_state
|
|
{
|
|
stream_state_head,
|
|
stream_state_body
|
|
};
|
|
|
|
std::atomic<bool> m_deletion_initiated;
|
|
std::atomic<bool> m_protocol_released;
|
|
volatile uint32_t m_invoke_buf_ready;
|
|
|
|
volatile int m_invoke_result_code;
|
|
|
|
critical_section m_local_inv_buff_lock;
|
|
std::string m_local_inv_buff;
|
|
|
|
critical_section m_send_lock;
|
|
critical_section m_call_lock;
|
|
|
|
volatile uint32_t m_wait_count;
|
|
volatile uint32_t m_close_called;
|
|
bucket_head2 m_current_head;
|
|
net_utils::i_service_endpoint* m_pservice_endpoint;
|
|
config_type& m_config;
|
|
t_connection_context& m_connection_context;
|
|
|
|
std::string m_cache_in_buffer;
|
|
stream_state m_state;
|
|
|
|
int32_t m_oponent_protocol_ver;
|
|
bool m_connection_initialized;
|
|
|
|
struct invoke_response_handler_base
|
|
{
|
|
virtual bool handle(int res, const std::string& buff, connection_context& context)=0;
|
|
virtual bool is_timer_started() const=0;
|
|
virtual void cancel()=0;
|
|
virtual bool cancel_timer()=0;
|
|
virtual void reset_timer()=0;
|
|
};
|
|
template <class callback_t>
|
|
struct anvoke_handler: invoke_response_handler_base
|
|
{
|
|
anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
|
|
:m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
|
|
m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command)
|
|
{
|
|
if(m_con.start_outer_call())
|
|
{
|
|
MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
|
|
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
|
|
m_timer.async_wait([&con, command, cb, timeout](const boost::system::error_code& ec)
|
|
{
|
|
if(ec == boost::asio::error::operation_aborted)
|
|
return;
|
|
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
|
|
std::string fake;
|
|
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
|
|
con.close();
|
|
con.finish_outer_call();
|
|
});
|
|
m_timer_started = true;
|
|
}
|
|
}
|
|
virtual ~anvoke_handler()
|
|
{}
|
|
callback_t m_cb;
|
|
async_protocol_handler& m_con;
|
|
boost::asio::deadline_timer m_timer;
|
|
bool m_timer_started;
|
|
bool m_cancel_timer_called;
|
|
bool m_timer_cancelled;
|
|
uint64_t m_timeout;
|
|
int m_command;
|
|
virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context)
|
|
{
|
|
if(!cancel_timer())
|
|
return false;
|
|
m_cb(res, buff, context);
|
|
m_con.finish_outer_call();
|
|
return true;
|
|
}
|
|
virtual bool is_timer_started() const
|
|
{
|
|
return m_timer_started;
|
|
}
|
|
virtual void cancel()
|
|
{
|
|
if(cancel_timer())
|
|
{
|
|
std::string fake;
|
|
m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, fake, m_con.get_context_ref());
|
|
m_con.finish_outer_call();
|
|
}
|
|
}
|
|
virtual bool cancel_timer()
|
|
{
|
|
if(!m_cancel_timer_called)
|
|
{
|
|
m_cancel_timer_called = true;
|
|
boost::system::error_code ignored_ec;
|
|
m_timer_cancelled = 1 == m_timer.cancel(ignored_ec);
|
|
}
|
|
return m_timer_cancelled;
|
|
}
|
|
virtual void reset_timer()
|
|
{
|
|
boost::system::error_code ignored_ec;
|
|
if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
|
|
{
|
|
callback_t& cb = m_cb;
|
|
uint64_t timeout = m_timeout;
|
|
async_protocol_handler& con = m_con;
|
|
int command = m_command;
|
|
m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
|
|
m_timer.async_wait([&con, cb, command, timeout](const boost::system::error_code& ec)
|
|
{
|
|
if(ec == boost::asio::error::operation_aborted)
|
|
return;
|
|
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
|
|
std::string fake;
|
|
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
|
|
con.close();
|
|
con.finish_outer_call();
|
|
});
|
|
}
|
|
}
|
|
};
|
|
critical_section m_invoke_response_handlers_lock;
|
|
std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
|
|
|
|
template<class callback_t>
|
|
bool add_invoke_response_handler(callback_t cb, uint64_t timeout, async_protocol_handler& con, int command)
|
|
{
|
|
CRITICAL_REGION_LOCAL(m_invoke_response_handlers_lock);
|
|
boost::shared_ptr<invoke_response_handler_base> handler(boost::make_shared<anvoke_handler<callback_t>>(cb, timeout, con, command));
|
|
m_invoke_response_handlers.push_back(handler);
|
|
return handler->is_timer_started();
|
|
}
|
|
template<class callback_t> friend struct anvoke_handler;
|
|
public:
|
|
async_protocol_handler(net_utils::i_service_endpoint* psnd_hndlr,
|
|
config_type& config,
|
|
t_connection_context& conn_context):
|
|
m_current_head(bucket_head2()),
|
|
m_pservice_endpoint(psnd_hndlr),
|
|
m_config(config),
|
|
m_connection_context(conn_context),
|
|
m_state(stream_state_head)
|
|
{
|
|
m_close_called = 0;
|
|
m_deletion_initiated = false;
|
|
m_protocol_released = false;
|
|
m_wait_count = 0;
|
|
m_oponent_protocol_ver = 0;
|
|
m_connection_initialized = false;
|
|
}
|
|
virtual ~async_protocol_handler()
|
|
{
|
|
m_deletion_initiated = true;
|
|
if(m_connection_initialized)
|
|
{
|
|
m_config.del_connection(this);
|
|
}
|
|
|
|
for (size_t i = 0; i < 60 * 1000 / 100 && 0 != boost::interprocess::ipcdetail::atomic_read32(&m_wait_count); ++i)
|
|
{
|
|
misc_utils::sleep_no_w(100);
|
|
}
|
|
CHECK_AND_ASSERT_MES_NO_RET(0 == boost::interprocess::ipcdetail::atomic_read32(&m_wait_count), "Failed to wait for operation completion. m_wait_count = " << m_wait_count);
|
|
|
|
MTRACE(m_connection_context << "~async_protocol_handler()");
|
|
}
|
|
|
|
bool start_outer_call()
|
|
{
|
|
MTRACE(m_connection_context << "[levin_protocol] -->> start_outer_call");
|
|
if(!m_pservice_endpoint->add_ref())
|
|
{
|
|
MERROR(m_connection_context << "[levin_protocol] -->> start_outer_call failed");
|
|
return false;
|
|
}
|
|
boost::interprocess::ipcdetail::atomic_inc32(&m_wait_count);
|
|
return true;
|
|
}
|
|
bool finish_outer_call()
|
|
{
|
|
MTRACE(m_connection_context << "[levin_protocol] <<-- finish_outer_call");
|
|
boost::interprocess::ipcdetail::atomic_dec32(&m_wait_count);
|
|
m_pservice_endpoint->release();
|
|
return true;
|
|
}
|
|
|
|
bool release_protocol()
|
|
{
|
|
decltype(m_invoke_response_handlers) local_invoke_response_handlers;
|
|
CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock);
|
|
local_invoke_response_handlers.swap(m_invoke_response_handlers);
|
|
m_protocol_released = true;
|
|
CRITICAL_REGION_END();
|
|
|
|
// Never call callback inside critical section, that can cause deadlock. Callback can be called when
|
|
// invoke_response_handler_base is cancelled
|
|
std::for_each(local_invoke_response_handlers.begin(), local_invoke_response_handlers.end(), [](const boost::shared_ptr<invoke_response_handler_base>& pinv_resp_hndlr) {
|
|
pinv_resp_hndlr->cancel();
|
|
});
|
|
|
|
return true;
|
|
}
|
|
|
|
bool close()
|
|
{
|
|
boost::interprocess::ipcdetail::atomic_inc32(&m_close_called);
|
|
|
|
m_pservice_endpoint->close();
|
|
return true;
|
|
}
|
|
|
|
void update_connection_context(const connection_context& contxt)
|
|
{
|
|
m_connection_context = contxt;
|
|
}
|
|
|
|
void request_callback()
|
|
{
|
|
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
|
|
boost::bind(&async_protocol_handler::finish_outer_call, this));
|
|
|
|
m_pservice_endpoint->request_callback();
|
|
}
|
|
|
|
void handle_qued_callback()
|
|
{
|
|
m_config.m_pcommands_handler->callback(m_connection_context);
|
|
}
|
|
|
|
virtual bool handle_recv(const void* ptr, size_t cb)
|
|
{
|
|
if(boost::interprocess::ipcdetail::atomic_read32(&m_close_called))
|
|
return false; //closing connections
|
|
|
|
if(!m_config.m_pcommands_handler)
|
|
{
|
|
MERROR(m_connection_context << "Commands handler not set!");
|
|
return false;
|
|
}
|
|
|
|
if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size)
|
|
{
|
|
MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size
|
|
<< ", packet received " << m_cache_in_buffer.size() + cb
|
|
<< ", connection will be closed.");
|
|
return false;
|
|
}
|
|
|
|
m_cache_in_buffer.append((const char*)ptr, cb);
|
|
|
|
bool is_continue = true;
|
|
while(is_continue)
|
|
{
|
|
switch(m_state)
|
|
{
|
|
case stream_state_body:
|
|
if(m_cache_in_buffer.size() < m_current_head.m_cb)
|
|
{
|
|
is_continue = false;
|
|
if(cb >= MIN_BYTES_WANTED && !m_invoke_response_handlers.empty())
|
|
{
|
|
//async call scenario
|
|
boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
|
|
response_handler->reset_timer();
|
|
MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
|
|
}
|
|
break;
|
|
}
|
|
{
|
|
std::string buff_to_invoke;
|
|
if(m_cache_in_buffer.size() == m_current_head.m_cb)
|
|
buff_to_invoke.swap(m_cache_in_buffer);
|
|
else
|
|
{
|
|
buff_to_invoke.assign(m_cache_in_buffer, 0, (std::string::size_type)m_current_head.m_cb);
|
|
m_cache_in_buffer.erase(0, (std::string::size_type)m_current_head.m_cb);
|
|
}
|
|
|
|
bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE);
|
|
|
|
MDEBUG(m_connection_context << "LEVIN_PACKET_RECIEVED. [len=" << m_current_head.m_cb
|
|
<< ", flags" << m_current_head.m_flags
|
|
<< ", r?=" << m_current_head.m_have_to_return_data
|
|
<<", cmd = " << m_current_head.m_command
|
|
<< ", v=" << m_current_head.m_protocol_version);
|
|
|
|
if(is_response)
|
|
{//response to some invoke
|
|
|
|
epee::critical_region_t<decltype(m_invoke_response_handlers_lock)> invoke_response_handlers_guard(m_invoke_response_handlers_lock);
|
|
if(!m_invoke_response_handlers.empty())
|
|
{//async call scenario
|
|
boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
|
|
bool timer_cancelled = response_handler->cancel_timer();
|
|
// Don't pop handler, to avoid destroying it
|
|
if(timer_cancelled)
|
|
m_invoke_response_handlers.pop_front();
|
|
invoke_response_handlers_guard.unlock();
|
|
|
|
if(timer_cancelled)
|
|
response_handler->handle(m_current_head.m_return_code, buff_to_invoke, m_connection_context);
|
|
}
|
|
else
|
|
{
|
|
invoke_response_handlers_guard.unlock();
|
|
//use sync call scenario
|
|
if(!boost::interprocess::ipcdetail::atomic_read32(&m_wait_count) && !boost::interprocess::ipcdetail::atomic_read32(&m_close_called))
|
|
{
|
|
MERROR(m_connection_context << "no active invoke when response came, wtf?");
|
|
return false;
|
|
}else
|
|
{
|
|
CRITICAL_REGION_BEGIN(m_local_inv_buff_lock);
|
|
buff_to_invoke.swap(m_local_inv_buff);
|
|
buff_to_invoke.clear();
|
|
m_invoke_result_code = m_current_head.m_return_code;
|
|
CRITICAL_REGION_END();
|
|
boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 1);
|
|
}
|
|
}
|
|
}else
|
|
{
|
|
if(m_current_head.m_have_to_return_data)
|
|
{
|
|
std::string return_buff;
|
|
m_current_head.m_return_code = m_config.m_pcommands_handler->invoke(
|
|
m_current_head.m_command,
|
|
buff_to_invoke,
|
|
return_buff,
|
|
m_connection_context);
|
|
m_current_head.m_cb = return_buff.size();
|
|
m_current_head.m_have_to_return_data = false;
|
|
m_current_head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
|
|
m_current_head.m_flags = LEVIN_PACKET_RESPONSE;
|
|
std::string send_buff((const char*)&m_current_head, sizeof(m_current_head));
|
|
send_buff += return_buff;
|
|
CRITICAL_REGION_BEGIN(m_send_lock);
|
|
if(!m_pservice_endpoint->do_send(send_buff.data(), send_buff.size()))
|
|
return false;
|
|
CRITICAL_REGION_END();
|
|
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << m_current_head.m_cb
|
|
<< ", flags" << m_current_head.m_flags
|
|
<< ", r?=" << m_current_head.m_have_to_return_data
|
|
<<", cmd = " << m_current_head.m_command
|
|
<< ", ver=" << m_current_head.m_protocol_version);
|
|
}
|
|
else
|
|
m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
|
|
}
|
|
}
|
|
m_state = stream_state_head;
|
|
break;
|
|
case stream_state_head:
|
|
{
|
|
if(m_cache_in_buffer.size() < sizeof(bucket_head2))
|
|
{
|
|
if(m_cache_in_buffer.size() >= sizeof(uint64_t) && *((uint64_t*)m_cache_in_buffer.data()) != LEVIN_SIGNATURE)
|
|
{
|
|
MWARNING(m_connection_context << "Signature mismatch, connection will be closed");
|
|
return false;
|
|
}
|
|
is_continue = false;
|
|
break;
|
|
}
|
|
|
|
bucket_head2* phead = (bucket_head2*)m_cache_in_buffer.data();
|
|
if(LEVIN_SIGNATURE != phead->m_signature)
|
|
{
|
|
LOG_ERROR_CC(m_connection_context, "Signature mismatch, connection will be closed");
|
|
return false;
|
|
}
|
|
m_current_head = *phead;
|
|
|
|
m_cache_in_buffer.erase(0, sizeof(bucket_head2));
|
|
m_state = stream_state_body;
|
|
m_oponent_protocol_ver = m_current_head.m_protocol_version;
|
|
if(m_current_head.m_cb > m_config.m_max_packet_size)
|
|
{
|
|
LOG_ERROR_CC(m_connection_context, "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size
|
|
<< ", packet header received " << m_current_head.m_cb
|
|
<< ", connection will be closed.");
|
|
return false;
|
|
}
|
|
}
|
|
break;
|
|
default:
|
|
LOG_ERROR_CC(m_connection_context, "Undefined state in levin_server_impl::connection_handler, m_state=" << m_state);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool after_init_connection()
|
|
{
|
|
if (!m_connection_initialized)
|
|
{
|
|
m_connection_initialized = true;
|
|
m_config.add_connection(this);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
template<class callback_t>
|
|
bool async_invoke(int command, const std::string& in_buff, callback_t cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
|
|
{
|
|
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
|
|
boost::bind(&async_protocol_handler::finish_outer_call, this));
|
|
|
|
if(timeout == LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
|
|
timeout = m_config.m_invoke_timeout;
|
|
|
|
int err_code = LEVIN_OK;
|
|
do
|
|
{
|
|
if(m_deletion_initiated)
|
|
{
|
|
err_code = LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
break;
|
|
}
|
|
|
|
CRITICAL_REGION_LOCAL(m_call_lock);
|
|
|
|
if(m_deletion_initiated)
|
|
{
|
|
err_code = LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
break;
|
|
}
|
|
|
|
bucket_head2 head = {0};
|
|
head.m_signature = LEVIN_SIGNATURE;
|
|
head.m_cb = in_buff.size();
|
|
head.m_have_to_return_data = true;
|
|
|
|
head.m_flags = LEVIN_PACKET_REQUEST;
|
|
head.m_command = command;
|
|
head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
|
|
|
|
boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0);
|
|
CRITICAL_REGION_BEGIN(m_send_lock);
|
|
CRITICAL_REGION_LOCAL1(m_invoke_response_handlers_lock);
|
|
if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
|
|
{
|
|
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
|
|
err_code = LEVIN_ERROR_CONNECTION;
|
|
break;
|
|
}
|
|
|
|
if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size()))
|
|
{
|
|
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
|
|
err_code = LEVIN_ERROR_CONNECTION;
|
|
break;
|
|
}
|
|
|
|
if(!add_invoke_response_handler(cb, timeout, *this, command))
|
|
{
|
|
err_code = LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
break;
|
|
}
|
|
CRITICAL_REGION_END();
|
|
} while (false);
|
|
|
|
if (LEVIN_OK != err_code)
|
|
{
|
|
std::string stub_buff;
|
|
// Never call callback inside critical section, that can cause deadlock
|
|
cb(err_code, stub_buff, m_connection_context);
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
int invoke(int command, const std::string& in_buff, std::string& buff_out)
|
|
{
|
|
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
|
|
boost::bind(&async_protocol_handler::finish_outer_call, this));
|
|
|
|
if(m_deletion_initiated)
|
|
return LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
|
|
CRITICAL_REGION_LOCAL(m_call_lock);
|
|
|
|
if(m_deletion_initiated)
|
|
return LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
|
|
bucket_head2 head = {0};
|
|
head.m_signature = LEVIN_SIGNATURE;
|
|
head.m_cb = in_buff.size();
|
|
head.m_have_to_return_data = true;
|
|
|
|
head.m_flags = LEVIN_PACKET_REQUEST;
|
|
head.m_command = command;
|
|
head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
|
|
|
|
boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0);
|
|
CRITICAL_REGION_BEGIN(m_send_lock);
|
|
if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
|
|
{
|
|
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
|
|
return LEVIN_ERROR_CONNECTION;
|
|
}
|
|
|
|
if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size()))
|
|
{
|
|
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
|
|
return LEVIN_ERROR_CONNECTION;
|
|
}
|
|
CRITICAL_REGION_END();
|
|
|
|
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
|
|
<< ", f=" << head.m_flags
|
|
<< ", r?=" << head.m_have_to_return_data
|
|
<< ", cmd = " << head.m_command
|
|
<< ", ver=" << head.m_protocol_version);
|
|
|
|
uint64_t ticks_start = misc_utils::get_tick_count();
|
|
size_t prev_size = 0;
|
|
|
|
while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released)
|
|
{
|
|
if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
|
|
{
|
|
prev_size = m_cache_in_buffer.size();
|
|
ticks_start = misc_utils::get_tick_count();
|
|
}
|
|
if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout)
|
|
{
|
|
MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
|
|
close();
|
|
return LEVIN_ERROR_CONNECTION_TIMEDOUT;
|
|
}
|
|
if(!m_pservice_endpoint->call_run_once_service_io())
|
|
return LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
}
|
|
|
|
if(m_deletion_initiated || m_protocol_released)
|
|
return LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
|
|
CRITICAL_REGION_BEGIN(m_local_inv_buff_lock);
|
|
buff_out.swap(m_local_inv_buff);
|
|
m_local_inv_buff.clear();
|
|
CRITICAL_REGION_END();
|
|
|
|
return m_invoke_result_code;
|
|
}
|
|
|
|
int notify(int command, const std::string& in_buff)
|
|
{
|
|
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
|
|
boost::bind(&async_protocol_handler::finish_outer_call, this));
|
|
|
|
if(m_deletion_initiated)
|
|
return LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
|
|
CRITICAL_REGION_LOCAL(m_call_lock);
|
|
|
|
if(m_deletion_initiated)
|
|
return LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
|
|
bucket_head2 head = {0};
|
|
head.m_signature = LEVIN_SIGNATURE;
|
|
head.m_have_to_return_data = false;
|
|
head.m_cb = in_buff.size();
|
|
|
|
head.m_command = command;
|
|
head.m_protocol_version = LEVIN_PROTOCOL_VER_1;
|
|
head.m_flags = LEVIN_PACKET_REQUEST;
|
|
CRITICAL_REGION_BEGIN(m_send_lock);
|
|
if(!m_pservice_endpoint->do_send(&head, sizeof(head)))
|
|
{
|
|
LOG_ERROR_CC(m_connection_context, "Failed to do_send()");
|
|
return -1;
|
|
}
|
|
|
|
if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size()))
|
|
{
|
|
LOG_ERROR_CC(m_connection_context, "Failed to do_send()");
|
|
return -1;
|
|
}
|
|
CRITICAL_REGION_END();
|
|
LOG_DEBUG_CC(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb <<
|
|
", f=" << head.m_flags <<
|
|
", r?=" << head.m_have_to_return_data <<
|
|
", cmd = " << head.m_command <<
|
|
", ver=" << head.m_protocol_version);
|
|
|
|
return 1;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
boost::uuids::uuid get_connection_id() {return m_connection_context.m_connection_id;}
|
|
//------------------------------------------------------------------------------------------
|
|
t_connection_context& get_context_ref() {return m_connection_context;}
|
|
};
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
void async_protocol_handler_config<t_connection_context>::del_connection(async_protocol_handler<t_connection_context>* pconn)
|
|
{
|
|
CRITICAL_REGION_BEGIN(m_connects_lock);
|
|
m_connects.erase(pconn->get_connection_id());
|
|
CRITICAL_REGION_END();
|
|
m_pcommands_handler->on_connection_close(pconn->m_connection_context);
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
void async_protocol_handler_config<t_connection_context>::del_out_connections(size_t count)
|
|
{
|
|
std::vector <boost::uuids::uuid> out_connections;
|
|
CRITICAL_REGION_BEGIN(m_connects_lock);
|
|
for (auto& c: m_connects)
|
|
{
|
|
if (!c.second->m_connection_context.m_is_income)
|
|
out_connections.push_back(c.first);
|
|
}
|
|
|
|
if (out_connections.size() == 0)
|
|
return;
|
|
|
|
// close random out connections
|
|
// TODO or better just keep removing random elements (performance)
|
|
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
|
|
shuffle(out_connections.begin(), out_connections.end(), std::default_random_engine(seed));
|
|
while (count > 0 && out_connections.size() > 0)
|
|
{
|
|
boost::uuids::uuid connection_id = *out_connections.begin();
|
|
async_protocol_handler<t_connection_context> *connection = find_connection(connection_id);
|
|
// we temporarily ref the connection so it doesn't drop from the m_connects table
|
|
// when we close it
|
|
connection->start_outer_call();
|
|
close(connection_id);
|
|
del_connection(m_connects.at(connection_id));
|
|
out_connections.erase(out_connections.begin());
|
|
connection->finish_outer_call();
|
|
--count;
|
|
}
|
|
|
|
CRITICAL_REGION_END();
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
void async_protocol_handler_config<t_connection_context>::add_connection(async_protocol_handler<t_connection_context>* pconn)
|
|
{
|
|
CRITICAL_REGION_BEGIN(m_connects_lock);
|
|
m_connects[pconn->get_connection_id()] = pconn;
|
|
CRITICAL_REGION_END();
|
|
m_pcommands_handler->on_connection_new(pconn->m_connection_context);
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
async_protocol_handler<t_connection_context>* async_protocol_handler_config<t_connection_context>::find_connection(boost::uuids::uuid connection_id) const
|
|
{
|
|
auto it = m_connects.find(connection_id);
|
|
return it == m_connects.end() ? 0 : it->second;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
int async_protocol_handler_config<t_connection_context>::find_and_lock_connection(boost::uuids::uuid connection_id, async_protocol_handler<t_connection_context>*& aph)
|
|
{
|
|
CRITICAL_REGION_LOCAL(m_connects_lock);
|
|
aph = find_connection(connection_id);
|
|
if(0 == aph)
|
|
return LEVIN_ERROR_CONNECTION_NOT_FOUND;
|
|
if(!aph->start_outer_call())
|
|
return LEVIN_ERROR_CONNECTION_DESTROYED;
|
|
return LEVIN_OK;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
int async_protocol_handler_config<t_connection_context>::invoke(int command, const std::string& in_buff, std::string& buff_out, boost::uuids::uuid connection_id)
|
|
{
|
|
async_protocol_handler<t_connection_context>* aph;
|
|
int r = find_and_lock_connection(connection_id, aph);
|
|
return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context> template<class callback_t>
|
|
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const std::string& in_buff, boost::uuids::uuid connection_id, callback_t cb, size_t timeout)
|
|
{
|
|
async_protocol_handler<t_connection_context>* aph;
|
|
int r = find_and_lock_connection(connection_id, aph);
|
|
return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context> template<class callback_t>
|
|
bool async_protocol_handler_config<t_connection_context>::foreach_connection(callback_t cb)
|
|
{
|
|
CRITICAL_REGION_LOCAL(m_connects_lock);
|
|
for(auto& c: m_connects)
|
|
{
|
|
async_protocol_handler<t_connection_context>* aph = c.second;
|
|
if(!cb(aph->get_context_ref()))
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context> template<class callback_t>
|
|
bool async_protocol_handler_config<t_connection_context>::for_connection(const boost::uuids::uuid &connection_id, callback_t cb)
|
|
{
|
|
CRITICAL_REGION_LOCAL(m_connects_lock);
|
|
async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
|
|
if (!aph)
|
|
return false;
|
|
if(!cb(aph->get_context_ref()))
|
|
return false;
|
|
return true;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
size_t async_protocol_handler_config<t_connection_context>::get_connections_count()
|
|
{
|
|
CRITICAL_REGION_LOCAL(m_connects_lock);
|
|
return m_connects.size();
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
int async_protocol_handler_config<t_connection_context>::notify(int command, const std::string& in_buff, boost::uuids::uuid connection_id)
|
|
{
|
|
async_protocol_handler<t_connection_context>* aph;
|
|
int r = find_and_lock_connection(connection_id, aph);
|
|
return LEVIN_OK == r ? aph->notify(command, in_buff) : r;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id)
|
|
{
|
|
CRITICAL_REGION_LOCAL(m_connects_lock);
|
|
async_protocol_handler<t_connection_context>* aph = find_connection(connection_id);
|
|
return 0 != aph ? aph->close() : false;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
bool async_protocol_handler_config<t_connection_context>::update_connection_context(const t_connection_context& contxt)
|
|
{
|
|
CRITICAL_REGION_LOCAL(m_connects_lock);
|
|
async_protocol_handler<t_connection_context>* aph = find_connection(contxt.m_connection_id);
|
|
if(0 == aph)
|
|
return false;
|
|
aph->update_connection_context(contxt);
|
|
return true;
|
|
}
|
|
//------------------------------------------------------------------------------------------
|
|
template<class t_connection_context>
|
|
bool async_protocol_handler_config<t_connection_context>::request_callback(boost::uuids::uuid connection_id)
|
|
{
|
|
async_protocol_handler<t_connection_context>* aph;
|
|
int r = find_and_lock_connection(connection_id, aph);
|
|
if(LEVIN_OK == r)
|
|
{
|
|
aph->request_callback();
|
|
return true;
|
|
}
|
|
else
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
}
|