Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "multi_sig_transactions/gossip_propagation_strategy.hpp"
7 :
8 : #include <numeric>
9 : #include <random>
10 :
11 : #include <boost/assert.hpp>
12 : #include <boost/range/irange.hpp>
13 : #include "common/bind.hpp"
14 :
15 : namespace iroha {
16 :
17 : using PropagationData = PropagationStrategy::PropagationData;
18 : using OptPeer = GossipPropagationStrategy::OptPeer;
19 : using PeerProviderFactory = GossipPropagationStrategy::PeerProviderFactory;
20 : using std::chrono::steady_clock;
21 :
22 : GossipPropagationStrategy::GossipPropagationStrategy(
23 : PeerProviderFactory peer_factory,
24 : rxcpp::observe_on_one_worker emit_worker,
25 : const GossipPropagationStrategyParams ¶ms)
26 11 : : peer_factory(peer_factory),
27 11 : non_visited({}),
28 11 : emit_worker(emit_worker),
29 11 : emitent(rxcpp::observable<>::interval(steady_clock::now(),
30 11 : params.emission_period)
31 : .map([this, params](int) {
32 139 : PropagationData vec;
33 145 : auto range = boost::irange(0u, params.amount_per_once);
34 : // push until find empty element
35 149 : std::find_if_not(
36 : range.begin(), range.end(), [this, &vec](int) {
37 : return this->visit() | [&vec](auto e) -> bool {
38 248 : vec.push_back(e);
39 248 : return true; // proceed
40 : };
41 0 : });
42 149 : return vec;
43 149 : })
44 11 : .subscribe_on(emit_worker)) {}
45 :
46 : rxcpp::observable<PropagationData> GossipPropagationStrategy::emitter() {
47 20 : return emitent;
48 : }
49 :
50 : GossipPropagationStrategy::~GossipPropagationStrategy() {
51 : // Make sure that emitent callback have finish and haven't started yet
52 11 : std::lock_guard<std::mutex> lock(m);
53 11 : peer_factory.reset();
54 11 : }
55 :
56 : bool GossipPropagationStrategy::initQueue() {
57 : return peer_factory->createPeerQuery() | [](const auto &query) {
58 61 : return query->getLedgerPeers();
59 : } | [](auto &&data) -> boost::optional<PropagationData> {
60 48 : if (data.size() == 0) {
61 13 : return {};
62 : }
63 35 : return std::move(data);
64 : } | [this](auto &&data) -> bool { // nullopt implicitly casts to false
65 35 : this->last_data = std::move(data);
66 35 : this->non_visited.resize(this->last_data.size());
67 35 : std::iota(this->non_visited.begin(), this->non_visited.end(), 0);
68 35 : std::shuffle(this->non_visited.begin(),
69 35 : this->non_visited.end(),
70 35 : std::default_random_engine{});
71 35 : return true;
72 : };
73 0 : }
74 :
75 : OptPeer GossipPropagationStrategy::visit() {
76 274 : std::lock_guard<std::mutex> lock(m);
77 274 : if (not peer_factory or (non_visited.empty() and not initQueue())) {
78 : // either PeerProvider doesn't gives peers / dtor have been called
79 26 : return {};
80 : }
81 : // or non_visited non-empty
82 248 : BOOST_ASSERT(not non_visited.empty());
83 248 : BOOST_ASSERT(non_visited.back() < last_data.size());
84 :
85 248 : auto el = last_data[non_visited.back()];
86 248 : non_visited.pop_back();
87 248 : return el;
88 274 : }
89 :
90 : } // namespace iroha
|