LCOV - code coverage report
Current view: top level - irohad/torii/processor/impl - transaction_processor_impl.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 97 106 91.5 %
Date: 2018-12-05 17:11:35 Functions: 18 21 85.7 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "torii/processor/transaction_processor_impl.hpp"
       7             : 
       8             : #include <boost/format.hpp>
       9             : 
      10             : #include "interfaces/iroha_internal/block.hpp"
      11             : #include "interfaces/iroha_internal/proposal.hpp"
      12             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      13             : #include "interfaces/iroha_internal/transaction_sequence.hpp"
      14             : #include "validation/stateful_validator_common.hpp"
      15             : 
      16             : namespace iroha {
      17             :   namespace torii {
      18             : 
      19             :     using network::PeerCommunicationService;
      20             : 
      21             :     namespace {
      22             :       std::string composeErrorMessage(
      23             :           const validation::TransactionError &tx_hash_and_error) {
      24          60 :         const auto tx_hash = tx_hash_and_error.first.hex();
      25          60 :         const auto &cmd_error = tx_hash_and_error.second;
      26          60 :         if (not cmd_error.tx_passed_initial_validation) {
      27           6 :           return (boost::format(
      28             :                       "Stateful validation error: transaction %s "
      29             :                       "did not pass initial verification: "
      30             :                       "checking '%s', error code '%d', query arguments: %s")
      31           6 :                   % tx_hash % cmd_error.name % cmd_error.error_code
      32           6 :                   % cmd_error.error_extra)
      33           6 :               .str();
      34             :         }
      35          54 :         return (boost::format(
      36             :                     "Stateful validation error in transaction %s: "
      37             :                     "command '%s' with index '%d' did not pass "
      38             :                     "verification with code '%d', query arguments: %s")
      39          54 :                 % tx_hash % cmd_error.name % cmd_error.index
      40          54 :                 % cmd_error.error_code % cmd_error.error_extra)
      41          54 :             .str();
      42          60 :       }
      43             :     }  // namespace
      44             : 
      45             :     TransactionProcessorImpl::TransactionProcessorImpl(
      46             :         std::shared_ptr<PeerCommunicationService> pcs,
      47             :         std::shared_ptr<MstProcessor> mst_processor,
      48             :         std::shared_ptr<iroha::torii::StatusBus> status_bus,
      49             :         std::shared_ptr<shared_model::interface::TxStatusFactory>
      50             :             status_factory)
      51         269 :         : pcs_(std::move(pcs)),
      52         269 :           mst_processor_(std::move(mst_processor)),
      53         269 :           status_bus_(std::move(status_bus)),
      54         269 :           status_factory_(std::move(status_factory)),
      55         269 :           log_(logger::log("TxProcessor")) {
      56             :       // process stateful validation results
      57         269 :       pcs_->on_verified_proposal().subscribe(
      58             :           [this](std::shared_ptr<validation::VerifiedProposalAndErrors>
      59             :                      proposal_and_errors) {
      60             :             // notify about failed txs
      61         714 :             const auto &errors = proposal_and_errors->rejected_transactions;
      62         714 :             std::lock_guard<std::mutex> lock(notifier_mutex_);
      63         774 :             for (const auto &tx_error : errors) {
      64          60 :               log_->info(composeErrorMessage(tx_error));
      65          60 :               this->publishStatus(TxStatusType::kStatefulFailed,
      66          60 :                                   tx_error.first,
      67          60 :                                   tx_error.second);
      68             :             }
      69             :             // notify about success txs
      70        1396 :             for (const auto &successful_tx :
      71         714 :                  proposal_and_errors->verified_proposal->transactions()) {
      72         682 :               log_->info("on stateful validation success: {}",
      73         682 :                          successful_tx.hash().hex());
      74         682 :               this->publishStatus(TxStatusType::kStatefulValid,
      75         682 :                                   successful_tx.hash());
      76             :             }
      77         714 :           });
      78             : 
      79             :       // commit transactions
      80         269 :       pcs_->on_commit().subscribe(
      81             :           [this](synchronizer::SynchronizationEvent sync_event) {
      82             :             sync_event.synced_blocks.subscribe(
      83             :                 // on next
      84             :                 [this](auto model_block) {
      85         709 :                   current_txs_hashes_.reserve(
      86         709 :                       model_block->transactions().size());
      87         709 :                   std::transform(model_block->transactions().begin(),
      88         709 :                                  model_block->transactions().end(),
      89         709 :                                  std::back_inserter(current_txs_hashes_),
      90             :                                  [](const auto &tx) { return tx.hash(); });
      91         709 :                 },
      92             :                 // on complete
      93             :                 [this] {
      94         708 :                   if (current_txs_hashes_.empty()) {
      95          51 :                     log_->info("there are no transactions to be committed");
      96          51 :                   } else {
      97         657 :                     std::lock_guard<std::mutex> lock(notifier_mutex_);
      98        1334 :                     for (const auto &tx_hash : current_txs_hashes_) {
      99         677 :                       log_->info("on commit committed: {}", tx_hash.hex());
     100         677 :                       this->publishStatus(TxStatusType::kCommitted, tx_hash);
     101             :                     }
     102             :                     current_txs_hashes_.clear();
     103         657 :                   }
     104         708 :                 });
     105         709 :           });
     106             : 
     107             :       mst_processor_->onStateUpdate().subscribe([this](auto &&state) {
     108           8 :         log_->info("MST state updated");
     109          16 :         for (auto &&batch : state->getBatches()) {
     110          16 :           for (auto &&tx : batch->transactions()) {
     111           8 :             this->publishStatus(TxStatusType::kMstPending, tx->hash());
     112             :           }
     113             :         }
     114           8 :       });
     115             :       mst_processor_->onPreparedBatches().subscribe([this](auto &&batch) {
     116           3 :         log_->info("MST batch prepared");
     117           3 :         this->publishEnoughSignaturesStatus(batch->transactions());
     118           3 :         this->pcs_->propagate_batch(batch);
     119           3 :       });
     120             :       mst_processor_->onExpiredBatches().subscribe([this](auto &&batch) {
     121           1 :         log_->info("MST batch {} is expired", batch->reducedHash().toString());
     122           2 :         for (auto &&tx : batch->transactions()) {
     123           1 :           this->publishStatus(TxStatusType::kMstExpired, tx->hash());
     124             :         }
     125           1 :       });
     126         269 :     }
     127             : 
     128             :     void TransactionProcessorImpl::batchHandle(
     129             :         std::shared_ptr<shared_model::interface::TransactionBatch>
     130             :             transaction_batch) const {
     131         757 :       log_->info("handle batch");
     132         757 :       if (transaction_batch->hasAllSignatures()) {
     133         745 :         log_->info("propagating batch to PCS");
     134         745 :         this->publishEnoughSignaturesStatus(transaction_batch->transactions());
     135         745 :         pcs_->propagate_batch(transaction_batch);
     136         745 :       } else {
     137          12 :         log_->info("propagating batch to MST");
     138          12 :         mst_processor_->propagateBatch(transaction_batch);
     139             :       }
     140         757 :     }
     141             : 
     142             :     void TransactionProcessorImpl::publishStatus(
     143             :         TxStatusType tx_status,
     144             :         const shared_model::crypto::Hash &hash,
     145             :         const validation::CommandError &cmd_error) const {
     146        2185 :       auto tx_error = cmd_error.name.empty()
     147        2125 :           ? shared_model::interface::TxStatusFactory::TransactionError{}
     148          60 :           : shared_model::interface::TxStatusFactory::TransactionError{
     149          60 :                 cmd_error.name, cmd_error.index, cmd_error.error_code};
     150        2185 :       switch (tx_status) {
     151             :         case TxStatusType::kStatelessFailed: {
     152           0 :           status_bus_->publish(
     153           0 :               status_factory_->makeStatelessFail(hash, tx_error));
     154           0 :           return;
     155             :         };
     156             :         case TxStatusType::kStatelessValid: {
     157           0 :           status_bus_->publish(
     158           0 :               status_factory_->makeStatelessValid(hash, tx_error));
     159           0 :           return;
     160             :         };
     161             :         case TxStatusType::kStatefulFailed: {
     162          60 :           status_bus_->publish(
     163          60 :               status_factory_->makeStatefulFail(hash, tx_error));
     164          60 :           return;
     165             :         };
     166             :         case TxStatusType::kStatefulValid: {
     167         682 :           status_bus_->publish(
     168         682 :               status_factory_->makeStatefulValid(hash, tx_error));
     169         682 :           return;
     170             :         };
     171             :         case TxStatusType::kCommitted: {
     172         677 :           status_bus_->publish(status_factory_->makeCommitted(hash, tx_error));
     173         677 :           return;
     174             :         };
     175             :         case TxStatusType::kMstExpired: {
     176           1 :           status_bus_->publish(status_factory_->makeMstExpired(hash, tx_error));
     177           1 :           return;
     178             :         };
     179             :         case TxStatusType::kNotReceived: {
     180           0 :           status_bus_->publish(
     181           0 :               status_factory_->makeNotReceived(hash, tx_error));
     182           0 :           return;
     183             :         };
     184             :         case TxStatusType::kMstPending: {
     185           8 :           status_bus_->publish(status_factory_->makeMstPending(hash, tx_error));
     186           8 :           return;
     187             :         };
     188             :         case TxStatusType::kEnoughSignaturesCollected: {
     189         757 :           status_bus_->publish(
     190         757 :               status_factory_->makeEnoughSignaturesCollected(hash, tx_error));
     191         757 :           return;
     192             :         };
     193             :       }
     194        2185 :     }
     195             : 
     196             :     void TransactionProcessorImpl::publishEnoughSignaturesStatus(
     197             :         const shared_model::interface::types::SharedTxsCollectionType &txs)
     198             :         const {
     199        1505 :       for (const auto &tx : txs) {
     200         757 :         this->publishStatus(TxStatusType::kEnoughSignaturesCollected,
     201         757 :                             tx->hash());
     202             :       }
     203         748 :     }
     204             :   }  // namespace torii
     205             : }  // namespace iroha

Generated by: LCOV version 1.13