LCOV - code coverage report
Current view: top level - irohad/torii/impl - command_service_impl.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 62 72 86.1 %
Date: 2018-12-05 17:11:35 Functions: 33 51 64.7 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "torii/impl/command_service_impl.hpp"
       7             : 
       8             : #include <thread>
       9             : 
      10             : #include "ametsuchi/block_query.hpp"
      11             : #include "common/byteutils.hpp"
      12             : #include "common/is_any.hpp"
      13             : #include "common/visitor.hpp"
      14             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      15             : 
      16             : namespace torii {
      17             : 
      18             :   CommandServiceImpl::CommandServiceImpl(  // stores collaborators, then subscribes to the status bus
      19             :       std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
      20             :       std::shared_ptr<iroha::ametsuchi::Storage> storage,
      21             :       std::shared_ptr<iroha::torii::StatusBus> status_bus,
      22             :       std::shared_ptr<shared_model::interface::TxStatusFactory> status_factory)
      23         261 :       : tx_processor_(std::move(tx_processor)),
      24         261 :         storage_(std::move(storage)),
      25         261 :         status_bus_(std::move(status_bus)),
      26         261 :         cache_(std::make_shared<CacheType>()),  // cache is created here rather than injected
      27         261 :         status_factory_(std::move(status_factory)),
      28         261 :         log_(logger::log("CommandServiceImpl")) {
      29             :     // Notifier for all clients: every status emitted by the bus passes through this callback
      30             :     status_bus_->statuses().subscribe([this](auto response) {  // NOTE(review): captures `this`; no unsubscribe is visible in this view - confirm lifetime
      31             :       // Look up the status already cached for this tx; if the received response
      32             :       // does not compare as kGreater against the cached one, drop it
      33        2944 :       auto tx_hash = response->transactionHash();
      34        2944 :       auto cached_tx_state = cache_->findItem(tx_hash);
      35        2944 :       if (cached_tx_state
      36        2944 :           and response->comparePriorities(**cached_tx_state)  // `**`: unwrap the lookup result, then the response pointer
      37        2145 :               != shared_model::interface::TransactionResponse::
      38             :                      PrioritiesComparisonResult::kGreater) {
      39         747 :         return;
      40             :       }
      41        2197 :       cache_->addItem(tx_hash, response);  // reached when nothing is cached or the new status compared as kGreater
      42        2944 :     });
      43         261 :   }
      44             : 
      45             :   void CommandServiceImpl::handleTransactionBatch(  // thin entry point for an incoming batch
      46             :       std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
      47         739 :     processBatch(batch);  // all work is delegated to processBatch
      48         739 :   }
      49             : 
      50             :   std::shared_ptr<shared_model::interface::TransactionResponse>  // status for the tx hash `request`: cached, Committed, or NotReceived
      51             :   CommandServiceImpl::getStatus(const shared_model::crypto::Hash &request) {
      52         813 :     auto cached = cache_->findItem(request);
      53         813 :     if (cached) {
      54         797 :       return cached.value();  // fast path: a status is already cached for this hash
      55             :     }
      56             : 
      57          16 :     auto block_query = storage_->getBlockQuery();  // cache miss: fall back to block storage
      58          16 :     if (not block_query) {
      59             :       // TODO andrei 30.11.18 IR-51 Handle database error
      60           0 :       log_->warn("Could not create block query. Tx: {}", request.hex());
      61           0 :       return status_factory_->makeNotReceived(request);  // no block query: answered as NotReceived, nothing cached
      62             :     }
      63             : 
      64          16 :     auto status = block_query->checkTxPresence(request);
      65          16 :     if (not status) {
      66             :       // TODO andrei 30.11.18 IR-51 Handle database error
      67          11 :       log_->warn("Check tx presence database error. Tx: {}", request.hex());
      68          11 :       return status_factory_->makeNotReceived(request);  // presence check gave no result: answered as NotReceived, nothing cached
      69             :     }
      70             : 
      71           5 :     return iroha::visit_in_place(  // dispatch on the concrete presence-check result
      72           5 :         *status,
      73             :         [this,
      74           5 :          &request](const iroha::ametsuchi::tx_cache_status_responses::Missing &)
      75             :             -> std::shared_ptr<shared_model::interface::TransactionResponse> {
      76           5 :           log_->warn("Asked non-existing tx: {}", request.hex());
      77           5 :           return status_factory_->makeNotReceived(request);  // Missing: NotReceived, not cached
      78           0 :         },
      79             :         [this, &request](const auto &) {  // every non-Missing alternative is reported as Committed
      80             :           std::shared_ptr<shared_model::interface::TransactionResponse>
      81           0 :               response = status_factory_->makeCommitted(request);
      82             :           cache_->addItem(request, response);  // remember the Committed status so later lookups hit the cache
      83           0 :           return response;
      84           0 :         });
      85         813 :   }
      86             : 
      87             :   /**
      88             :    * Statuses considered final for streaming: the status stream completes
      89             :    * right after delivering a value of one of the types listed below.
      90             :    * @tparam T concrete response type; cv/ref qualifiers are removed via std::decay_t
      91             :    */
      92             :   template <typename T>
      93             :   constexpr bool FinalStatusValue =
      94             :       iroha::is_any<std::decay_t<T>,  // decay so that decltype of a `const auto &` parameter still matches
      95             :                     shared_model::interface::StatelessFailedTxResponse,
      96             :                     shared_model::interface::StatefulFailedTxResponse,
      97             :                     shared_model::interface::CommittedTxResponse,
      98             :                     shared_model::interface::MstExpiredResponse>::value;
      99             : 
     100             :   rxcpp::observable<
     101             :       std::shared_ptr<shared_model::interface::TransactionResponse>>
     102             :   CommandServiceImpl::getStatusStream(const shared_model::crypto::Hash &hash) {  // statuses for `hash`, completed after the first final one
     103             :     using ResponsePtrType =
     104             :         std::shared_ptr<shared_model::interface::TransactionResponse>;
     105             :     auto initial_status = cache_->findItem(hash).value_or([&] {  // NOTE(review): the lambda is invoked immediately (see `}()`), so the log and makeNotReceived run even on a cache hit - confirm intended
     106           2 :       log_->debug("tx is not received: {}", hash.toString());
     107           2 :       return status_factory_->makeNotReceived(hash);
     108           0 :     }());
     109           2 :     return status_bus_
     110           2 :         ->statuses()
     111             :         // emit the initial status (cached or NotReceived) before anything from the bus
     112             :         .start_with(initial_status)
     113             :         // keep only statuses whose transaction hash equals the requested one
     114           2 :         .filter(
     115             :             [&](auto response) { return response->transactionHash() == hash; })  // NOTE(review): `hash` is captured by reference into the returned observable - confirm it outlives every subscription
     116             :         // complete the observable once a final status is received;
     117             :         // the final status itself is still delivered to the subscriber
     118             :         .template lift<ResponsePtrType>([](rxcpp::subscriber<ResponsePtrType>
     119             :                                                dest) {
     120           2 :           return rxcpp::make_subscriber<ResponsePtrType>(
     121             :               dest, [=](ResponsePtrType response) {
     122           3 :                 dest.on_next(response);  // forward first, then decide whether to complete
     123           3 :                 iroha::visit_in_place(
     124           3 :                     response->get(),
     125             :                     [dest](const auto &resp)  // enabled only when resp's type is a final status
     126             :                         -> std::enable_if_t<FinalStatusValue<decltype(resp)>> {
     127           1 :                       dest.on_completed();
     128           1 :                     },
     129             :                     [](const auto &resp)  // enabled for every other status type: deliberately a no-op
     130             :                         -> std::enable_if_t<
     131           2 :                             not FinalStatusValue<decltype(resp)>>{});
     132           3 :               });
     133           0 :         });
     134           2 :   }
     135             : 
     136             :   void CommandServiceImpl::pushStatus(  // logs the status on behalf of `who` and publishes it on the status bus
     137             :       const std::string &who,
     138             :       std::shared_ptr<shared_model::interface::TransactionResponse> response) {
     139         744 :     log_->debug("{}: adding item to cache: {}", who, response->toString());  // NOTE: despite the message, nothing is cached here; the visible cache insert is in the constructor's bus subscriber
     140         744 :     status_bus_->publish(response);
     141         744 :   }
     142             : 
     143             :   void CommandServiceImpl::processBatch(  // hands the batch to the tx processor, then publishes StatelessValid per transaction
     144             :       std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
     145         739 :     tx_processor_->batchHandle(batch);  // runs before, and regardless of, the per-transaction cache checks below
     146         739 :     const auto &txs = batch->transactions();
     147             :     std::for_each(txs.begin(), txs.end(), [this](const auto &tx) {
     148         744 :       const auto &tx_hash = tx->hash();
     149         744 :       auto found = cache_->findItem(tx_hash);
     150             :       // A cached EnoughSignaturesCollectedResponse must not suppress publishing:
     151             :       // the visitor returns false for it, so StatelessValid is still pushed after it
     152         744 :       if (found
     153         744 :           and iroha::visit_in_place(
     154         701 :                   found.value()->get(),
     155             :                   [](const shared_model::interface::
     156         697 :                          EnoughSignaturesCollectedResponse &) { return false; },
     157             :                   [](auto &) { return true; })  // any other cached status counts as "already seen"
     158         701 :           and tx->quorum() < 2) {  // the skip applies only to transactions with quorum below 2
     159           0 :         log_->warn("Found transaction {} in cache, ignoring", tx_hash.hex());
     160           0 :         return;  // leaves the lambda only: moves on to the next transaction of the batch
     161             :       }
     162             : 
     163         744 :       this->pushStatus("ToriiBatchProcessor",
     164         744 :                        status_factory_->makeStatelessValid(tx_hash));
     165         744 :     });
     166         739 :   }
     167             : 
     168             : }  // namespace torii

Generated by: LCOV version 1.13