danicoin/tests/net_load_tests/clt.cpp
moneromooo-monero 5833d66f65
Change logging to easylogging++
This replaces the epee and data_loggers logging systems with
a single one, and also adds filename:line and explicit severity
levels. Categories may be defined, and logging severity set
by category (or set of categories). epee style 0-4 log level
maps to a sensible severity configuration. Log files now also
rotate when reaching 100 MB.

To select which logs to output, use the MONERO_LOGS environment
variable, with a comma separated list of categories (globs are
supported), with their requested severity level after a colon.
If a log matches more than one such setting, the last one in
the configuration string applies. A few examples:

This one is (mostly) silent, only outputting fatal errors:

MONERO_LOGS=*:FATAL

This one is very verbose:

MONERO_LOGS=*:TRACE

This one is totally silent (logwise):

MONERO_LOGS=""

This one outputs all errors and warnings, except for the
"verify" category, which prints just fatal errors (the verify
category is used for logs about incoming transactions and
blocks, and it is expected that some/many will fail to verify,
hence we don't want the spam):

MONERO_LOGS=*:WARNING,verify:FATAL

Log levels are, in decreasing order of priority:
FATAL, ERROR, WARNING, INFO, DEBUG, TRACE

Subcategories may be added using prefixes and globs. This
example will output net.p2p logs at the TRACE level, but all
other net* logs only at INFO:

MONERO_LOGS=*:ERROR,net*:INFO,net.p2p:TRACE

Logs which are intended for the user (which Monero was using
a lot through epee, but really isn't a nice way to go things)
should use the "global" category. There are a few helper macros
for using this category, eg: MGINFO("this shows up by default")
or MGINFO_RED("this is red"), to try to keep a similar look
and feel for now.

Existing epee log macros still exist, and map to the new log
levels, but since they're used as a "user facing" UI element
as much as a logging system, they often don't map well to log
severities (ie, a log level 0 log may be an error, or may be
something we want the user to see, such as an important info).
In those cases, I tried to use the new macros. In other cases,
I left the existing macros in. When modifying logs, it is
probably best to switch to the new macros with explicit levels.

The --log-level options and set_log commands now also accept
category settings, in addition to the epee style log levels.
2017-01-16 00:25:46 +00:00

637 lines
28 KiB
C++

// Copyright (c) 2014-2016, The Monero Project
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without modification, are
// permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of
// conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list
// of conditions and the following disclaimer in the documentation and/or other
// materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be
// used to endorse or promote products derived from this software without specific
// prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// Parts of this file are originally copyright (c) 2012-2013 The Cryptonote developers
#include <atomic>
#include <chrono>
#include <functional>
#include <numeric>
#include <thread>
#include <vector>
#include "gtest/gtest.h"
#include "include_base_utils.h"
#include "misc_language.h"
#include "misc_log_ex.h"
#include "storages/levin_abstract_invoke2.h"
#include "net_load_tests.h"
using namespace net_load_tests;
namespace
{
const size_t CONNECTION_COUNT = 100000;
const size_t CONNECTION_TIMEOUT = 10000;
const size_t DEFAULT_OPERATION_TIMEOUT = 30000;
const size_t RESERVED_CONN_CNT = 1;
template<typename t_predicate>
bool busy_wait_for(size_t timeout_ms, const t_predicate& predicate, size_t sleep_ms = 10)
{
for (size_t i = 0; i < timeout_ms / sleep_ms; ++i)
{
if (predicate())
return true;
//std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
epee::misc_utils::sleep_no_w(static_cast<long>(sleep_ms));
}
return false;
}
class t_connection_opener_1
{
public:
t_connection_opener_1(test_tcp_server& tcp_server, size_t open_request_target)
: m_tcp_server(tcp_server)
, m_open_request_target(open_request_target)
, m_next_id(0)
, m_error_count(0)
, m_connections(open_request_target)
{
for (auto& conn_id : m_connections)
conn_id = boost::uuids::nil_uuid();
}
bool open()
{
size_t id = m_next_id.fetch_add(1, std::memory_order_relaxed);
if (m_open_request_target <= id)
return false;
bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
if (!ec)
{
m_connections[id] = context.m_connection_id;
}
else
{
m_error_count.fetch_add(1, std::memory_order_relaxed);
}
});
if (!r)
{
m_error_count.fetch_add(1, std::memory_order_relaxed);
}
return true;
}
bool close(size_t id)
{
if (!m_connections[id].is_nil())
{
m_tcp_server.get_config_object().close(m_connections[id]);
return true;
}
else
{
return false;
}
}
size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
private:
test_tcp_server& m_tcp_server;
size_t m_open_request_target;
std::atomic<size_t> m_next_id;
std::atomic<size_t> m_error_count;
std::vector<boost::uuids::uuid> m_connections;
};
class t_connection_opener_2
{
public:
t_connection_opener_2(test_tcp_server& tcp_server, size_t open_request_target, size_t max_opened_connection_count)
: m_tcp_server(tcp_server)
, m_open_request_target(open_request_target)
, m_open_request_count(0)
, m_error_count(0)
, m_open_close_test_helper(tcp_server, open_request_target, max_opened_connection_count)
{
}
bool open_and_close()
{
size_t req_count = m_open_request_count.fetch_add(1, std::memory_order_relaxed);
if (m_open_request_target <= req_count)
return false;
bool r = m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [=](const test_connection_context& context, const boost::system::error_code& ec) {
if (!ec)
{
m_open_close_test_helper.handle_new_connection(context.m_connection_id);
}
else
{
m_error_count.fetch_add(1, std::memory_order_relaxed);
}
});
if (!r)
{
m_error_count.fetch_add(1, std::memory_order_relaxed);
}
return true;
}
void close_remaining_connections()
{
m_open_close_test_helper.close_remaining_connections();
}
size_t opened_connection_count() const { return m_open_close_test_helper.opened_connection_count(); }
size_t error_count() const { return m_error_count.load(std::memory_order_relaxed); }
private:
test_tcp_server& m_tcp_server;
size_t m_open_request_target;
std::atomic<size_t> m_open_request_count;
std::atomic<size_t> m_error_count;
open_close_test_helper m_open_close_test_helper;
};
class net_load_test_clt : public ::testing::Test
{
public:
net_load_test_clt()
: m_tcp_server(epee::net_utils::e_connection_type_RPC) // RPC disables network limit for unit tests
{
}
protected:
virtual void SetUp()
{
m_thread_count = (std::max)(min_thread_count, std::thread::hardware_concurrency() / 2);
m_tcp_server.get_config_object().m_pcommands_handler = &m_commands_handler;
m_tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
ASSERT_TRUE(m_tcp_server.init_server(clt_port, "127.0.0.1"));
ASSERT_TRUE(m_tcp_server.run_server(m_thread_count, false));
// Connect to server
std::atomic<int> conn_status(0);
m_cmd_conn_id = boost::uuids::nil_uuid();
ASSERT_TRUE(m_tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
if (!ec)
{
m_cmd_conn_id = context.m_connection_id;
}
else
{
LOG_ERROR("Connection error: " << ec.message());
}
conn_status.store(1, std::memory_order_seq_cst);
}));
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "connect_async timed out";
ASSERT_EQ(1, conn_status.load(std::memory_order_seq_cst));
ASSERT_FALSE(m_cmd_conn_id.is_nil());
conn_status.store(0, std::memory_order_seq_cst);
CMD_RESET_STATISTICS::request req;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_RESET_STATISTICS::response>(m_cmd_conn_id, CMD_RESET_STATISTICS::ID, req,
m_tcp_server.get_config_object(), [&](int code, const CMD_RESET_STATISTICS::response& rsp, const test_connection_context&) {
conn_status.store(code, std::memory_order_seq_cst);
}));
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) << "reset statistics timed out";
ASSERT_LT(0, conn_status.load(std::memory_order_seq_cst));
}
virtual void TearDown()
{
m_tcp_server.send_stop_signal();
ASSERT_TRUE(m_tcp_server.timed_wait_server_stop(DEFAULT_OPERATION_TIMEOUT));
}
static void TearDownTestCase()
{
// Stop server
test_levin_commands_handler commands_handler;
test_tcp_server tcp_server(epee::net_utils::e_connection_type_NET);
tcp_server.get_config_object().m_pcommands_handler = &commands_handler;
tcp_server.get_config_object().m_invoke_timeout = CONNECTION_TIMEOUT;
if (!tcp_server.init_server(clt_port, "127.0.0.1")) return;
if (!tcp_server.run_server(2, false)) return;
// Connect to server and invoke shutdown command
std::atomic<int> conn_status(0);
boost::uuids::uuid cmd_conn_id = boost::uuids::nil_uuid();
tcp_server.connect_async("127.0.0.1", srv_port, CONNECTION_TIMEOUT, [&](const test_connection_context& context, const boost::system::error_code& ec) {
cmd_conn_id = context.m_connection_id;
conn_status.store(!ec ? 1 : -1, std::memory_order_seq_cst);
});
if (!busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != conn_status.load(std::memory_order_seq_cst); })) return;
if (1 != conn_status.load(std::memory_order_seq_cst)) return;
epee::net_utils::notify_remote_command2(cmd_conn_id, CMD_SHUTDOWN::ID, CMD_SHUTDOWN::request(), tcp_server.get_config_object());
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != commands_handler.close_connection_counter(); });
}
template<typename Func>
static auto call_func(size_t /*thread_index*/, const Func& func, int) -> decltype(func())
{
func();
}
template<typename Func>
static auto call_func(size_t thread_index, const Func& func, long) -> decltype(func(thread_index))
{
func(thread_index);
}
template<typename Func>
void parallel_exec(const Func& func)
{
unit_test::call_counter properly_finished_threads;
std::vector<std::thread> threads(m_thread_count);
for (size_t i = 0; i < threads.size(); ++i)
{
threads[i] = std::thread([&, i] {
call_func(i, func, 0);
properly_finished_threads.inc();
});
}
for (auto& th : threads)
th.join();
ASSERT_EQ(properly_finished_threads.get(), m_thread_count);
}
void get_server_statistics(CMD_GET_STATISTICS::response& statistics)
{
std::atomic<int> req_status(0);
CMD_GET_STATISTICS::request req;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_GET_STATISTICS::response>(m_cmd_conn_id, CMD_GET_STATISTICS::ID, req,
m_tcp_server.get_config_object(), [&](int code, const CMD_GET_STATISTICS::response& rsp, const test_connection_context&) {
if (0 < code)
{
statistics = rsp;
}
else
{
LOG_ERROR("Get server statistics error: " << code);
}
req_status.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
}));
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 0 != req_status.load(std::memory_order_seq_cst); })) << "get_server_statistics timed out";
ASSERT_EQ(1, req_status.load(std::memory_order_seq_cst));
}
template <typename t_predicate>
bool busy_wait_for_server_statistics(CMD_GET_STATISTICS::response& statistics, const t_predicate& predicate)
{
for (size_t i = 0; i < 30; ++i)
{
get_server_statistics(statistics);
if (predicate(statistics))
{
return true;
}
//std::this_thread::sleep_for(std::chrono::seconds(1));
epee::misc_utils::sleep_no_w(1000);
}
return false;
}
void ask_for_data_requests(size_t request_size = 0)
{
CMD_SEND_DATA_REQUESTS::request req;
req.request_size = request_size;
epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_SEND_DATA_REQUESTS::ID, req, m_tcp_server.get_config_object());
}
protected:
test_tcp_server m_tcp_server;
test_levin_commands_handler m_commands_handler;
size_t m_thread_count;
boost::uuids::uuid m_cmd_conn_id;
};
}
TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_client)
{
// Open connections
t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
parallel_exec([&] {
while (connection_opener.open());
});
// Wait for all open requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
" / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
// Check
ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
// Close connections
parallel_exec([&](size_t thread_idx) {
for (size_t i = thread_idx; i < CONNECTION_COUNT; i += m_thread_count)
{
connection_opener.close(i);
}
});
// Wait for all opened connections to close
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
" / " << m_commands_handler.close_connection_counter());
// Check all connections are closed
ASSERT_EQ(m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT, m_commands_handler.close_connection_counter());
ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
// Wait for server to handle all open and close requests
CMD_GET_STATISTICS::response srv_stat;
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status
// It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
// Request data from server, it causes to close rest connections
ask_for_data_requests();
// Wait for server to close rest connections
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status. All connections should be closed
ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
}
TEST_F(net_load_test_clt, a_lot_of_client_connections_and_connections_closed_by_server)
{
// Open connections
t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
parallel_exec([&] {
while (connection_opener.open());
});
// Wait for all open requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
" / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
// Check
ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() - m_commands_handler.close_connection_counter(), m_tcp_server.get_config_object().get_connections_count());
// Wait for server accepts all connections
CMD_GET_STATISTICS::response srv_stat;
int last_new_connection_counter = -1;
busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
});
// Close connections
CMD_CLOSE_ALL_CONNECTIONS::request req;
ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
// Wait for all opened connections to close
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); });
LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
" / " << m_commands_handler.close_connection_counter());
// It's OK, if server didn't close all connections, because it could accept not all our connections
ASSERT_LE(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
// Wait for server to handle all open and close requests
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status
ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
// Close rest connections
m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
if (ctx.m_connection_id != m_cmd_conn_id)
{
CMD_DATA_REQUEST::request req;
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
if (code <= 0)
{
LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
}
});
if (!r)
LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
}
return true;
});
// Wait for all opened connections to close
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
" / " << m_commands_handler.close_connection_counter());
// Check
ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
}
TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_client)
{
static const size_t MAX_OPENED_CONN_COUNT = 100;
// Open/close connections
t_connection_opener_2 connection_opener(m_tcp_server, CONNECTION_COUNT, MAX_OPENED_CONN_COUNT);
parallel_exec([&] {
while (connection_opener.open_and_close());
});
// Wait for all open requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
" / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
// Check
ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
// Wait for all close requests to complete
EXPECT_TRUE(busy_wait_for(4 * DEFAULT_OPERATION_TIMEOUT, [&](){ return connection_opener.opened_connection_count() <= MAX_OPENED_CONN_COUNT; }));
LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
// Check
ASSERT_EQ(MAX_OPENED_CONN_COUNT, connection_opener.opened_connection_count());
connection_opener.close_remaining_connections();
// Wait for all close requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; }));
LOG_PRINT_L0("actual number of opened connections: " << connection_opener.opened_connection_count());
ASSERT_EQ(m_commands_handler.new_connection_counter(), m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT);
ASSERT_EQ(0, connection_opener.opened_connection_count());
ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
// Wait for server to handle all open and close requests
CMD_GET_STATISTICS::response srv_stat;
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status
// It's OK, if server didn't close all opened connections, because of it could receive not all FIN packets
ASSERT_LE(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_LE(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
// Request data from server, it causes to close rest connections
ask_for_data_requests();
// Wait for server to close rest connections
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status. All connections should be closed
ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
}
TEST_F(net_load_test_clt, permament_open_and_close_and_connections_closed_by_server)
{
static const size_t MAX_OPENED_CONN_COUNT = 100;
// Init test
std::atomic<int> test_state(0);
CMD_START_OPEN_CLOSE_TEST::request req_start;
req_start.open_request_target = CONNECTION_COUNT;
req_start.max_opened_conn_count = MAX_OPENED_CONN_COUNT;
ASSERT_TRUE(epee::net_utils::async_invoke_remote_command2<CMD_START_OPEN_CLOSE_TEST::response>(m_cmd_conn_id, CMD_START_OPEN_CLOSE_TEST::ID, req_start,
m_tcp_server.get_config_object(), [&](int code, const CMD_START_OPEN_CLOSE_TEST::response&, const test_connection_context&) {
test_state.store(0 < code ? 1 : -1, std::memory_order_seq_cst);
}));
// Wait for server response
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&]{ return 1 == test_state.load(std::memory_order_seq_cst); }));
ASSERT_EQ(1, test_state.load(std::memory_order_seq_cst));
// Open connections
t_connection_opener_1 connection_opener(m_tcp_server, CONNECTION_COUNT);
parallel_exec([&] {
while (connection_opener.open());
});
// Wait for all open requests to complete
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return CONNECTION_COUNT + RESERVED_CONN_CNT <= m_commands_handler.new_connection_counter() + connection_opener.error_count(); }));
LOG_PRINT_L0("number of opened connections / fails (total): " << m_commands_handler.new_connection_counter() <<
" / " << connection_opener.error_count() << " (" << (m_commands_handler.new_connection_counter() + connection_opener.error_count()) << ")");
LOG_PRINT_L0("actual number of opened connections: " << m_tcp_server.get_config_object().get_connections_count());
ASSERT_GT(m_commands_handler.new_connection_counter(), RESERVED_CONN_CNT);
ASSERT_EQ(m_commands_handler.new_connection_counter() + connection_opener.error_count(), CONNECTION_COUNT + RESERVED_CONN_CNT);
// Wait for server accepts all connections
CMD_GET_STATISTICS::response srv_stat;
int last_new_connection_counter = -1;
busy_wait_for_server_statistics(srv_stat, [&last_new_connection_counter](const CMD_GET_STATISTICS::response& stat) {
if (last_new_connection_counter == static_cast<int>(stat.new_connection_counter)) return true;
else { last_new_connection_counter = static_cast<int>(stat.new_connection_counter); return false; }
});
// Ask server to close rest connections
CMD_CLOSE_ALL_CONNECTIONS::request req;
ASSERT_TRUE(epee::net_utils::notify_remote_command2(m_cmd_conn_id, CMD_CLOSE_ALL_CONNECTIONS::ID, req, m_tcp_server.get_config_object()));
// Wait for almost all connections to be closed by server
busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() <= m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT; });
// It's OK, if there are opened connections, because server could accept not all our connections
ASSERT_LE(m_commands_handler.close_connection_counter() + RESERVED_CONN_CNT, m_commands_handler.new_connection_counter());
ASSERT_LE(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
// Wait for server to handle all open and close requests
busy_wait_for_server_statistics(srv_stat, [](const CMD_GET_STATISTICS::response& stat) { return stat.new_connection_counter - RESERVED_CONN_CNT <= stat.close_connection_counter; });
LOG_PRINT_L0("server statistics: " << srv_stat.to_string());
// Check server status
ASSERT_EQ(srv_stat.close_connection_counter, srv_stat.new_connection_counter - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, srv_stat.opened_connections_count);
// Close rest connections
m_tcp_server.get_config_object().foreach_connection([&](test_connection_context& ctx) {
if (ctx.m_connection_id != m_cmd_conn_id)
{
CMD_DATA_REQUEST::request req;
bool r = epee::net_utils::async_invoke_remote_command2<CMD_DATA_REQUEST::response>(ctx.m_connection_id, CMD_DATA_REQUEST::ID, req,
m_tcp_server.get_config_object(), [=](int code, const CMD_DATA_REQUEST::response& rsp, const test_connection_context&) {
if (code <= 0)
{
LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST. code = " << code);
}
});
if (!r)
LOG_PRINT_L0("Failed to invoke CMD_DATA_REQUEST");
}
return true;
});
// Wait for all opened connections to close
EXPECT_TRUE(busy_wait_for(DEFAULT_OPERATION_TIMEOUT, [&](){ return m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT <= m_commands_handler.close_connection_counter(); }));
LOG_PRINT_L0("number of opened / closed connections: " << m_tcp_server.get_config_object().get_connections_count() <<
" / " << m_commands_handler.close_connection_counter());
// Check
ASSERT_EQ(m_commands_handler.close_connection_counter(), m_commands_handler.new_connection_counter() - RESERVED_CONN_CNT);
ASSERT_EQ(RESERVED_CONN_CNT, m_tcp_server.get_config_object().get_connections_count());
}
int main(int argc, char** argv)
{
epee::debug::get_set_enable_assert(true, false);
//set up logging options
mlog_configure(mlog_get_default_log_path("core_tests.log"), true);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}