Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "common/default_constructible_unary_fn.hpp" // non-copyable value workaround
7 :
8 : #include "ordering/impl/on_demand_os_server_grpc.hpp"
9 :
10 : #include <boost/range/adaptor/filtered.hpp>
11 : #include <boost/range/adaptor/transformed.hpp>
12 :
13 : #include "backend/protobuf/proposal.hpp"
14 : #include "common/bind.hpp"
15 : #include "interfaces/iroha_internal/transaction_batch.hpp"
16 :
17 : using namespace iroha::ordering::transport;
18 :
19 : OnDemandOsServerGrpc::OnDemandOsServerGrpc(
20 : std::shared_ptr<OdOsNotification> ordering_service,
21 : std::shared_ptr<TransportFactoryType> transaction_factory,
22 : std::shared_ptr<shared_model::interface::TransactionBatchParser>
23 : batch_parser,
24 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
25 : transaction_batch_factory)
26 3 : : ordering_service_(ordering_service),
27 3 : transaction_factory_(std::move(transaction_factory)),
28 3 : batch_parser_(std::move(batch_parser)),
29 3 : batch_factory_(std::move(transaction_batch_factory)),
30 3 : log_(logger::log("OnDemandOsServerGrpc")) {}
31 :
32 : shared_model::interface::types::SharedTxsCollectionType
33 : OnDemandOsServerGrpc::deserializeTransactions(
34 : const proto::BatchesRequest *request) {
35 1 : return boost::copy_range<
36 : shared_model::interface::types::SharedTxsCollectionType>(
37 1 : request->transactions()
38 1 : | boost::adaptors::transformed(
39 : [&](const auto &tx) { return transaction_factory_->build(tx); })
40 : | boost::adaptors::filtered([&](const auto &result) {
41 1 : return result.match(
42 : [](const iroha::expected::Value<
43 : std::unique_ptr<shared_model::interface::Transaction>> &) {
44 1 : return true;
45 : },
46 : [&](const iroha::expected::Error<TransportFactoryType::Error>
47 : &error) {
48 0 : log_->info("Transaction deserialization failed: hash {}, {}",
49 0 : error.error.hash.toString(),
50 0 : error.error.error);
51 0 : return false;
52 0 : });
53 : })
54 : | boost::adaptors::transformed([](auto result) {
55 1 : return std::move(
56 1 : boost::get<iroha::expected::ValueOf<decltype(result)>>(
57 1 : result))
58 1 : .value;
59 : }));
60 0 : }
61 :
62 : grpc::Status OnDemandOsServerGrpc::SendBatches(
63 : ::grpc::ServerContext *context,
64 : const proto::BatchesRequest *request,
65 : ::google::protobuf::Empty *response) {
66 1 : consensus::Round round{request->round().block_round(),
67 1 : request->round().reject_round()};
68 1 : auto transactions = deserializeTransactions(request);
69 :
70 1 : auto batch_candidates = batch_parser_->parseBatches(transactions);
71 :
72 1 : auto batches = std::accumulate(
73 1 : std::begin(batch_candidates),
74 1 : std::end(batch_candidates),
75 1 : OdOsNotification::CollectionType{},
76 : [this](auto &acc, const auto &cand) {
77 1 : batch_factory_->createTransactionBatch(cand).match(
78 : [&](iroha::expected::Value<
79 : std::unique_ptr<shared_model::interface::TransactionBatch>>
80 1 : &value) { acc.push_back(std::move(value).value); },
81 : [&](iroha::expected::Error<std::string> &error) {
82 : log_->warn("Batch deserialization failed: {}", error.error);
83 0 : });
84 1 : return acc;
85 0 : });
86 :
87 1 : ordering_service_->onBatches(round, std::move(batches));
88 :
89 1 : return ::grpc::Status::OK;
90 1 : }
91 :
92 : grpc::Status OnDemandOsServerGrpc::RequestProposal(
93 : ::grpc::ServerContext *context,
94 : const proto::ProposalRequest *request,
95 : proto::ProposalResponse *response) {
96 2 : ordering_service_->onRequestProposal(
97 2 : {request->round().block_round(), request->round().reject_round()})
98 : | [&](auto &&proposal) {
99 1 : *response->mutable_proposal() = std::move(
100 1 : static_cast<shared_model::proto::Proposal *>(proposal.get())
101 1 : ->getTransport());
102 : };
103 2 : return ::grpc::Status::OK;
104 0 : }
|