Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "common/default_constructible_unary_fn.hpp" // non-copyable value workaround
7 :
8 : #include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
9 :
10 : #include <boost/range/adaptor/filtered.hpp>
11 : #include <boost/range/adaptor/transformed.hpp>
12 : #include "ametsuchi/tx_presence_cache.hpp"
13 : #include "backend/protobuf/transaction.hpp"
14 : #include "interfaces/iroha_internal/transaction_batch.hpp"
15 : #include "interfaces/transaction.hpp"
16 : #include "validators/field_validator.hpp"
17 :
18 : using namespace iroha::network;
19 :
20 : using iroha::ConstRefState;
21 :
22 : void sendStateAsyncImpl(
23 : const shared_model::interface::Peer &to,
24 : ConstRefState state,
25 : const std::string &sender_key,
26 : AsyncGrpcClient<google::protobuf::Empty> &async_call);
27 :
28 : MstTransportGrpc::MstTransportGrpc(
29 : std::shared_ptr<AsyncGrpcClient<google::protobuf::Empty>> async_call,
30 : std::shared_ptr<TransportFactoryType> transaction_factory,
31 : std::shared_ptr<shared_model::interface::TransactionBatchParser>
32 : batch_parser,
33 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
34 : transaction_batch_factory,
35 : std::shared_ptr<iroha::ametsuchi::TxPresenceCache> tx_presence_cache,
36 : shared_model::crypto::PublicKey my_key)
37 9 : : async_call_(std::move(async_call)),
38 9 : transaction_factory_(std::move(transaction_factory)),
39 9 : batch_parser_(std::move(batch_parser)),
40 9 : batch_factory_(std::move(transaction_batch_factory)),
41 9 : tx_presence_cache_(std::move(tx_presence_cache)),
42 9 : my_key_(shared_model::crypto::toBinaryString(my_key)) {}
43 :
44 : shared_model::interface::types::SharedTxsCollectionType
45 : MstTransportGrpc::deserializeTransactions(const transport::MstState *request) {
46 6 : return boost::copy_range<
47 : shared_model::interface::types::SharedTxsCollectionType>(
48 6 : request->transactions()
49 6 : | boost::adaptors::transformed(
50 : [&](const auto &tx) { return transaction_factory_->build(tx); })
51 : | boost::adaptors::filtered([&](const auto &result) {
52 10 : return result.match(
53 : [](const iroha::expected::Value<
54 : std::unique_ptr<shared_model::interface::Transaction>> &) {
55 9 : return true;
56 : },
57 : [&](const iroha::expected::Error<TransportFactoryType::Error>
58 : &error) {
59 1 : async_call_->log_->info(
60 : "Transaction deserialization failed: hash {}, {}",
61 1 : error.error.hash.toString(),
62 1 : error.error.error);
63 1 : return false;
64 0 : });
65 : })
66 : | boost::adaptors::transformed([&](auto result) {
67 9 : return std::move(
68 9 : boost::get<iroha::expected::ValueOf<decltype(result)>>(
69 9 : result))
70 9 : .value;
71 : }));
72 0 : }
73 :
74 : grpc::Status MstTransportGrpc::SendState(
75 : ::grpc::ServerContext *context,
76 : const ::iroha::network::transport::MstState *request,
77 : ::google::protobuf::Empty *response) {
78 5 : async_call_->log_->info("MstState Received");
79 :
80 5 : auto transactions = deserializeTransactions(request);
81 :
82 : auto batches = batch_parser_->parseBatches(transactions);
83 :
84 6 : MstState new_state = MstState::empty();
85 :
86 13 : for (auto &batch : batches) {
87 7 : batch_factory_->createTransactionBatch(batch).match(
88 : [&](iroha::expected::Value<std::unique_ptr<
89 : shared_model::interface::TransactionBatch>> &value) {
90 7 : auto cache_presence = tx_presence_cache_->check(*value.value);
91 7 : if (not cache_presence) {
92 : // TODO andrei 30.11.18 IR-51 Handle database error
93 1 : async_call_->log_->warn(
94 : "Check tx presence database error. Batch: {}",
95 1 : value.value->toString());
96 1 : return;
97 : }
98 6 : auto is_replay = std::any_of(
99 6 : cache_presence->begin(),
100 6 : cache_presence->end(),
101 : [](const auto &tx_status) {
102 : return iroha::visit_in_place(
103 7 : tx_status,
104 : [](const iroha::ametsuchi::tx_cache_status_responses::
105 6 : Missing &) { return false; },
106 : [](const auto &) { return true; });
107 : });
108 :
109 6 : if (not is_replay) {
110 5 : new_state += std::move(value).value;
111 5 : }
112 : },
113 : [&](iroha::expected::Error<std::string> &error) {
114 0 : async_call_->log_->warn("Batch deserialization failed: {}",
115 0 : error.error);
116 0 : });
117 : }
118 :
119 6 : async_call_->log_->info("batches in MstState: {}",
120 6 : new_state.getBatches().size());
121 :
122 6 : shared_model::crypto::PublicKey source_key(request->source_peer_key());
123 : auto key_invalid_reason =
124 6 : shared_model::validation::validatePubkey(source_key);
125 6 : if (key_invalid_reason) {
126 1 : async_call_->log_->info(
127 : "Dropping received MST State due to invalid public key: {}",
128 1 : *key_invalid_reason);
129 1 : return grpc::Status::OK;
130 : }
131 :
132 5 : if (new_state.isEmpty()) {
133 2 : async_call_->log_->info(
134 : "All transactions from received MST state have been processed already, "
135 : "nothing to propagate to MST processor");
136 2 : return grpc::Status::OK;
137 : }
138 :
139 3 : if (auto subscriber = subscriber_.lock()) {
140 3 : subscriber->onNewState(source_key, std::move(new_state));
141 3 : } else {
142 0 : async_call_->log_->warn("No subscriber for MST SendState event is set");
143 : }
144 :
145 3 : return grpc::Status::OK;
146 6 : }
147 :
148 : void MstTransportGrpc::subscribe(
149 : std::shared_ptr<MstTransportNotification> notification) {
150 9 : subscriber_ = notification;
151 9 : }
152 :
153 : void MstTransportGrpc::sendState(const shared_model::interface::Peer &to,
154 : ConstRefState providing_state) {
155 3 : async_call_->log_->info("Propagate MstState to peer {}", to.address());
156 3 : sendStateAsyncImpl(to, providing_state, my_key_, *async_call_);
157 3 : }
158 :
159 : void iroha::network::sendStateAsync(
160 : const shared_model::interface::Peer &to,
161 : ConstRefState state,
162 : const shared_model::crypto::PublicKey &sender_key,
163 : AsyncGrpcClient<google::protobuf::Empty> &async_call) {
164 0 : sendStateAsyncImpl(
165 0 : to, state, shared_model::crypto::toBinaryString(sender_key), async_call);
166 0 : }
167 :
168 : void sendStateAsyncImpl(const shared_model::interface::Peer &to,
169 : ConstRefState state,
170 : const std::string &sender_key,
171 : AsyncGrpcClient<google::protobuf::Empty> &async_call) {
172 3 : std::unique_ptr<transport::MstTransportGrpc::StubInterface> client =
173 3 : transport::MstTransportGrpc::NewStub(grpc::CreateChannel(
174 3 : to.address(), grpc::InsecureChannelCredentials()));
175 :
176 3 : transport::MstState protoState;
177 3 : protoState.set_source_peer_key(sender_key);
178 8 : for (auto &batch : state.getBatches()) {
179 10 : for (auto &tx : batch->transactions()) {
180 : // TODO (@l4l) 04/03/18 simplify with IR-1040
181 5 : *protoState.add_transactions() =
182 5 : std::static_pointer_cast<shared_model::proto::Transaction>(tx)
183 5 : ->getTransport();
184 : }
185 : }
186 :
187 : async_call.Call([&](auto context, auto cq) {
188 3 : return client->AsyncSendState(context, protoState, cq);
189 : });
190 3 : }
|