LCOV - code coverage report
Current view: top level - irohad/ordering/impl - on_demand_ordering_service_impl.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 93 95 97.9 %
Date: 2018-12-05 17:11:35 Functions: 17 17 100.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "ordering/impl/on_demand_ordering_service_impl.hpp"
       7             : 
       8             : #include <unordered_set>
       9             : 
      10             : #include <boost/range/adaptor/filtered.hpp>
      11             : #include <boost/range/adaptor/indirected.hpp>
      12             : #include <boost/range/adaptor/transformed.hpp>
      13             : #include <boost/range/algorithm/for_each.hpp>
      14             : #include <boost/range/size.hpp>
      15             : #include "ametsuchi/tx_presence_cache.hpp"
      16             : #include "ametsuchi/tx_presence_cache_utils.hpp"
      17             : #include "common/visitor.hpp"
      18             : #include "datetime/time.hpp"
      19             : #include "interfaces/iroha_internal/proposal.hpp"
      20             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      21             : #include "interfaces/transaction.hpp"
      22             : 
      23             : using namespace iroha::ordering;
      24             : 
      25             : /**
      26             :  * First round after successful committing block
      27             :  */
      28             : const iroha::consensus::RejectRoundType kFirstRound = 1;
      29             : 
      30             : OnDemandOrderingServiceImpl::OnDemandOrderingServiceImpl(
      31             :     size_t transaction_limit,
      32             :     std::unique_ptr<shared_model::interface::UnsafeProposalFactory>
      33             :         proposal_factory,
      34             :     std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
      35             :     size_t number_of_proposals,
      36             :     const consensus::Round &initial_round)
      37          10 :     : transaction_limit_(transaction_limit),
      38          10 :       number_of_proposals_(number_of_proposals),
      39          10 :       proposal_factory_(std::move(proposal_factory)),
      40          10 :       tx_cache_(std::move(tx_cache)),
      41          10 :       log_(logger::log("OnDemandOrderingServiceImpl")) {
      42          10 :   onCollaborationOutcome(initial_round);
      43          10 : }
      44             : 
      45             : // -------------------------| OnDemandOrderingService |-------------------------
      46             : 
      47             : void OnDemandOrderingServiceImpl::onCollaborationOutcome(
      48             :     consensus::Round round) {
      49          37 :   log_->info("onCollaborationOutcome => round[{}, {}]",
      50          37 :              round.block_round,
      51          37 :              round.reject_round);
      52             :   // exclusive write lock
      53          37 :   std::lock_guard<std::shared_timed_mutex> guard(lock_);
      54          37 :   log_->info("onCollaborationOutcome => write lock is acquired");
      55             : 
      56          37 :   packNextProposals(round);
      57          37 :   tryErase();
      58          37 : }
      59             : 
      60             : // ----------------------------| OdOsNotification |-----------------------------
      61             : 
      62             : void OnDemandOrderingServiceImpl::onBatches(consensus::Round round,
      63             :                                             CollectionType batches) {
      64             :   // read lock
      65          26 :   std::shared_lock<std::shared_timed_mutex> guard(lock_);
      66          26 :   log_->info("onBatches => collection size = {}, round[{}, {}]",
      67          26 :              batches.size(),
      68          26 :              round.block_round,
      69          26 :              round.reject_round);
      70             : 
      71             :   auto unprocessed_batches =
      72             :       boost::adaptors::filter(batches, [this](const auto &batch) {
      73          66 :         log_->info("check batch {} for already processed transactions",
      74          66 :                    batch->reducedHash().hex());
      75          66 :         return not this->batchAlreadyProcessed(*batch);
      76             :       });
      77          26 :   auto it = current_proposals_.find(round);
      78          26 :   if (it != current_proposals_.end()) {
      79          25 :     std::for_each(unprocessed_batches.begin(),
      80          25 :                   unprocessed_batches.end(),
      81             :                   [&it](auto &obj) { it->second.push(std::move(obj)); });
      82             :     log_->info("onTransactions => collection is inserted");
      83          25 :   }
      84          26 : }
      85             : 
      86             : boost::optional<OnDemandOrderingServiceImpl::ProposalType>
      87             : OnDemandOrderingServiceImpl::onRequestProposal(consensus::Round round) {
      88             :   // read lock
      89          29 :   std::shared_lock<std::shared_timed_mutex> guard(lock_);
      90          29 :   auto proposal = proposal_map_.find(round);
      91          29 :   if (proposal != proposal_map_.end()) {
      92             :     return clone(*proposal->second);
      93             :   } else {
      94          13 :     return boost::none;
      95             :   }
      96          29 : }
      97             : 
      98             : // ---------------------------------| Private |---------------------------------
      99             : 
     100             : void OnDemandOrderingServiceImpl::packNextProposals(
     101             :     const consensus::Round &round) {
     102             :   auto close_round = [this](consensus::Round round) {
     103          64 :     auto it = current_proposals_.find(round);
     104          64 :     if (it != current_proposals_.end()) {
     105          44 :       if (not it->second.empty()) {
     106          25 :         proposal_map_.emplace(round, emitProposal(round));
     107          25 :         log_->info("packNextProposal: data has been fetched for round[{}, {}]",
     108          25 :                    round.block_round,
     109          25 :                    round.reject_round);
     110          25 :         round_queue_.push(round);
     111          25 :       }
     112             :       current_proposals_.erase(it);
     113          44 :     }
     114          64 :   };
     115             : 
     116             :   /*
     117             :    * The possible cases can be visualised as a diagram, where:
     118             :    * o - current round, x - next round, v - target round
     119             :    *
     120             :    *   0 1 2
     121             :    * 0 o x v
     122             :    * 1 x v .
     123             :    * 2 v . .
     124             :    *
     125             :    * Reject case:
     126             :    *
     127             :    *   0 1 2 3
     128             :    * 0 . o x v
     129             :    * 1 x v . .
     130             :    * 2 v . . .
     131             :    *
     132             :    * (0,1) - current round. Round (0,2) is closed for transactions.
     133             :    * Round (0,3) is now receiving transactions.
     134             :    * Rounds (1,) and (2,) do not change.
     135             :    *
     136             :    * Commit case:
     137             :    *
     138             :    *   0 1 2
     139             :    * 0 . . .
     140             :    * 1 o x v
     141             :    * 2 x v .
     142             :    * 3 v . .
     143             :    *
     144             :    * (1,0) - current round. The diagram is similar to the initial case.
     145             :    */
     146             : 
     147             :   // close next reject round
     148          37 :   close_round({round.block_round, round.reject_round + 1});
     149             : 
     150          37 :   if (round.reject_round == kFirstRound) {
     151             :     // new block round
     152          27 :     close_round({round.block_round + 1, round.reject_round});
     153             : 
     154             :     // remove current queues
     155          27 :     current_proposals_.clear();
     156             :     // initialize the 3 diagonal rounds from the commit case diagram
     157         108 :     for (uint32_t i = 0; i <= 2; ++i) {
     158          81 :       current_proposals_[{round.block_round + i, round.reject_round + 2 - i}];
     159          81 :     }
     160          27 :   } else {
     161             :     // new reject round
     162          10 :     current_proposals_[{round.block_round, round.reject_round + 2}];
     163             :   }
     164          37 : }
     165             : 
     166             : OnDemandOrderingServiceImpl::ProposalType
     167             : OnDemandOrderingServiceImpl::emitProposal(const consensus::Round &round) {
     168          25 :   log_->info("Mutable proposal generation, round[{}, {}]",
     169          25 :              round.block_round,
     170          25 :              round.reject_round);
     171             : 
     172          25 :   TransactionBatchType batch;
     173          25 :   std::vector<std::shared_ptr<shared_model::interface::Transaction>> collection;
     174          25 :   std::unordered_set<std::string> inserted;
     175             : 
     176             :   // outer method should guarantee availability of at least one transaction in
     177             :   // queue, also, code shouldn't fetch all transactions from queue. The rest
     178             :   // will be lost.
     179          25 :   auto &current_proposal = current_proposals_[round];
     180          70 :   while (current_proposal.try_pop(batch)
     181          70 :          and collection.size() < transaction_limit_
     182          46 :          and inserted.insert(batch->reducedHash().hex()).second) {
     183          45 :     collection.insert(
     184          45 :         std::end(collection),
     185          45 :         std::make_move_iterator(std::begin(batch->transactions())),
     186          45 :         std::make_move_iterator(std::end(batch->transactions())));
     187             :   }
     188          25 :   log_->info("Number of transactions in proposal = {}", collection.size());
     189             : 
     190          25 :   auto txs = collection | boost::adaptors::indirected;
     191          25 :   return proposal_factory_->unsafeCreateProposal(
     192          25 :       round.block_round, iroha::time::now(), txs);
     193          25 : }
     194             : 
     195             : void OnDemandOrderingServiceImpl::tryErase() {
     196          37 :   if (round_queue_.size() >= number_of_proposals_) {
     197          12 :     auto &round = round_queue_.front();
     198          12 :     proposal_map_.erase(round);
     199          12 :     log_->info("tryErase: erased round[{}, {}]",
     200          12 :                round.block_round,
     201          12 :                round.reject_round);
     202          12 :     round_queue_.pop();
     203          12 :   }
     204          37 : }
     205             : 
     206             : bool OnDemandOrderingServiceImpl::batchAlreadyProcessed(
     207             :     const shared_model::interface::TransactionBatch &batch) {
     208          66 :   auto tx_statuses = tx_cache_->check(batch);
     209          66 :   if (not tx_statuses) {
     210             :     // TODO andrei 30.11.18 IR-51 Handle database error
     211           0 :     log_->warn("Check tx presence database error. Batch: {}", batch.toString());
     212           0 :     return true;
     213             :   }
     214             :   // if any transaction is commited or rejected, batch was already processed
     215             :   // Note: any_of returns false for empty sequence
     216          66 :   return std::any_of(
     217             :       tx_statuses->begin(), tx_statuses->end(), [this](const auto &tx_status) {
     218          66 :         if (iroha::ametsuchi::isAlreadyProcessed(tx_status)) {
     219           2 :           log_->warn("Duplicate transaction: {}",
     220           2 :                      iroha::ametsuchi::getHash(tx_status).hex());
     221           2 :           return true;
     222             :         }
     223          64 :         return false;
     224          66 :       });
     225          66 : }

Generated by: LCOV version 1.13