Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "network/impl/block_loader_impl.hpp"
7 :
8 : #include <grpc++/create_channel.h>
9 :
10 : #include "backend/protobuf/block.hpp"
11 : #include "builders/protobuf/transport_builder.hpp"
12 : #include "common/bind.hpp"
13 : #include "interfaces/common_objects/peer.hpp"
14 : #include "network/impl/grpc_channel_builder.hpp"
15 :
16 : using namespace iroha::ametsuchi;
17 : using namespace iroha::network;
18 : using namespace shared_model::crypto;
19 : using namespace shared_model::interface;
20 :
21 : namespace {
22 : const char *kPeerNotFound = "Cannot find peer";
23 : const char *kPeerRetrieveFail = "Failed to retrieve peers";
24 : const char *kPeerFindFail = "Failed to find requested peer";
25 : } // namespace
26 :
27 : BlockLoaderImpl::BlockLoaderImpl(
28 : std::shared_ptr<PeerQueryFactory> peer_query_factory,
29 : shared_model::proto::ProtoBlockFactory factory)
30 0 : : peer_query_factory_(std::move(peer_query_factory)),
31 0 : block_factory_(std::move(factory)),
32 0 : log_(logger::log("BlockLoaderImpl")) {}
33 :
34 : rxcpp::observable<std::shared_ptr<Block>> BlockLoaderImpl::retrieveBlocks(
35 : const shared_model::interface::types::HeightType height,
36 : const PublicKey &peer_pubkey) {
37 0 : return rxcpp::observable<>::create<std::shared_ptr<Block>>(
38 : [this, height, &peer_pubkey](auto subscriber) {
39 0 : auto peer = this->findPeer(peer_pubkey);
40 0 : if (not peer) {
41 0 : log_->error(kPeerNotFound);
42 0 : subscriber.on_completed();
43 0 : return;
44 : }
45 :
46 0 : proto::BlocksRequest request;
47 0 : grpc::ClientContext context;
48 0 : protocol::Block block;
49 :
50 : // request next block to our top
51 0 : request.set_height(height + 1);
52 :
53 : auto reader =
54 0 : this->getPeerStub(**peer).retrieveBlocks(&context, request);
55 0 : while (reader->Read(&block)) {
56 0 : auto proto_block = block_factory_.createBlock(std::move(block));
57 0 : proto_block.match(
58 : [&subscriber](
59 : iroha::expected::Value<std::unique_ptr<Block>> &result) {
60 0 : subscriber.on_next(std::move(result.value));
61 0 : },
62 : [this,
63 : &context](const iroha::expected::Error<std::string> &error) {
64 0 : log_->error(error.error);
65 0 : context.TryCancel();
66 0 : });
67 0 : }
68 0 : reader->Finish();
69 0 : subscriber.on_completed();
70 0 : });
71 0 : }
72 :
73 : boost::optional<std::shared_ptr<Block>> BlockLoaderImpl::retrieveBlock(
74 : const PublicKey &peer_pubkey, const types::HashType &block_hash) {
75 0 : auto peer = findPeer(peer_pubkey);
76 0 : if (not peer) {
77 0 : log_->error(kPeerNotFound);
78 0 : return boost::none;
79 : }
80 :
81 0 : proto::BlockRequest request;
82 : grpc::ClientContext context;
83 0 : protocol::Block block;
84 :
85 : // request block with specified hash
86 0 : request.set_hash(toBinaryString(block_hash));
87 :
88 0 : auto status = getPeerStub(**peer).retrieveBlock(&context, request, &block);
89 0 : if (not status.ok()) {
90 0 : log_->warn(status.error_message());
91 0 : return boost::none;
92 : }
93 :
94 0 : auto result = block_factory_.createBlock(std::move(block));
95 :
96 0 : return result.match(
97 : [](iroha::expected::Value<std::unique_ptr<Block>> &v) {
98 0 : return boost::make_optional(std::shared_ptr<Block>(std::move(v.value)));
99 0 : },
100 : [this](const iroha::expected::Error<std::string> &e)
101 : -> boost::optional<std::shared_ptr<Block>> {
102 : log_->error(e.error);
103 0 : return boost::none;
104 : });
105 0 : }
106 :
107 : boost::optional<std::shared_ptr<shared_model::interface::Peer>>
108 : BlockLoaderImpl::findPeer(const shared_model::crypto::PublicKey &pubkey) {
109 0 : auto peers = peer_query_factory_->createPeerQuery() |
110 : [](const auto &query) { return query->getLedgerPeers(); };
111 0 : if (not peers) {
112 : log_->error(kPeerRetrieveFail);
113 0 : return boost::none;
114 : }
115 :
116 0 : auto &blob = pubkey.blob();
117 0 : auto it = std::find_if(
118 : peers.value().begin(), peers.value().end(), [&blob](const auto &peer) {
119 0 : return peer->pubkey().blob() == blob;
120 : });
121 0 : if (it == peers.value().end()) {
122 0 : log_->error(kPeerFindFail);
123 0 : return boost::none;
124 : }
125 0 : return *it;
126 0 : }
127 :
128 : proto::Loader::Stub &BlockLoaderImpl::getPeerStub(
129 : const shared_model::interface::Peer &peer) {
130 0 : auto it = peer_connections_.find(peer.address());
131 0 : if (it == peer_connections_.end()) {
132 0 : it = peer_connections_
133 0 : .insert(std::make_pair(
134 0 : peer.address(),
135 0 : network::createClient<proto::Loader>(peer.address())))
136 0 : .first;
137 0 : }
138 0 : return *it->second;
139 0 : }
|