Merge pull request #6954

dff1d8067 Fix tx flush callback queueing (Lee Clagett)
This commit is contained in:
Alexander Blair 2020-12-01 14:22:16 -08:00
commit f41dce49ac
No known key found for this signature in database
GPG key ID: C64552D877C32479

View file

@ -283,9 +283,10 @@ namespace levin
strand(io_service), strand(io_service),
map(), map(),
channels(), channels(),
flush_time(std::chrono::steady_clock::time_point::max()),
connection_count(0), connection_count(0),
flush_callbacks(0),
nzone(zone), nzone(zone),
is_public(is_public),
pad_txs(pad_txs), pad_txs(pad_txs),
fluffing(false) fluffing(false)
{ {
@ -300,9 +301,10 @@ namespace levin
boost::asio::io_service::strand strand; boost::asio::io_service::strand strand;
net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems
std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand` std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand`
std::chrono::steady_clock::time_point flush_time; //!< Next expected Dandelion++ fluff flush
std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time
std::uint32_t flush_callbacks; //!< Number of active fluff flush callbacks queued
const epee::net_utils::zone nzone; //!< Zone is public ipv4/ipv6 connections, or i2p or tor const epee::net_utils::zone nzone; //!< Zone is public ipv4/ipv6 connections, or i2p or tor
const bool is_public; //!< Zone is public ipv4/ipv6 connections
const bool pad_txs; //!< Pad txs to the next boundary for privacy const bool pad_txs; //!< Pad txs to the next boundary for privacy
bool fluffing; //!< Zone is in Dandelion++ fluff epoch bool fluffing; //!< Zone is in Dandelion++ fluff epoch
}; };
@ -348,7 +350,6 @@ namespace levin
struct fluff_flush struct fluff_flush
{ {
std::shared_ptr<detail::zone> zone_; std::shared_ptr<detail::zone> zone_;
std::chrono::steady_clock::time_point flush_time_;
static void queue(std::shared_ptr<detail::zone> zone, const std::chrono::steady_clock::time_point flush_time) static void queue(std::shared_ptr<detail::zone> zone, const std::chrono::steady_clock::time_point flush_time)
{ {
@ -356,28 +357,21 @@ namespace levin
assert(zone->strand.running_in_this_thread()); assert(zone->strand.running_in_this_thread());
detail::zone& this_zone = *zone; detail::zone& this_zone = *zone;
this_zone.flush_time = flush_time; ++this_zone.flush_callbacks;
this_zone.flush_txs.expires_at(flush_time); this_zone.flush_txs.expires_at(flush_time);
this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone), flush_time})); this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone)}));
} }
void operator()(const boost::system::error_code error) void operator()(const boost::system::error_code error)
{ {
if (!zone_ || !zone_->p2p) if (!zone_ || !zone_->flush_callbacks || --zone_->flush_callbacks || !zone_->p2p)
return; return;
assert(zone_->strand.running_in_this_thread()); assert(zone_->strand.running_in_this_thread());
const bool timer_error = bool(error); const bool timer_error = bool(error);
if (timer_error) if (timer_error && error != boost::system::errc::operation_canceled)
{ throw boost::system::system_error{error, "fluff_flush timer failed"};
if (error != boost::system::errc::operation_canceled)
throw boost::system::system_error{error, "fluff_flush timer failed"};
// new timer canceled this one set in future
if (zone_->flush_time < flush_time_)
return;
}
const auto now = std::chrono::steady_clock::now(); const auto now = std::chrono::steady_clock::now();
auto next_flush = std::chrono::steady_clock::time_point::max(); auto next_flush = std::chrono::steady_clock::time_point::max();
@ -413,8 +407,6 @@ namespace levin
if (next_flush != std::chrono::steady_clock::time_point::max()) if (next_flush != std::chrono::steady_clock::time_point::max())
fluff_flush::queue(std::move(zone_), next_flush); fluff_flush::queue(std::move(zone_), next_flush);
else
zone_->flush_time = next_flush; // signal that no timer is set
} }
}; };
@ -449,13 +441,11 @@ namespace levin
MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing"); MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing");
bool available = false; zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context)
zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush, &available] (detail::p2p_context& context)
{ {
// When i2p/tor, only fluff to outbound connections // When i2p/tor, only fluff to outbound connections
if (source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) if (source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income))
{ {
available = true;
if (context.fluff_txs.empty()) if (context.fluff_txs.empty())
context.flush_time = now + (context.m_is_income ? in_duration() : out_duration()); context.flush_time = now + (context.m_is_income ? in_duration() : out_duration());
@ -467,10 +457,9 @@ namespace levin
return true; return true;
}); });
if (!available) if (next_flush == std::chrono::steady_clock::time_point::max())
MWARNING("Unable to send transaction(s), no available connections"); MWARNING("Unable to send transaction(s), no available connections");
else if (!zone->flush_callbacks || next_flush < zone->flush_txs.expires_at())
if (next_flush < zone->flush_time)
fluff_flush::queue(std::move(zone), next_flush); fluff_flush::queue(std::move(zone), next_flush);
} }
}; };