core: thread most of handle_incoming_tx
This commit is contained in:
parent
f57ee382b8
commit
158c3ecff3
4 changed files with 114 additions and 31 deletions
|
@ -37,6 +37,7 @@ using namespace epee;
|
|||
#include "common/util.h"
|
||||
#include "common/updates.h"
|
||||
#include "common/download.h"
|
||||
#include "common/task_region.h"
|
||||
#include "warnings.h"
|
||||
#include "crypto/crypto.h"
|
||||
#include "cryptonote_config.h"
|
||||
|
@ -76,6 +77,7 @@ namespace cryptonote
|
|||
m_checkpoints_path(""),
|
||||
m_last_dns_checkpoints_update(0),
|
||||
m_last_json_checkpoints_update(0),
|
||||
m_threadpool(tools::thread_group::optimal()),
|
||||
m_update_download(0)
|
||||
{
|
||||
m_checkpoints_updating.clear();
|
||||
|
@ -483,11 +485,9 @@ namespace cryptonote
|
|||
return false;
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
|
||||
bool core::handle_incoming_tx_pre(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay)
|
||||
{
|
||||
tvc = boost::value_initialized<tx_verification_context>();
|
||||
//want to process all transactions sequentially
|
||||
CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
|
||||
|
||||
if(tx_blob.size() > get_max_tx_size())
|
||||
{
|
||||
|
@ -497,9 +497,8 @@ namespace cryptonote
|
|||
return false;
|
||||
}
|
||||
|
||||
crypto::hash tx_hash = null_hash;
|
||||
crypto::hash tx_prefixt_hash = null_hash;
|
||||
transaction tx;
|
||||
tx_hash = null_hash;
|
||||
tx_prefixt_hash = null_hash;
|
||||
|
||||
if(!parse_tx_from_blob(tx, tx_hash, tx_prefixt_hash, tx_blob))
|
||||
{
|
||||
|
@ -509,15 +508,18 @@ namespace cryptonote
|
|||
}
|
||||
//std::cout << "!"<< tx.vin.size() << std::endl;
|
||||
|
||||
bad_semantics_txes_lock.lock();
|
||||
for (int idx = 0; idx < 2; ++idx)
|
||||
{
|
||||
if (bad_semantics_txes[idx].find(tx_hash) != bad_semantics_txes[idx].end())
|
||||
{
|
||||
bad_semantics_txes_lock.unlock();
|
||||
LOG_PRINT_L1("Transaction already seen with bad semantics, rejected");
|
||||
tvc.m_verifivation_failed = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
bad_semantics_txes_lock.unlock();
|
||||
|
||||
uint8_t version = m_blockchain_storage.get_current_hard_fork_version();
|
||||
const size_t max_tx_version = version == 1 ? 1 : 2;
|
||||
|
@ -528,18 +530,11 @@ namespace cryptonote
|
|||
return false;
|
||||
}
|
||||
|
||||
if(m_mempool.have_tx(tx_hash))
|
||||
{
|
||||
LOG_PRINT_L2("tx " << tx_hash << "already have transaction in tx_pool");
|
||||
return true;
|
||||
}
|
||||
|
||||
if(m_blockchain_storage.have_tx(tx_hash))
|
||||
{
|
||||
LOG_PRINT_L2("tx " << tx_hash << " already have transaction in blockchain");
|
||||
return true;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
bool core::handle_incoming_tx_post(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay)
|
||||
{
|
||||
if(!check_tx_syntax(tx))
|
||||
{
|
||||
LOG_PRINT_L1("WRONG TRANSACTION BLOB, Failed to check tx " << tx_hash << " syntax, rejected");
|
||||
|
@ -568,23 +563,84 @@ namespace cryptonote
|
|||
{
|
||||
LOG_PRINT_L1("WRONG TRANSACTION BLOB, Failed to check tx " << tx_hash << " semantic, rejected");
|
||||
tvc.m_verifivation_failed = true;
|
||||
bad_semantics_txes_lock.lock();
|
||||
bad_semantics_txes[0].insert(tx_hash);
|
||||
if (bad_semantics_txes[0].size() >= BAD_SEMANTICS_TXES_MAX_SIZE)
|
||||
{
|
||||
std::swap(bad_semantics_txes[0], bad_semantics_txes[1]);
|
||||
bad_semantics_txes[0].clear();
|
||||
}
|
||||
bad_semantics_txes_lock.unlock();
|
||||
return false;
|
||||
}
|
||||
|
||||
bool r = add_new_tx(tx, tx_hash, tx_prefixt_hash, tx_blob.size(), tvc, keeped_by_block, relayed, do_not_relay);
|
||||
if(tvc.m_verifivation_failed)
|
||||
{MERROR_VER("Transaction verification failed: " << tx_hash);}
|
||||
else if(tvc.m_verifivation_impossible)
|
||||
{MERROR_VER("Transaction verification impossible: " << tx_hash);}
|
||||
return true;
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
bool core::handle_incoming_txs(const std::list<blobdata>& tx_blobs, std::vector<tx_verification_context>& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
|
||||
{
|
||||
struct result { bool res; cryptonote::transaction tx; crypto::hash hash; crypto::hash prefix_hash; bool in_txpool; bool in_blockchain; };
|
||||
std::vector<result> results(tx_blobs.size());
|
||||
|
||||
if(tvc.m_added_to_pool)
|
||||
MDEBUG("tx added: " << tx_hash);
|
||||
tvc.resize(tx_blobs.size());
|
||||
tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) {
|
||||
std::list<blobdata>::const_iterator it = tx_blobs.begin();
|
||||
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
|
||||
region.run([&, i, it] {
|
||||
results[i].res = handle_incoming_tx_pre(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
|
||||
});
|
||||
}
|
||||
});
|
||||
tools::task_region(m_threadpool, [&] (tools::task_region_handle& region) {
|
||||
std::list<blobdata>::const_iterator it = tx_blobs.begin();
|
||||
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
|
||||
if (!results[i].res)
|
||||
continue;
|
||||
if(m_mempool.have_tx(results[i].hash))
|
||||
{
|
||||
LOG_PRINT_L2("tx " << results[i].hash << "already have transaction in tx_pool");
|
||||
}
|
||||
else if(m_blockchain_storage.have_tx(results[i].hash))
|
||||
{
|
||||
LOG_PRINT_L2("tx " << results[i].hash << " already have transaction in blockchain");
|
||||
}
|
||||
else
|
||||
{
|
||||
region.run([&, i, it] {
|
||||
results[i].res = handle_incoming_tx_post(*it, tvc[i], results[i].tx, results[i].hash, results[i].prefix_hash, keeped_by_block, relayed, do_not_relay);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
bool ok = true;
|
||||
std::list<blobdata>::const_iterator it = tx_blobs.begin();
|
||||
for (size_t i = 0; i < tx_blobs.size(); i++, ++it) {
|
||||
if (!results[i].res)
|
||||
{
|
||||
ok = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
ok &= add_new_tx(results[i].tx, results[i].hash, results[i].prefix_hash, it->size(), tvc[i], keeped_by_block, relayed, do_not_relay);
|
||||
if(tvc[i].m_verifivation_failed)
|
||||
{MERROR_VER("Transaction verification failed: " << results[i].hash);}
|
||||
else if(tvc[i].m_verifivation_impossible)
|
||||
{MERROR_VER("Transaction verification impossible: " << results[i].hash);}
|
||||
|
||||
if(tvc[i].m_added_to_pool)
|
||||
MDEBUG("tx added: " << results[i].hash);
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay)
|
||||
{
|
||||
std::list<cryptonote::blobdata> tx_blobs;
|
||||
tx_blobs.push_back(tx_blob);
|
||||
std::vector<tx_verification_context> tvcv(1);
|
||||
bool r = handle_incoming_txs(tx_blobs, tvcv, keeped_by_block, relayed, do_not_relay);
|
||||
tvc = tvcv[0];
|
||||
return r;
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
#include "cryptonote_protocol/cryptonote_protocol_handler_common.h"
|
||||
#include "storages/portable_storage_template_helper.h"
|
||||
#include "common/download.h"
|
||||
#include "common/thread_group.h"
|
||||
#include "tx_pool.h"
|
||||
#include "blockchain.h"
|
||||
#include "cryptonote_basic/miner.h"
|
||||
|
@ -114,6 +115,22 @@ namespace cryptonote
|
|||
*/
|
||||
bool handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay);
|
||||
|
||||
/**
|
||||
* @brief handles a list of incoming transactions
|
||||
*
|
||||
* Parses incoming transactions and, if nothing is obviously wrong,
|
||||
* passes them along to the transaction pool
|
||||
*
|
||||
* @param tx_blobs the txs to handle
|
||||
* @param tvc metadata about the transactions' validity
|
||||
* @param keeped_by_block if the transactions have been in a block
|
||||
* @param relayed whether or not the transactions were relayed to us
|
||||
* @param do_not_relay whether to prevent the transactions from being relayed
|
||||
*
|
||||
* @return true if the transactions made it to the transaction pool, otherwise false
|
||||
*/
|
||||
bool handle_incoming_txs(const std::list<blobdata>& tx_blobs, std::vector<tx_verification_context>& tvc, bool keeped_by_block, bool relayed, bool do_not_relay);
|
||||
|
||||
/**
|
||||
* @brief handles an incoming block
|
||||
*
|
||||
|
@ -753,6 +770,9 @@ namespace cryptonote
|
|||
*/
|
||||
bool check_tx_semantic(const transaction& tx, bool keeped_by_block) const;
|
||||
|
||||
bool handle_incoming_tx_pre(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay);
|
||||
bool handle_incoming_tx_post(const blobdata& tx_blob, tx_verification_context& tvc, cryptonote::transaction &tx, crypto::hash &tx_hash, crypto::hash &tx_prefixt_hash, bool keeped_by_block, bool relayed, bool do_not_relay);
|
||||
|
||||
/**
|
||||
* @copydoc miner::on_block_chain_update
|
||||
*
|
||||
|
@ -859,6 +879,9 @@ namespace cryptonote
|
|||
time_t start_time;
|
||||
|
||||
std::unordered_set<crypto::hash> bad_semantics_txes[2];
|
||||
boost::mutex bad_semantics_txes_lock;
|
||||
|
||||
tools::thread_group m_threadpool;
|
||||
|
||||
enum {
|
||||
UPDATES_DISABLED,
|
||||
|
|
|
@ -986,6 +986,7 @@ namespace cryptonote
|
|||
m_core.prepare_handle_incoming_blocks(blocks);
|
||||
|
||||
uint64_t block_process_time_full = 0, transactions_process_time_full = 0;
|
||||
size_t num_txs = 0;
|
||||
for(const block_complete_entry& block_entry: blocks)
|
||||
{
|
||||
if (m_stopping)
|
||||
|
@ -996,15 +997,17 @@ namespace cryptonote
|
|||
|
||||
// process transactions
|
||||
TIME_MEASURE_START(transactions_process_time);
|
||||
for(auto& tx_blob: block_entry.txs)
|
||||
num_txs += block_entry.txs.size();
|
||||
std::vector<tx_verification_context> tvc;
|
||||
m_core.handle_incoming_txs(block_entry.txs, tvc, true, true, false);
|
||||
std::list<blobdata>::const_iterator it = block_entry.txs.begin();
|
||||
for (size_t i = 0; i < tvc.size(); ++i, ++it)
|
||||
{
|
||||
tx_verification_context tvc = AUTO_VAL_INIT(tvc);
|
||||
m_core.handle_incoming_tx(tx_blob, tvc, true, true, false);
|
||||
if(tvc.m_verifivation_failed)
|
||||
if(tvc[i].m_verifivation_failed)
|
||||
{
|
||||
if (!m_p2p->for_connection(span_connection_id, [&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t f)->bool{
|
||||
LOG_ERROR_CCONTEXT("transaction verification failed on NOTIFY_RESPONSE_GET_OBJECTS, tx_id = "
|
||||
<< epee::string_tools::pod_to_hex(get_blob_hash(tx_blob)) << ", dropping connection");
|
||||
<< epee::string_tools::pod_to_hex(get_blob_hash(*it)) << ", dropping connection");
|
||||
m_p2p->drop_connection(context);
|
||||
m_block_queue.flush_spans(context.m_connection_id, true);
|
||||
return true;
|
||||
|
@ -1065,7 +1068,7 @@ namespace cryptonote
|
|||
|
||||
} // each download block
|
||||
|
||||
LOG_PRINT_CCONTEXT_L2("Block process time (" << blocks.size() << " blocks): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms");
|
||||
MCINFO("sync-info", "Block process time (" << blocks.size() << " blocks, " << num_txs << " txs): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms");
|
||||
|
||||
m_core.cleanup_handle_incoming_blocks();
|
||||
|
||||
|
|
|
@ -52,6 +52,7 @@ public:
|
|||
bool have_block(const crypto::hash& id) const {return true;}
|
||||
bool get_blockchain_top(uint64_t& height, crypto::hash& top_id)const{height=0;top_id=cryptonote::null_hash;return true;}
|
||||
bool handle_incoming_tx(const cryptonote::blobdata& tx_blob, cryptonote::tx_verification_context& tvc, bool keeped_by_block, bool relayed, bool do_not_relay) { return true; }
|
||||
bool handle_incoming_txs(const std::list<cryptonote::blobdata>& tx_blob, std::vector<cryptonote::tx_verification_context>& tvc, bool keeped_by_block, bool relayed, bool do_not_relay) { return true; }
|
||||
bool handle_incoming_block(const cryptonote::blobdata& block_blob, cryptonote::block_verification_context& bvc, bool update_miner_blocktemplate = true) { return true; }
|
||||
void pause_mine(){}
|
||||
void resume_mine(){}
|
||||
|
|
Loading…
Reference in a new issue