LCOV - code coverage report
Current view: top level - irohad/multi_sig_transactions/impl - gossip_propagation_strategy.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 37 39 94.9 %
Date: 2018-12-05 17:11:35 Functions: 13 14 92.9 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "multi_sig_transactions/gossip_propagation_strategy.hpp"
       7             : 
       8             : #include <numeric>
       9             : #include <random>
      10             : 
      11             : #include <boost/assert.hpp>
      12             : #include <boost/range/irange.hpp>
      13             : #include "common/bind.hpp"
      14             : 
      15             : namespace iroha {
      16             : 
      17             :   using PropagationData = PropagationStrategy::PropagationData;
      18             :   using OptPeer = GossipPropagationStrategy::OptPeer;
      19             :   using PeerProviderFactory = GossipPropagationStrategy::PeerProviderFactory;
      20             :   using std::chrono::steady_clock;
      21             : 
      22             :   GossipPropagationStrategy::GossipPropagationStrategy(  // builds the periodic emitter; stores the factory and worker
      23             :       PeerProviderFactory peer_factory,
      24             :       rxcpp::observe_on_one_worker emit_worker,
      25             :       const GossipPropagationStrategyParams &params)
      26          11 :       : peer_factory(peer_factory),
      27          11 :         non_visited({}),
      28          11 :         emit_worker(emit_worker),
      29          11 :         emitent(rxcpp::observable<>::interval(steady_clock::now(),  // ticks every params.emission_period, starting now
      30          11 :                                               params.emission_period)
      31             :                     .map([this, params](int) {  // each tick is mapped to one PropagationData batch
      32         139 :                       PropagationData vec;
      33         145 :                       auto range = boost::irange(0u, params.amount_per_once);  // at most amount_per_once visit() attempts per tick
      34             :                       // keep pushing peers into vec until visit() returns an empty value or the range is exhausted
      35         149 :                       std::find_if_not(
      36             :                           range.begin(), range.end(), [this, &vec](int) {
      37             :                             return this->visit() | [&vec](auto e) -> bool {  // NOTE(review): `|` comes from common/bind.hpp; presumably applies the lambda only when visit() has a value - confirm
      38         248 :                               vec.push_back(e);
      39         248 :                               return true;  // proceed to the next attempt
      40             :                             };
      41           0 :                           });
      42         149 :                       return vec;
      43         149 :                     })
      44          11 :                     .subscribe_on(emit_worker)) {}
      45             : 
      46             :   rxcpp::observable<PropagationData> GossipPropagationStrategy::emitter() {  // returns the observable built in the constructor
      47          20 :     return emitent;
      48             :   }
      49             : 
      50             :   GossipPropagationStrategy::~GossipPropagationStrategy() {
      51             :     // Hold the mutex so that no visit() call is in progress while peer_factory is reset; visit() checks peer_factory under the same mutex and returns empty once it is null
      52          11 :     std::lock_guard<std::mutex> lock(m);
      53          11 :     peer_factory.reset();
      54          11 :   }
      55             : 
      56             :   bool GossipPropagationStrategy::initQueue() {  // refills last_data and non_visited from the ledger peers; the last stage returns true
      57             :     return peer_factory->createPeerQuery() | [](const auto &query) {  // NOTE(review): `|` comes from common/bind.hpp; presumably chains only while a value is present - confirm
      58          61 :       return query->getLedgerPeers();
      59             :     } | [](auto &&data) -> boost::optional<PropagationData> {
      60          48 :       if (data.size() == 0) {  // an empty peer list is turned into an empty optional
      61          13 :         return {};
      62             :       }
      63          35 :       return std::move(data);
      64             :     } | [this](auto &&data) -> bool {  // nullopt implicitly casts to false
      65          35 :       this->last_data = std::move(data);
      66          35 :       this->non_visited.resize(this->last_data.size());
      67          35 :       std::iota(this->non_visited.begin(), this->non_visited.end(), 0);  // indices 0..size-1 into last_data
      68          35 :       std::shuffle(this->non_visited.begin(),
      69          35 :                    this->non_visited.end(),
      70          35 :                    std::default_random_engine{});  // NOTE(review): the engine is default-constructed on every call, so presumably the same permutation is produced for a given size - confirm this is intended
      71          35 :       return true;
      72             :     };
      73           0 :   }
      74             : 
      75             :   OptPeer GossipPropagationStrategy::visit() {  // returns the next not-yet-visited peer, or an empty OptPeer
      76         274 :     std::lock_guard<std::mutex> lock(m);
      77         274 :     if (not peer_factory or (non_visited.empty() and not initQueue())) {  // initQueue() is only called when the index queue is exhausted
      78             :       // either peer_factory has been reset (the destructor has run) or initQueue() could not obtain any peers
      79          26 :       return {};
      80             :     }
      81             :     // from here on non_visited holds at least one index (asserted below)
      82         248 :     BOOST_ASSERT(not non_visited.empty());
      83         248 :     BOOST_ASSERT(non_visited.back() < last_data.size());
      84             : 
      85         248 :     auto el = last_data[non_visited.back()];  // copy the peer selected by the last index
      86         248 :     non_visited.pop_back();  // that index is now consumed
      87         248 :     return el;
      88         274 :   }
      89             : 
      90             : }  // namespace iroha

Generated by: LCOV version 1.13