LCOV - code coverage report
Current view: top level - irohad/torii/impl - command_service_transport_grpc.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 91 104 87.5 %
Date: 2018-12-05 17:11:35 Functions: 34 35 97.1 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "common/default_constructible_unary_fn.hpp"  // non-copyable value workaround
       7             : 
       8             : #include "torii/impl/command_service_transport_grpc.hpp"
       9             : 
      10             : #include <iterator>
      11             : 
      12             : #include <boost/format.hpp>
      13             : #include <boost/range/adaptor/filtered.hpp>
      14             : #include <boost/range/adaptor/transformed.hpp>
      15             : #include "backend/protobuf/transaction_responses/proto_tx_response.hpp"
      16             : #include "common/timeout.hpp"
      17             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      18             : #include "interfaces/transaction.hpp"
      19             : 
      20             : namespace torii {
      21             : 
      22             :   CommandServiceTransportGrpc::CommandServiceTransportGrpc(
      23             :       std::shared_ptr<CommandService> command_service,
      24             :       std::shared_ptr<iroha::torii::StatusBus> status_bus,
      25             :       std::chrono::milliseconds initial_timeout,
      26             :       std::chrono::milliseconds nonfinal_timeout,
      27             :       std::shared_ptr<shared_model::interface::TxStatusFactory> status_factory,
      28             :       std::shared_ptr<TransportFactoryType> transaction_factory,
      29             :       std::shared_ptr<shared_model::interface::TransactionBatchParser>
      30             :           batch_parser,
      31             :       std::shared_ptr<shared_model::interface::TransactionBatchFactory>
      32             :           transaction_batch_factory)
      33         261 :       : command_service_(std::move(command_service)),
      34         261 :         status_bus_(std::move(status_bus)),
      35         261 :         initial_timeout_(initial_timeout),
      36         261 :         nonfinal_timeout_(nonfinal_timeout),
      37         261 :         status_factory_(std::move(status_factory)),
      38         261 :         transaction_factory_(std::move(transaction_factory)),
      39         261 :         batch_parser_(std::move(batch_parser)),
      40         261 :         batch_factory_(std::move(transaction_batch_factory)),
      41         261 :         log_(logger::log("CommandServiceTransportGrpc")) {}
      42             : 
      43             :   grpc::Status CommandServiceTransportGrpc::Torii(
      44             :       grpc::ServerContext *context,
      45             :       const iroha::protocol::Transaction *request,
      46             :       google::protobuf::Empty *response) {
      47         775 :     iroha::protocol::TxList single_tx_list;
      48         775 :     *single_tx_list.add_transactions() = *request;
      49         775 :     return ListTorii(context, &single_tx_list, response);
      50         775 :   }
      51             : 
      52             :   namespace {
      53             :     /**
      54             :      * Form an error message, which is to be shared between all transactions, if
      55             :      * there are several of them, or individual message, if there's only one
      56             :      * @param tx_hashes is non empty hash list to form error message from
      57             :      * @param error of those tx(s)
      58             :      * @return message
      59             :      */
      60             :     std::string formErrorMessage(
      61             :         const std::vector<shared_model::crypto::Hash> &tx_hashes,
      62             :         const std::string &error) {
      63           1 :       if (tx_hashes.size() == 1) {
      64           1 :         return (boost::format("Stateless invalid tx, error: %s, hash: %s")
      65           1 :                 % error % tx_hashes[0].hex())
      66           1 :             .str();
      67             :       }
      68             : 
      69             :       std::string folded_hashes =
      70           0 :           std::accumulate(std::next(tx_hashes.begin()),
      71           0 :                           tx_hashes.end(),
      72           0 :                           tx_hashes[0].hex(),
      73             :                           [](auto &&acc, const auto &h) -> std::string {
      74           0 :                             return acc + ", " + h.hex();
      75           0 :                           });
      76             : 
      77           0 :       return (boost::format(
      78             :                   "Stateless invalid tx in transaction sequence, error: %s\n"
      79             :                   "Hash list: [%s]")
      80           0 :               % error % folded_hashes)
      81           0 :           .str();
      82             :     }
      83             :   }  // namespace
      84             : 
      85             :   shared_model::interface::types::SharedTxsCollectionType
      86             :   CommandServiceTransportGrpc::deserializeTransactions(
      87             :       const iroha::protocol::TxList *request) {
      88         783 :     return boost::copy_range<
      89             :         shared_model::interface::types::SharedTxsCollectionType>(
      90         783 :         request->transactions()
      91         783 :         | boost::adaptors::transformed(
      92             :               [&](const auto &tx) { return transaction_factory_->build(tx); })
      93             :         | boost::adaptors::filtered([&](const auto &result) {
      94         804 :             return result.match(
      95             :                 [](const iroha::expected::Value<
      96             :                     std::unique_ptr<shared_model::interface::Transaction>> &) {
      97         745 :                   return true;
      98             :                 },
      99             :                 [&](const iroha::expected::Error<TransportFactoryType::Error>
     100             :                         &error) {
     101          59 :                   status_bus_->publish(status_factory_->makeStatelessFail(
     102             :                       error.error.hash,
     103          59 :                       shared_model::interface::TxStatusFactory::
     104          59 :                           TransactionError{error.error.error, 0, 0}));
     105          59 :                   return false;
     106           0 :                 });
     107             :           })
     108             :         | boost::adaptors::transformed([&](auto result) {
     109         745 :             return std::move(
     110         745 :                        boost::get<iroha::expected::ValueOf<decltype(result)>>(
     111         745 :                            result))
     112             :                 .value;
     113             :           }));
     114           0 :   }
     115             : 
     116             :   grpc::Status CommandServiceTransportGrpc::ListTorii(
     117             :       grpc::ServerContext *context,
     118             :       const iroha::protocol::TxList *request,
     119             :       google::protobuf::Empty *response) {
     120         783 :     auto transactions = deserializeTransactions(request);
     121             : 
     122         783 :     auto batches = batch_parser_->parseBatches(transactions);
     123             : 
     124        1523 :     for (auto &batch : batches) {
     125         740 :       batch_factory_->createTransactionBatch(batch).match(
     126             :           [&](iroha::expected::Value<std::unique_ptr<
     127             :                   shared_model::interface::TransactionBatch>> &value) {
     128         739 :             this->command_service_->handleTransactionBatch(
     129         739 :                 std::move(value).value);
     130         739 :           },
     131             :           [&](iroha::expected::Error<std::string> &error) {
     132           1 :             std::vector<shared_model::crypto::Hash> hashes;
     133             : 
     134         740 :             std::transform(batch.begin(),
     135           1 :                            batch.end(),
     136           1 :                            std::back_inserter(hashes),
     137             :                            [](const auto &tx) { return tx->hash(); });
     138             : 
     139           1 :             auto error_msg = formErrorMessage(hashes, error.error);
     140             :             // set error response for each transaction in a batch candidate
     141           1 :             std::for_each(
     142             :                 hashes.begin(), hashes.end(), [this, &error_msg](auto &hash) {
     143           1 :                   status_bus_->publish(status_factory_->makeStatelessFail(
     144           1 :                       hash,
     145           1 :                       shared_model::interface::TxStatusFactory::
     146           1 :                           TransactionError{error_msg, 0, 0}));
     147           1 :                 });
     148           1 :           });
     149             :     }
     150             : 
     151         783 :     return grpc::Status::OK;
     152         783 :   }
     153             : 
     154             :   grpc::Status CommandServiceTransportGrpc::Status(
     155             :       grpc::ServerContext *context,
     156             :       const iroha::protocol::TxStatusRequest *request,
     157             :       iroha::protocol::ToriiResponse *response) {
     158         813 :     *response =
     159         813 :         std::static_pointer_cast<shared_model::proto::TransactionResponse>(
     160         813 :             command_service_->getStatus(
     161         813 :                 shared_model::crypto::Hash(request->tx_hash())))
     162         813 :             ->getTransport();
     163         813 :     return grpc::Status::OK;
     164           0 :   }
     165             : 
     166             :   namespace {
     167             :     void handleEvents(rxcpp::composite_subscription &subscription,
     168             :                       rxcpp::schedulers::run_loop &run_loop) {
     169     5660056 :       while (subscription.is_subscribed() or not run_loop.empty()) {
     170     5660054 :         run_loop.dispatch();
     171             :       }
     172           2 :     }
     173             :   }  // namespace
     174             : 
     175             :   grpc::Status CommandServiceTransportGrpc::StatusStream(
     176             :       grpc::ServerContext *context,
     177             :       const iroha::protocol::TxStatusRequest *request,
     178             :       grpc::ServerWriter<iroha::protocol::ToriiResponse> *response_writer) {
     179           2 :     rxcpp::schedulers::run_loop rl;
     180             : 
     181             :     auto current_thread =
     182           2 :         rxcpp::observe_on_one_worker(rxcpp::schedulers::make_run_loop(rl));
     183             : 
     184           2 :     rxcpp::composite_subscription subscription;
     185             : 
     186           2 :     auto hash = shared_model::crypto::Hash(request->tx_hash());
     187             : 
     188           2 :     static auto client_id_format = boost::format("Peer: '%s', %s");
     189             :     std::string client_id =
     190           2 :         (client_id_format % context->peer() % hash.toString()).str();
     191             : 
     192           2 :     command_service_
     193           2 :         ->getStatusStream(hash)
     194             :         // convert to transport objects
     195             :         .map([&](auto response) {
     196           3 :           log_->debug("mapped {}, {}", response->toString(), client_id);
     197           3 :           return std::static_pointer_cast<
     198             :                      shared_model::proto::TransactionResponse>(response)
     199           3 :               ->getTransport();
     200           0 :         })
     201             :         // set a corresponding observable timeout based on status value
     202           2 :         .lift<iroha::protocol::ToriiResponse>(
     203           2 :             iroha::makeTimeout<iroha::protocol::ToriiResponse>(
     204             :                 [&](const auto &response) {
     205           3 :                   return response.tx_status()
     206           3 :                           == iroha::protocol::TxStatus::NOT_RECEIVED
     207           2 :                       ? initial_timeout_
     208           1 :                       : nonfinal_timeout_;
     209             :                 },
     210             :                 current_thread))
     211             :         // complete the observable if client is disconnected
     212             :         .take_while([=](const auto &) {
     213           3 :           auto is_cancelled = context->IsCancelled();
     214           3 :           if (is_cancelled) {
     215           2 :             log_->debug("client unsubscribed, {}", client_id);
     216           0 :           }
     217           3 :           return not is_cancelled;
     218             :         })
     219           2 :         .subscribe(subscription,
     220             :                    [&](iroha::protocol::ToriiResponse response) {
     221           3 :                      if (response_writer->Write(response)) {
     222           3 :                        log_->debug("status written, {}", client_id);
     223           3 :                      }
     224           3 :                    },
     225             :                    [&](std::exception_ptr ep) {
     226           1 :                      log_->debug("processing timeout, {}", client_id);
     227           1 :                    },
     228             :                    [&] { log_->debug("stream done, {}", client_id); });
     229             : 
     230             :     // run loop while subscription is active or there are pending events in
     231             :     // the queue
     232           2 :     handleEvents(subscription, rl);
     233             : 
     234           2 :     log_->debug("status stream done, {}", client_id);
     235           2 :     return grpc::Status::OK;
     236           2 :   }
     237             : }  // namespace torii

Generated by: LCOV version 1.13