From 490590306efdd60a4663967c1c0a95d4acb4b886 Mon Sep 17 00:00:00 2001 From: moneromooo-monero Date: Fri, 27 Nov 2015 17:25:15 +0000 Subject: [PATCH] wallet2: parallelize pulling blocks and processing them on refresh This needed locking the use of m_http_client, to avoid collisions in I/O. --- src/wallet/wallet2.cpp | 57 ++++++++++++++++++++++++++++++++---------- src/wallet/wallet2.h | 5 ++++ 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/src/wallet/wallet2.cpp b/src/wallet/wallet2.cpp index 69b863c3..f79bad47 100644 --- a/src/wallet/wallet2.cpp +++ b/src/wallet/wallet2.cpp @@ -274,7 +274,9 @@ void wallet2::process_new_transaction(const cryptonote::transaction& tx, uint64_ cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::request req = AUTO_VAL_INIT(req); cryptonote::COMMAND_RPC_GET_TX_GLOBAL_OUTPUTS_INDEXES::response res = AUTO_VAL_INIT(res); req.txid = get_transaction_hash(tx); + m_daemon_rpc_mutex.lock(); bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/get_o_indexes.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "get_o_indexes.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "get_o_indexes.bin"); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_out_indices_error, res.status); @@ -485,7 +487,9 @@ void wallet2::pull_blocks(uint64_t start_height, uint64_t &blocks_start_height, req.block_ids = short_chain_history; req.start_height = start_height; + m_daemon_rpc_mutex.lock(); bool r = net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getblocks.bin", req, res, m_http_client, WALLET_RCP_CONNECTION_TIMEOUT); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getblocks.bin"); THROW_WALLET_EXCEPTION_IF(res.status != CORE_RPC_STATUS_OK, error::get_blocks_error, res.status); @@ -610,6 +614,23 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched) refresh(start_height, blocks_fetched, received_money); } //---------------------------------------------------------------------------------------------------- +void wallet2::pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list &short_chain_history, const std::list &prev_blocks, std::list &blocks) +{ + // prepend the last 3 blocks, should be enough to guard against a block or two's reorg + cryptonote::block bl; + std::list::const_reverse_iterator i = prev_blocks.rbegin(); + for (size_t n = 0; n < std::min((size_t)3, prev_blocks.size()); ++n) + { + bool ok = cryptonote::parse_and_validate_block_from_blob(i->block, bl); + THROW_WALLET_EXCEPTION_IF(!ok, error::block_parse_error, i->block); + short_chain_history.push_front(cryptonote::get_block_hash(bl)); + ++i; + } + + // pull the new blocks + pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); +} +//---------------------------------------------------------------------------------------------------- void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& received_money) { received_money = false; @@ -618,30 +639,32 @@ void wallet2::refresh(uint64_t start_height, uint64_t & blocks_fetched, bool& re size_t try_count = 0; crypto::hash last_tx_hash_id = m_transfers.size() ? get_transaction_hash(m_transfers.back().m_tx) : null_hash; std::list short_chain_history; + std::thread pull_thread; + uint64_t blocks_start_height; + std::list blocks; + // pull the first set of blocks get_short_chain_history(short_chain_history); + pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); + while(m_run.load(std::memory_order_relaxed)) { try { - uint64_t blocks_start_height; - std::list blocks; - pull_blocks(start_height, blocks_start_height, short_chain_history, blocks); + // pull the next set of blocks while we're processing the current one + uint64_t next_blocks_start_height; + std::list next_blocks; + pull_thread = std::thread([&]{pull_next_blocks(start_height, next_blocks_start_height, short_chain_history, blocks, next_blocks);}); + process_blocks(blocks_start_height, blocks, added_blocks); blocks_fetched += added_blocks; + pull_thread.join(); if(!added_blocks) break; - cryptonote::block bl; - // prepend the last 3 blocks, should be enough to guard against a block or two's reorg - std::list::const_reverse_iterator i = blocks.rbegin(); - for (int n = 0; n < 3; ++n) - { - bool ok = cryptonote::parse_and_validate_block_from_blob(i->block, bl); - THROW_WALLET_EXCEPTION_IF(!ok, error::block_parse_error, i->block); - short_chain_history.push_front(cryptonote::get_block_hash(bl)); - ++i; - } + // switch to the new blocks from the daemon + blocks_start_height = next_blocks_start_height; + blocks = next_blocks; } catch (const std::exception&) { @@ -1072,6 +1095,8 @@ bool wallet2::prepare_file_names(const std::string& file_path) //---------------------------------------------------------------------------------------------------- bool wallet2::check_connection() { + std::lock_guard lock(m_daemon_rpc_mutex); + if(m_http_client.is_connected()) return true; @@ -1284,7 +1309,9 @@ void wallet2::rescan_spent() COMMAND_RPC_IS_KEY_IMAGE_SPENT::request req = AUTO_VAL_INIT(req); COMMAND_RPC_IS_KEY_IMAGE_SPENT::response daemon_resp = AUTO_VAL_INIT(daemon_resp); req.key_images = key_images; + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/is_key_image_spent", req, daemon_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "is_key_image_spent"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "is_key_image_spent"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::is_key_image_spent_error, daemon_resp.status); @@ -1591,7 +1618,9 @@ void wallet2::commit_tx(pending_tx& ptx) COMMAND_RPC_SEND_RAW_TX::request req; req.tx_as_hex = epee::string_tools::buff_to_hex_nodelimer(tx_to_blob(ptx.tx)); COMMAND_RPC_SEND_RAW_TX::response daemon_send_resp; + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_json_remote_command2(m_daemon_address + "/sendrawtransaction", req, daemon_send_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "sendrawtransaction"); THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "sendrawtransaction"); THROW_WALLET_EXCEPTION_IF(daemon_send_resp.status != CORE_RPC_STATUS_OK, error::tx_rejected, ptx.tx, daemon_send_resp.status); @@ -1779,7 +1808,9 @@ void wallet2::transfer_selected(const std::vectoramount()); } + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status); diff --git a/src/wallet/wallet2.h b/src/wallet/wallet2.h index c1365f27..b71697c1 100644 --- a/src/wallet/wallet2.h +++ b/src/wallet/wallet2.h @@ -356,6 +356,7 @@ namespace tools bool is_transfer_unlocked(const transfer_details& td) const; bool clear(); void pull_blocks(uint64_t start_height, uint64_t& blocks_start_height, const std::list &short_chain_history, std::list &blocks); + void pull_next_blocks(uint64_t start_height, uint64_t &blocks_start_height, std::list &short_chain_history, const std::list &prev_blocks, std::list &blocks); void process_blocks(uint64_t start_height, const std::list &blocks, uint64_t& blocks_added); uint64_t select_transfers(uint64_t needed_money, bool add_dust, uint64_t dust, std::list& selected_transfers); bool prepare_file_names(const std::string& file_path); @@ -388,6 +389,8 @@ namespace tools std::atomic m_run; + std::mutex m_daemon_rpc_mutex; + i_wallet2_callback* m_callback; bool m_testnet; bool m_restricted; @@ -564,7 +567,9 @@ namespace tools req.amounts.push_back(it->amount()); } + m_daemon_rpc_mutex.lock(); bool r = epee::net_utils::invoke_http_bin_remote_command2(m_daemon_address + "/getrandom_outs.bin", req, daemon_resp, m_http_client, 200000); + m_daemon_rpc_mutex.unlock(); THROW_WALLET_EXCEPTION_IF(!r, error::no_connection_to_daemon, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status == CORE_RPC_STATUS_BUSY, error::daemon_busy, "getrandom_outs.bin"); THROW_WALLET_EXCEPTION_IF(daemon_resp.status != CORE_RPC_STATUS_OK, error::get_random_outs_error, daemon_resp.status);