LCOV - code coverage report
Current view: top level - irohad/network/impl - block_loader_impl.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 0 65 0.0 %
Date: 2018-12-05 17:11:35 Functions: 0 17 0.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "network/impl/block_loader_impl.hpp"
       7             : 
       8             : #include <grpc++/create_channel.h>
       9             : 
      10             : #include "backend/protobuf/block.hpp"
      11             : #include "builders/protobuf/transport_builder.hpp"
      12             : #include "common/bind.hpp"
      13             : #include "interfaces/common_objects/peer.hpp"
      14             : #include "network/impl/grpc_channel_builder.hpp"
      15             : 
      16             : using namespace iroha::ametsuchi;
      17             : using namespace iroha::network;
      18             : using namespace shared_model::crypto;
      19             : using namespace shared_model::interface;
      20             : 
      21             : namespace {
      22             :   const char *kPeerNotFound = "Cannot find peer";
      23             :   const char *kPeerRetrieveFail = "Failed to retrieve peers";
      24             :   const char *kPeerFindFail = "Failed to find requested peer";
      25             : }  // namespace
      26             : 
      27             : BlockLoaderImpl::BlockLoaderImpl(
      28             :     std::shared_ptr<PeerQueryFactory> peer_query_factory,
      29             :     shared_model::proto::ProtoBlockFactory factory)
      30           0 :     : peer_query_factory_(std::move(peer_query_factory)),
      31           0 :       block_factory_(std::move(factory)),
      32           0 :       log_(logger::log("BlockLoaderImpl")) {}
      33             : 
      34             : rxcpp::observable<std::shared_ptr<Block>> BlockLoaderImpl::retrieveBlocks(
      35             :     const shared_model::interface::types::HeightType height,
      36             :     const PublicKey &peer_pubkey) {
      37           0 :   return rxcpp::observable<>::create<std::shared_ptr<Block>>(
      38             :       [this, height, &peer_pubkey](auto subscriber) {
      39           0 :         auto peer = this->findPeer(peer_pubkey);
      40           0 :         if (not peer) {
      41           0 :           log_->error(kPeerNotFound);
      42           0 :           subscriber.on_completed();
      43           0 :           return;
      44             :         }
      45             : 
      46           0 :         proto::BlocksRequest request;
      47           0 :         grpc::ClientContext context;
      48           0 :         protocol::Block block;
      49             : 
      50             :         // request next block to our top
      51           0 :         request.set_height(height + 1);
      52             : 
      53             :         auto reader =
      54           0 :             this->getPeerStub(**peer).retrieveBlocks(&context, request);
      55           0 :         while (reader->Read(&block)) {
      56           0 :           auto proto_block = block_factory_.createBlock(std::move(block));
      57           0 :           proto_block.match(
      58             :               [&subscriber](
      59             :                   iroha::expected::Value<std::unique_ptr<Block>> &result) {
      60           0 :                 subscriber.on_next(std::move(result.value));
      61           0 :               },
      62             :               [this,
      63             :                &context](const iroha::expected::Error<std::string> &error) {
      64           0 :                 log_->error(error.error);
      65           0 :                 context.TryCancel();
      66           0 :               });
      67           0 :         }
      68           0 :         reader->Finish();
      69           0 :         subscriber.on_completed();
      70           0 :       });
      71           0 : }
      72             : 
      73             : boost::optional<std::shared_ptr<Block>> BlockLoaderImpl::retrieveBlock(
      74             :     const PublicKey &peer_pubkey, const types::HashType &block_hash) {
      75           0 :   auto peer = findPeer(peer_pubkey);
      76           0 :   if (not peer) {
      77           0 :     log_->error(kPeerNotFound);
      78           0 :     return boost::none;
      79             :   }
      80             : 
      81           0 :   proto::BlockRequest request;
      82             :   grpc::ClientContext context;
      83           0 :   protocol::Block block;
      84             : 
      85             :   // request block with specified hash
      86           0 :   request.set_hash(toBinaryString(block_hash));
      87             : 
      88           0 :   auto status = getPeerStub(**peer).retrieveBlock(&context, request, &block);
      89           0 :   if (not status.ok()) {
      90           0 :     log_->warn(status.error_message());
      91           0 :     return boost::none;
      92             :   }
      93             : 
      94           0 :   auto result = block_factory_.createBlock(std::move(block));
      95             : 
      96           0 :   return result.match(
      97             :       [](iroha::expected::Value<std::unique_ptr<Block>> &v) {
      98           0 :         return boost::make_optional(std::shared_ptr<Block>(std::move(v.value)));
      99           0 :       },
     100             :       [this](const iroha::expected::Error<std::string> &e)
     101             :           -> boost::optional<std::shared_ptr<Block>> {
     102             :         log_->error(e.error);
     103           0 :         return boost::none;
     104             :       });
     105           0 : }
     106             : 
     107             : boost::optional<std::shared_ptr<shared_model::interface::Peer>>
     108             : BlockLoaderImpl::findPeer(const shared_model::crypto::PublicKey &pubkey) {
     109           0 :   auto peers = peer_query_factory_->createPeerQuery() |
     110             :       [](const auto &query) { return query->getLedgerPeers(); };
     111           0 :   if (not peers) {
     112             :     log_->error(kPeerRetrieveFail);
     113           0 :     return boost::none;
     114             :   }
     115             : 
     116           0 :   auto &blob = pubkey.blob();
     117           0 :   auto it = std::find_if(
     118             :       peers.value().begin(), peers.value().end(), [&blob](const auto &peer) {
     119           0 :         return peer->pubkey().blob() == blob;
     120             :       });
     121           0 :   if (it == peers.value().end()) {
     122           0 :     log_->error(kPeerFindFail);
     123           0 :     return boost::none;
     124             :   }
     125           0 :   return *it;
     126           0 : }
     127             : 
     128             : proto::Loader::Stub &BlockLoaderImpl::getPeerStub(
     129             :     const shared_model::interface::Peer &peer) {
     130           0 :   auto it = peer_connections_.find(peer.address());
     131           0 :   if (it == peer_connections_.end()) {
     132           0 :     it = peer_connections_
     133           0 :              .insert(std::make_pair(
     134           0 :                  peer.address(),
     135           0 :                  network::createClient<proto::Loader>(peer.address())))
     136           0 :              .first;
     137           0 :   }
     138           0 :   return *it->second;
     139           0 : }

Generated by: LCOV version 1.13