Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/on_demand_os_client_grpc.hpp"
7 :
8 : #include "backend/protobuf/proposal.hpp"
9 : #include "backend/protobuf/transaction.hpp"
10 : #include "interfaces/common_objects/peer.hpp"
11 : #include "interfaces/iroha_internal/transaction_batch.hpp"
12 : #include "network/impl/grpc_channel_builder.hpp"
13 :
14 : using namespace iroha::ordering::transport;
15 :
16 : OnDemandOsClientGrpc::OnDemandOsClientGrpc(
17 : std::unique_ptr<proto::OnDemandOrdering::StubInterface> stub,
18 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
19 : async_call,
20 : std::function<TimepointType()> time_provider,
21 : std::chrono::milliseconds proposal_request_timeout)
22 3 : : log_(logger::log("OnDemandOsClientGrpc")),
23 3 : stub_(std::move(stub)),
24 3 : async_call_(std::move(async_call)),
25 3 : time_provider_(std::move(time_provider)),
26 3 : proposal_request_timeout_(proposal_request_timeout) {}
27 :
28 : void OnDemandOsClientGrpc::onBatches(consensus::Round round,
29 : CollectionType batches) {
30 1 : proto::BatchesRequest request;
31 1 : request.mutable_round()->set_block_round(round.block_round);
32 1 : request.mutable_round()->set_reject_round(round.reject_round);
33 2 : for (auto &batch : batches) {
34 2 : for (auto &transaction : batch->transactions()) {
35 1 : *request.add_transactions() = std::move(
36 1 : static_cast<shared_model::proto::Transaction *>(transaction.get())
37 1 : ->getTransport());
38 : }
39 : }
40 :
41 1 : log_->debug("Propagating: '{}'", request.DebugString());
42 :
43 : async_call_->Call([&](auto context, auto cq) {
44 1 : return stub_->AsyncSendBatches(context, request, cq);
45 : });
46 1 : }
47 :
48 : boost::optional<OdOsNotification::ProposalType>
49 : OnDemandOsClientGrpc::onRequestProposal(consensus::Round round) {
50 2 : grpc::ClientContext context;
51 2 : context.set_deadline(time_provider_() + proposal_request_timeout_);
52 2 : proto::ProposalRequest request;
53 2 : request.mutable_round()->set_block_round(round.block_round);
54 2 : request.mutable_round()->set_reject_round(round.reject_round);
55 2 : proto::ProposalResponse response;
56 2 : auto status = stub_->RequestProposal(&context, request, &response);
57 2 : if (not status.ok()) {
58 0 : log_->warn("RPC failed: {}", status.error_message());
59 0 : return boost::none;
60 : }
61 2 : if (not response.has_proposal()) {
62 1 : return boost::none;
63 : }
64 1 : return ProposalType{std::make_unique<shared_model::proto::Proposal>(
65 1 : std::move(response.proposal()))};
66 2 : }
67 :
68 : OnDemandOsClientGrpcFactory::OnDemandOsClientGrpcFactory(
69 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
70 : async_call,
71 : std::function<OnDemandOsClientGrpc::TimepointType()> time_provider,
72 : OnDemandOsClientGrpc::TimeoutType proposal_request_timeout)
73 0 : : async_call_(std::move(async_call)),
74 0 : time_provider_(time_provider),
75 0 : proposal_request_timeout_(proposal_request_timeout) {}
76 :
77 : std::unique_ptr<OdOsNotification> OnDemandOsClientGrpcFactory::create(
78 : const shared_model::interface::Peer &to) {
79 0 : return std::make_unique<OnDemandOsClientGrpc>(
80 0 : network::createClient<proto::OnDemandOrdering>(to.address()),
81 0 : async_call_,
82 : time_provider_,
83 0 : proposal_request_timeout_);
84 0 : }
|