Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "common/default_constructible_unary_fn.hpp" // non-copyable value workaround
7 :
8 : #include "torii/impl/command_service_transport_grpc.hpp"
9 :
10 : #include <iterator>
11 :
12 : #include <boost/format.hpp>
13 : #include <boost/range/adaptor/filtered.hpp>
14 : #include <boost/range/adaptor/transformed.hpp>
15 : #include "backend/protobuf/transaction_responses/proto_tx_response.hpp"
16 : #include "common/timeout.hpp"
17 : #include "interfaces/iroha_internal/transaction_batch.hpp"
18 : #include "interfaces/transaction.hpp"
19 :
20 : namespace torii {
21 :
22 : CommandServiceTransportGrpc::CommandServiceTransportGrpc(
23 : std::shared_ptr<CommandService> command_service,
24 : std::shared_ptr<iroha::torii::StatusBus> status_bus,
25 : std::chrono::milliseconds initial_timeout,
26 : std::chrono::milliseconds nonfinal_timeout,
27 : std::shared_ptr<shared_model::interface::TxStatusFactory> status_factory,
28 : std::shared_ptr<TransportFactoryType> transaction_factory,
29 : std::shared_ptr<shared_model::interface::TransactionBatchParser>
30 : batch_parser,
31 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
32 : transaction_batch_factory)
33 261 : : command_service_(std::move(command_service)),
34 261 : status_bus_(std::move(status_bus)),
35 261 : initial_timeout_(initial_timeout),
36 261 : nonfinal_timeout_(nonfinal_timeout),
37 261 : status_factory_(std::move(status_factory)),
38 261 : transaction_factory_(std::move(transaction_factory)),
39 261 : batch_parser_(std::move(batch_parser)),
40 261 : batch_factory_(std::move(transaction_batch_factory)),
41 261 : log_(logger::log("CommandServiceTransportGrpc")) {}
42 :
43 : grpc::Status CommandServiceTransportGrpc::Torii(
44 : grpc::ServerContext *context,
45 : const iroha::protocol::Transaction *request,
46 : google::protobuf::Empty *response) {
47 775 : iroha::protocol::TxList single_tx_list;
48 775 : *single_tx_list.add_transactions() = *request;
49 775 : return ListTorii(context, &single_tx_list, response);
50 775 : }
51 :
52 : namespace {
53 : /**
54 : * Form an error message, which is to be shared between all transactions, if
55 : * there are several of them, or individual message, if there's only one
56 : * @param tx_hashes is non empty hash list to form error message from
57 : * @param error of those tx(s)
58 : * @return message
59 : */
60 : std::string formErrorMessage(
61 : const std::vector<shared_model::crypto::Hash> &tx_hashes,
62 : const std::string &error) {
63 1 : if (tx_hashes.size() == 1) {
64 1 : return (boost::format("Stateless invalid tx, error: %s, hash: %s")
65 1 : % error % tx_hashes[0].hex())
66 1 : .str();
67 : }
68 :
69 : std::string folded_hashes =
70 0 : std::accumulate(std::next(tx_hashes.begin()),
71 0 : tx_hashes.end(),
72 0 : tx_hashes[0].hex(),
73 : [](auto &&acc, const auto &h) -> std::string {
74 0 : return acc + ", " + h.hex();
75 0 : });
76 :
77 0 : return (boost::format(
78 : "Stateless invalid tx in transaction sequence, error: %s\n"
79 : "Hash list: [%s]")
80 0 : % error % folded_hashes)
81 0 : .str();
82 : }
83 : } // namespace
84 :
85 : shared_model::interface::types::SharedTxsCollectionType
86 : CommandServiceTransportGrpc::deserializeTransactions(
87 : const iroha::protocol::TxList *request) {
88 783 : return boost::copy_range<
89 : shared_model::interface::types::SharedTxsCollectionType>(
90 783 : request->transactions()
91 783 : | boost::adaptors::transformed(
92 : [&](const auto &tx) { return transaction_factory_->build(tx); })
93 : | boost::adaptors::filtered([&](const auto &result) {
94 804 : return result.match(
95 : [](const iroha::expected::Value<
96 : std::unique_ptr<shared_model::interface::Transaction>> &) {
97 745 : return true;
98 : },
99 : [&](const iroha::expected::Error<TransportFactoryType::Error>
100 : &error) {
101 59 : status_bus_->publish(status_factory_->makeStatelessFail(
102 : error.error.hash,
103 59 : shared_model::interface::TxStatusFactory::
104 59 : TransactionError{error.error.error, 0, 0}));
105 59 : return false;
106 0 : });
107 : })
108 : | boost::adaptors::transformed([&](auto result) {
109 745 : return std::move(
110 745 : boost::get<iroha::expected::ValueOf<decltype(result)>>(
111 745 : result))
112 : .value;
113 : }));
114 0 : }
115 :
116 : grpc::Status CommandServiceTransportGrpc::ListTorii(
117 : grpc::ServerContext *context,
118 : const iroha::protocol::TxList *request,
119 : google::protobuf::Empty *response) {
120 783 : auto transactions = deserializeTransactions(request);
121 :
122 783 : auto batches = batch_parser_->parseBatches(transactions);
123 :
124 1523 : for (auto &batch : batches) {
125 740 : batch_factory_->createTransactionBatch(batch).match(
126 : [&](iroha::expected::Value<std::unique_ptr<
127 : shared_model::interface::TransactionBatch>> &value) {
128 739 : this->command_service_->handleTransactionBatch(
129 739 : std::move(value).value);
130 739 : },
131 : [&](iroha::expected::Error<std::string> &error) {
132 1 : std::vector<shared_model::crypto::Hash> hashes;
133 :
134 740 : std::transform(batch.begin(),
135 1 : batch.end(),
136 1 : std::back_inserter(hashes),
137 : [](const auto &tx) { return tx->hash(); });
138 :
139 1 : auto error_msg = formErrorMessage(hashes, error.error);
140 : // set error response for each transaction in a batch candidate
141 1 : std::for_each(
142 : hashes.begin(), hashes.end(), [this, &error_msg](auto &hash) {
143 1 : status_bus_->publish(status_factory_->makeStatelessFail(
144 1 : hash,
145 1 : shared_model::interface::TxStatusFactory::
146 1 : TransactionError{error_msg, 0, 0}));
147 1 : });
148 1 : });
149 : }
150 :
151 783 : return grpc::Status::OK;
152 783 : }
153 :
154 : grpc::Status CommandServiceTransportGrpc::Status(
155 : grpc::ServerContext *context,
156 : const iroha::protocol::TxStatusRequest *request,
157 : iroha::protocol::ToriiResponse *response) {
158 813 : *response =
159 813 : std::static_pointer_cast<shared_model::proto::TransactionResponse>(
160 813 : command_service_->getStatus(
161 813 : shared_model::crypto::Hash(request->tx_hash())))
162 813 : ->getTransport();
163 813 : return grpc::Status::OK;
164 0 : }
165 :
166 : namespace {
167 : void handleEvents(rxcpp::composite_subscription &subscription,
168 : rxcpp::schedulers::run_loop &run_loop) {
169 5660056 : while (subscription.is_subscribed() or not run_loop.empty()) {
170 5660054 : run_loop.dispatch();
171 : }
172 2 : }
173 : } // namespace
174 :
175 : grpc::Status CommandServiceTransportGrpc::StatusStream(
176 : grpc::ServerContext *context,
177 : const iroha::protocol::TxStatusRequest *request,
178 : grpc::ServerWriter<iroha::protocol::ToriiResponse> *response_writer) {
179 2 : rxcpp::schedulers::run_loop rl;
180 :
181 : auto current_thread =
182 2 : rxcpp::observe_on_one_worker(rxcpp::schedulers::make_run_loop(rl));
183 :
184 2 : rxcpp::composite_subscription subscription;
185 :
186 2 : auto hash = shared_model::crypto::Hash(request->tx_hash());
187 :
188 2 : static auto client_id_format = boost::format("Peer: '%s', %s");
189 : std::string client_id =
190 2 : (client_id_format % context->peer() % hash.toString()).str();
191 :
192 2 : command_service_
193 2 : ->getStatusStream(hash)
194 : // convert to transport objects
195 : .map([&](auto response) {
196 3 : log_->debug("mapped {}, {}", response->toString(), client_id);
197 3 : return std::static_pointer_cast<
198 : shared_model::proto::TransactionResponse>(response)
199 3 : ->getTransport();
200 0 : })
201 : // set a corresponding observable timeout based on status value
202 2 : .lift<iroha::protocol::ToriiResponse>(
203 2 : iroha::makeTimeout<iroha::protocol::ToriiResponse>(
204 : [&](const auto &response) {
205 3 : return response.tx_status()
206 3 : == iroha::protocol::TxStatus::NOT_RECEIVED
207 2 : ? initial_timeout_
208 1 : : nonfinal_timeout_;
209 : },
210 : current_thread))
211 : // complete the observable if client is disconnected
212 : .take_while([=](const auto &) {
213 3 : auto is_cancelled = context->IsCancelled();
214 3 : if (is_cancelled) {
215 2 : log_->debug("client unsubscribed, {}", client_id);
216 0 : }
217 3 : return not is_cancelled;
218 : })
219 2 : .subscribe(subscription,
220 : [&](iroha::protocol::ToriiResponse response) {
221 3 : if (response_writer->Write(response)) {
222 3 : log_->debug("status written, {}", client_id);
223 3 : }
224 3 : },
225 : [&](std::exception_ptr ep) {
226 1 : log_->debug("processing timeout, {}", client_id);
227 1 : },
228 : [&] { log_->debug("stream done, {}", client_id); });
229 :
230 : // run loop while subscription is active or there are pending events in
231 : // the queue
232 2 : handleEvents(subscription, rl);
233 :
234 2 : log_->debug("status stream done, {}", client_id);
235 2 : return grpc::Status::OK;
236 2 : }
237 : } // namespace torii
|