LCOV - code coverage report
Current view: top level - irohad/multi_sig_transactions/transport/impl - mst_transport_grpc.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 70 79 88.6 %
Date: 2018-12-05 17:11:35 Functions: 21 24 87.5 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "common/default_constructible_unary_fn.hpp"  // non-copyable value workaround
       7             : 
       8             : #include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
       9             : 
      10             : #include <boost/range/adaptor/filtered.hpp>
      11             : #include <boost/range/adaptor/transformed.hpp>
      12             : #include "ametsuchi/tx_presence_cache.hpp"
      13             : #include "backend/protobuf/transaction.hpp"
      14             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      15             : #include "interfaces/transaction.hpp"
      16             : #include "validators/field_validator.hpp"
      17             : 
      18             : using namespace iroha::network;
      19             : 
      20             : using iroha::ConstRefState;
      21             : 
      22             : void sendStateAsyncImpl(
      23             :     const shared_model::interface::Peer &to,
      24             :     ConstRefState state,
      25             :     const std::string &sender_key,
      26             :     AsyncGrpcClient<google::protobuf::Empty> &async_call);
      27             : 
      28             : MstTransportGrpc::MstTransportGrpc(
      29             :     std::shared_ptr<AsyncGrpcClient<google::protobuf::Empty>> async_call,
      30             :     std::shared_ptr<TransportFactoryType> transaction_factory,
      31             :     std::shared_ptr<shared_model::interface::TransactionBatchParser>
      32             :         batch_parser,
      33             :     std::shared_ptr<shared_model::interface::TransactionBatchFactory>
      34             :         transaction_batch_factory,
      35             :     std::shared_ptr<iroha::ametsuchi::TxPresenceCache> tx_presence_cache,
      36             :     shared_model::crypto::PublicKey my_key)
      37           9 :     : async_call_(std::move(async_call)),
      38           9 :       transaction_factory_(std::move(transaction_factory)),
      39           9 :       batch_parser_(std::move(batch_parser)),
      40           9 :       batch_factory_(std::move(transaction_batch_factory)),
      41           9 :       tx_presence_cache_(std::move(tx_presence_cache)),
      42           9 :       my_key_(shared_model::crypto::toBinaryString(my_key)) {}
      43             : 
      44             : shared_model::interface::types::SharedTxsCollectionType
      45             : MstTransportGrpc::deserializeTransactions(const transport::MstState *request) {
      46           6 :   return boost::copy_range<
      47             :       shared_model::interface::types::SharedTxsCollectionType>(
      48           6 :       request->transactions()
      49           6 :       | boost::adaptors::transformed(
      50             :             [&](const auto &tx) { return transaction_factory_->build(tx); })
      51             :       | boost::adaptors::filtered([&](const auto &result) {
      52          10 :           return result.match(
      53             :               [](const iroha::expected::Value<
      54             :                   std::unique_ptr<shared_model::interface::Transaction>> &) {
      55           9 :                 return true;
      56             :               },
      57             :               [&](const iroha::expected::Error<TransportFactoryType::Error>
      58             :                       &error) {
      59           1 :                 async_call_->log_->info(
      60             :                     "Transaction deserialization failed: hash {}, {}",
      61           1 :                     error.error.hash.toString(),
      62           1 :                     error.error.error);
      63           1 :                 return false;
      64           0 :               });
      65             :         })
      66             :       | boost::adaptors::transformed([&](auto result) {
      67           9 :           return std::move(
      68           9 :                      boost::get<iroha::expected::ValueOf<decltype(result)>>(
      69           9 :                          result))
      70           9 :               .value;
      71             :         }));
      72           0 : }
      73             : 
      74             : grpc::Status MstTransportGrpc::SendState(
      75             :     ::grpc::ServerContext *context,
      76             :     const ::iroha::network::transport::MstState *request,
      77             :     ::google::protobuf::Empty *response) {
      78           5 :   async_call_->log_->info("MstState Received");
      79             : 
      80           5 :   auto transactions = deserializeTransactions(request);
      81             : 
      82             :   auto batches = batch_parser_->parseBatches(transactions);
      83             : 
      84           6 :   MstState new_state = MstState::empty();
      85             : 
      86          13 :   for (auto &batch : batches) {
      87           7 :     batch_factory_->createTransactionBatch(batch).match(
      88             :         [&](iroha::expected::Value<std::unique_ptr<
      89             :                 shared_model::interface::TransactionBatch>> &value) {
      90           7 :           auto cache_presence = tx_presence_cache_->check(*value.value);
      91           7 :           if (not cache_presence) {
      92             :             // TODO andrei 30.11.18 IR-51 Handle database error
      93           1 :             async_call_->log_->warn(
      94             :                 "Check tx presence database error. Batch: {}",
      95           1 :                 value.value->toString());
      96           1 :             return;
      97             :           }
      98           6 :           auto is_replay = std::any_of(
      99           6 :               cache_presence->begin(),
     100           6 :               cache_presence->end(),
     101             :               [](const auto &tx_status) {
     102             :                 return iroha::visit_in_place(
     103           7 :                     tx_status,
     104             :                     [](const iroha::ametsuchi::tx_cache_status_responses::
     105           6 :                            Missing &) { return false; },
     106             :                     [](const auto &) { return true; });
     107             :               });
     108             : 
     109           6 :           if (not is_replay) {
     110           5 :             new_state += std::move(value).value;
     111           5 :           }
     112             :         },
     113             :         [&](iroha::expected::Error<std::string> &error) {
     114           0 :           async_call_->log_->warn("Batch deserialization failed: {}",
     115           0 :                                   error.error);
     116           0 :         });
     117             :   }
     118             : 
     119           6 :   async_call_->log_->info("batches in MstState: {}",
     120           6 :                           new_state.getBatches().size());
     121             : 
     122           6 :   shared_model::crypto::PublicKey source_key(request->source_peer_key());
     123             :   auto key_invalid_reason =
     124           6 :       shared_model::validation::validatePubkey(source_key);
     125           6 :   if (key_invalid_reason) {
     126           1 :     async_call_->log_->info(
     127             :         "Dropping received MST State due to invalid public key: {}",
     128           1 :         *key_invalid_reason);
     129           1 :     return grpc::Status::OK;
     130             :   }
     131             : 
     132           5 :   if (new_state.isEmpty()) {
     133           2 :     async_call_->log_->info(
     134             :         "All transactions from received MST state have been processed already, "
     135             :         "nothing to propagate to MST processor");
     136           2 :     return grpc::Status::OK;
     137             :   }
     138             : 
     139           3 :   if (auto subscriber = subscriber_.lock()) {
     140           3 :     subscriber->onNewState(source_key, std::move(new_state));
     141           3 :   } else {
     142           0 :     async_call_->log_->warn("No subscriber for MST SendState event is set");
     143             :   }
     144             : 
     145           3 :   return grpc::Status::OK;
     146           6 : }
     147             : 
     148             : void MstTransportGrpc::subscribe(
     149             :     std::shared_ptr<MstTransportNotification> notification) {
     150           9 :   subscriber_ = notification;
     151           9 : }
     152             : 
     153             : void MstTransportGrpc::sendState(const shared_model::interface::Peer &to,
     154             :                                  ConstRefState providing_state) {
     155           3 :   async_call_->log_->info("Propagate MstState to peer {}", to.address());
     156           3 :   sendStateAsyncImpl(to, providing_state, my_key_, *async_call_);
     157           3 : }
     158             : 
     159             : void iroha::network::sendStateAsync(
     160             :     const shared_model::interface::Peer &to,
     161             :     ConstRefState state,
     162             :     const shared_model::crypto::PublicKey &sender_key,
     163             :     AsyncGrpcClient<google::protobuf::Empty> &async_call) {
     164           0 :   sendStateAsyncImpl(
     165           0 :       to, state, shared_model::crypto::toBinaryString(sender_key), async_call);
     166           0 : }
     167             : 
     168             : void sendStateAsyncImpl(const shared_model::interface::Peer &to,
     169             :                         ConstRefState state,
     170             :                         const std::string &sender_key,
     171             :                         AsyncGrpcClient<google::protobuf::Empty> &async_call) {
     172           3 :   std::unique_ptr<transport::MstTransportGrpc::StubInterface> client =
     173           3 :       transport::MstTransportGrpc::NewStub(grpc::CreateChannel(
     174           3 :           to.address(), grpc::InsecureChannelCredentials()));
     175             : 
     176           3 :   transport::MstState protoState;
     177           3 :   protoState.set_source_peer_key(sender_key);
     178           8 :   for (auto &batch : state.getBatches()) {
     179          10 :     for (auto &tx : batch->transactions()) {
     180             :       // TODO (@l4l) 04/03/18 simplify with IR-1040
     181           5 :       *protoState.add_transactions() =
     182           5 :           std::static_pointer_cast<shared_model::proto::Transaction>(tx)
     183           5 :               ->getTransport();
     184             :     }
     185             :   }
     186             : 
     187             :   async_call.Call([&](auto context, auto cq) {
     188           3 :     return client->AsyncSendState(context, protoState, cq);
     189             :   });
     190           3 : }

Generated by: LCOV version 1.13