Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering_gate_transport_grpc.hpp"
7 :
8 : #include "backend/protobuf/transaction.hpp"
9 : #include "endpoint.pb.h"
10 : #include "interfaces/common_objects/types.hpp"
11 : #include "network/impl/grpc_channel_builder.hpp"
12 :
13 : using namespace iroha::ordering;
14 :
15 : grpc::Status OrderingGateTransportGrpc::onProposal(
16 : ::grpc::ServerContext *context,
17 : const iroha::protocol::Proposal *request,
18 : ::google::protobuf::Empty *response) {
19 710 : async_call_->log_->info("receive proposal");
20 :
21 710 : auto proposal_res = factory_->createProposal(*request);
22 710 : proposal_res.match(
23 : [this](iroha::expected::Value<
24 : std::unique_ptr<shared_model::interface::Proposal>> &v) {
25 710 : async_call_->log_->info("transactions in proposal: {}",
26 710 : v.value->transactions().size());
27 :
28 710 : if (not subscriber_.expired()) {
29 710 : subscriber_.lock()->onProposal(std::move(v.value));
30 710 : } else {
31 0 : async_call_->log_->error("(onProposal) No subscriber");
32 : }
33 710 : },
34 : [this](const iroha::expected::Error<std::string> &e) {
35 0 : async_call_->log_->error("Received invalid proposal: {}", e.error);
36 0 : });
37 :
38 710 : return grpc::Status::OK;
39 710 : }
40 :
41 : OrderingGateTransportGrpc::OrderingGateTransportGrpc(
42 : const std::string &server_address,
43 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
44 : async_call)
45 246 : : client_(network::createClient<proto::OrderingServiceTransportGrpc>(
46 246 : server_address)),
47 246 : async_call_(std::move(async_call)),
48 246 : factory_(std::make_unique<shared_model::proto::ProtoProposalFactory<
49 246 : shared_model::validation::DefaultProposalValidator>>()) {}
50 :
51 : void OrderingGateTransportGrpc::propagateBatch(
52 : std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
53 718 : async_call_->log_->info("Propagate transaction batch (on transport)");
54 :
55 718 : iroha::protocol::TxList batch_transport;
56 1441 : for (const auto tx : batch->transactions()) {
57 723 : new (batch_transport.add_transactions()) iroha::protocol::Transaction(
58 723 : std::static_pointer_cast<shared_model::proto::Transaction>(tx)
59 723 : ->getTransport());
60 723 : }
61 : async_call_->Call([&](auto context, auto cq) {
62 718 : return client_->AsynconBatch(context, batch_transport, cq);
63 : });
64 718 : }
65 :
66 : void OrderingGateTransportGrpc::subscribe(
67 : std::shared_ptr<iroha::network::OrderingGateNotification> subscriber) {
68 246 : async_call_->log_->info("Subscribe");
69 246 : subscriber_ = subscriber;
70 246 : }
|