LCOV - code coverage report
Current view: top level - irohad/ordering/impl - on_demand_ordering_gate.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 47 52 90.4 %
Date: 2018-12-05 17:11:35 Functions: 11 14 78.6 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "ordering/impl/on_demand_ordering_gate.hpp"
       7             : 
       8             : #include <boost/range/adaptor/filtered.hpp>
       9             : #include <boost/range/empty.hpp>
      10             : #include "ametsuchi/tx_presence_cache.hpp"
      11             : #include "common/visitor.hpp"
      12             : 
      13             : using namespace iroha;
      14             : using namespace iroha::ordering;
      15             : 
      16             : OnDemandOrderingGate::OnDemandOrderingGate(
      17             :     std::shared_ptr<OnDemandOrderingService> ordering_service,
      18             :     std::shared_ptr<transport::OdOsNotification> network_client,
      19             :     rxcpp::observable<BlockRoundEventType> events,
      20             :     std::shared_ptr<cache::OrderingGateCache> cache,
      21             :     std::unique_ptr<shared_model::interface::UnsafeProposalFactory> factory,
      22             :     std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
      23             :     consensus::Round initial_round)
      24           8 :     : ordering_service_(std::move(ordering_service)),
      25           8 :       network_client_(std::move(network_client)),
      26             :       events_subscription_(events.subscribe([this](auto event) {
      27             :         // exclusive lock
      28           7 :         std::lock_guard<std::shared_timed_mutex> lock(mutex_);
      29             : 
      30           7 :         visit_in_place(event,
      31             :                        [this](const BlockEvent &block_event) {
      32             :                          // block committed, increment block round
      33           5 :                          current_round_ = block_event.round;
      34           5 :                          cache_->remove(block_event.hashes);
      35           5 :                        },
      36             :                        [this](const EmptyEvent &empty) {
      37             :                          // no blocks committed, increment reject round
      38           2 :                          current_round_ = {current_round_.block_round,
      39           2 :                                            current_round_.reject_round + 1};
      40           2 :                        });
      41             : 
      42           7 :         auto batches = cache_->pop();
      43             : 
      44           7 :         cache_->addToBack(batches);
      45           7 :         network_client_->onBatches(current_round_,
      46           7 :                                    transport::OdOsNotification::CollectionType{
      47           7 :                                        batches.begin(), batches.end()});
      48             : 
      49             :         // notify our ordering service about new round
      50           7 :         ordering_service_->onCollaborationOutcome(current_round_);
      51             : 
      52             :         // request proposal for the current round
      53           7 :         auto proposal = network_client_->onRequestProposal(current_round_);
      54             : 
      55           7 :         auto final_proposal = this->processProposalRequest(std::move(proposal));
      56             :         // vote for the object received from the network
      57           7 :         proposal_notifier_.get_subscriber().on_next(std::move(final_proposal));
      58           7 :       })),
      59           8 :       cache_(std::move(cache)),
      60           8 :       proposal_factory_(std::move(factory)),
      61           8 :       tx_cache_(std::move(tx_cache)),
      62           8 :       current_round_(initial_round) {}
      63             : 
      64             : void OnDemandOrderingGate::propagateBatch(
      65             :     std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
      66           1 :   std::shared_lock<std::shared_timed_mutex> lock(mutex_);
      67             : 
      68           1 :   cache_->addToBack({batch});
      69           1 :   network_client_->onBatches(
      70           1 :       current_round_, transport::OdOsNotification::CollectionType{batch});
      71           1 : }
      72             : 
      73             : rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
      74             : OnDemandOrderingGate::on_proposal() {
      75           5 :   return proposal_notifier_.get_observable();
      76           0 : }
      77             : 
      78             : void OnDemandOrderingGate::setPcs(
      79             :     const iroha::network::PeerCommunicationService &pcs) {
      80           0 :   throw std::logic_error(
      81             :       "Method is deprecated. PCS observable should be set in ctor");
      82           0 : }
      83             : 
      84             : std::unique_ptr<shared_model::interface::Proposal>
      85             : OnDemandOrderingGate::processProposalRequest(
      86             :     boost::optional<OnDemandOrderingService::ProposalType> &&proposal) const {
      87           7 :   if (not proposal) {
      88           4 :     return proposal_factory_->unsafeCreateProposal(
      89           4 :         current_round_.block_round, current_round_.reject_round, {});
      90             :   }
      91             :   // no need to check empty proposal
      92           3 :   if (boost::empty(proposal.value()->transactions())) {
      93           2 :     return std::move(proposal.value());
      94             :   }
      95           1 :   return removeReplays(std::move(**std::move(proposal)));
      96           7 : }
      97             : 
      98             : std::unique_ptr<shared_model::interface::Proposal>
      99             : OnDemandOrderingGate::removeReplays(
     100             :     shared_model::interface::Proposal &&proposal) const {
     101             :   auto tx_is_not_processed = [this](const auto &tx) {
     102           1 :     auto tx_result = tx_cache_->check(tx.hash());
     103           1 :     if (not tx_result) {
     104             :       // TODO andrei 30.11.18 IR-51 Handle database error
     105           0 :       return false;
     106             :     }
     107           1 :     return iroha::visit_in_place(
     108           1 :         *tx_result,
     109             :         [](const ametsuchi::tx_cache_status_responses::Missing &) {
     110           0 :           return true;
     111             :         },
     112             :         [](const auto &status) {
     113             :           // TODO nickaleks 21.11.18: IR-1887 log replayed transactions
     114             :           // when log is added
     115           1 :           return false;
     116             :         });
     117           1 :   };
     118             :   auto unprocessed_txs =
     119           1 :       boost::adaptors::filter(proposal.transactions(), tx_is_not_processed);
     120             : 
     121           1 :   return proposal_factory_->unsafeCreateProposal(
     122           1 :       proposal.height(), proposal.createdTime(), unprocessed_txs);
     123           1 : }

Generated by: LCOV version 1.13