Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/ordering_service_transport_grpc.hpp"
7 :
8 : #include "backend/protobuf/proposal.hpp"
9 : #include "backend/protobuf/transaction.hpp"
10 : #include "interfaces/common_objects/transaction_sequence_common.hpp"
11 : #include "network/impl/grpc_channel_builder.hpp"
12 :
13 : using namespace iroha::ordering;
14 :
15 : void OrderingServiceTransportGrpc::subscribe(
16 : std::shared_ptr<iroha::network::OrderingServiceNotification> subscriber) {
17 245 : subscriber_ = subscriber;
18 245 : }
19 :
20 : grpc::Status OrderingServiceTransportGrpc::onBatch(
21 : ::grpc::ServerContext *context,
22 : const protocol::TxList *request,
23 : ::google::protobuf::Empty *response) {
24 718 : async_call_->log_->info("OrderingServiceTransportGrpc::onBatch");
25 718 : if (subscriber_.expired()) {
26 0 : async_call_->log_->error("No subscriber");
27 0 : } else {
28 : auto txs =
29 718 : std::vector<std::shared_ptr<shared_model::interface::Transaction>>(
30 718 : request->transactions_size());
31 718 : std::transform(
32 718 : std::begin(request->transactions()),
33 718 : std::end(request->transactions()),
34 718 : std::begin(txs),
35 : [](const auto &tx) {
36 723 : return std::make_shared<shared_model::proto::Transaction>(tx);
37 : });
38 :
39 : // TODO [IR-1730] Akvinikym 04.10.18: use transaction factory to stateless
40 : // validate transactions before wrapping them into batches
41 718 : auto batch_result = batch_factory_->createTransactionBatch(txs);
42 718 : batch_result.match(
43 : [this](iroha::expected::Value<std::unique_ptr<
44 : shared_model::interface::TransactionBatch>> &batch) {
45 718 : subscriber_.lock()->onBatch(std::move(batch.value));
46 718 : },
47 : [this](const iroha::expected::Error<std::string> &error) {
48 0 : async_call_->log_->error(
49 : "Could not create batch from received transaction list: {}",
50 0 : error.error);
51 0 : });
52 718 : }
53 718 : return ::grpc::Status::OK;
54 0 : }
55 :
56 : void OrderingServiceTransportGrpc::publishProposal(
57 : std::unique_ptr<shared_model::interface::Proposal> proposal,
58 : const std::vector<std::string> &peers) {
59 709 : async_call_->log_->info("OrderingServiceTransportGrpc::publishProposal");
60 : std::unordered_map<
61 : std::string,
62 : std::unique_ptr<proto::OrderingGateTransportGrpc::StubInterface>>
63 709 : peers_map;
64 1420 : for (const auto &peer : peers) {
65 711 : peers_map[peer] =
66 711 : network::createClient<proto::OrderingGateTransportGrpc>(peer);
67 : }
68 :
69 1420 : for (const auto &peer : peers_map) {
70 711 : auto proto = static_cast<shared_model::proto::Proposal *>(proposal.get());
71 711 : async_call_->log_->debug("Publishing proposal: '{}'",
72 711 : proto->getTransport().DebugString());
73 :
74 711 : auto transport = proto->getTransport();
75 : async_call_->Call([&](auto context, auto cq) {
76 711 : return peer.second->AsynconProposal(context, transport, cq);
77 : });
78 711 : }
79 709 : }
80 :
81 : OrderingServiceTransportGrpc::OrderingServiceTransportGrpc(
82 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
83 : transaction_batch_factory,
84 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
85 : async_call)
86 245 : : async_call_(std::move(async_call)),
87 245 : batch_factory_(std::move(transaction_batch_factory)) {}
|