Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "main/impl/ordering_init.hpp"
7 : #include "ametsuchi/os_persistent_state_factory.hpp"
8 : #include "common/bind.hpp"
9 : #include "interfaces/common_objects/peer.hpp"
10 : #include "interfaces/common_objects/types.hpp"
11 : #include "interfaces/iroha_internal/block.hpp"
12 :
13 : namespace iroha {
14 : namespace network {
15 : auto OrderingInit::createGate(
16 : std::shared_ptr<OrderingGateTransport> transport,
17 : std::shared_ptr<ametsuchi::BlockQueryFactory> block_query_factory) {
18 245 : return block_query_factory->createBlockQuery() |
19 : [this, &transport](const auto &block_query) {
20 245 : return block_query->getTopBlock().match(
21 : [this, &transport](
22 : expected::Value<
23 : std::shared_ptr<shared_model::interface::Block>> &block)
24 : -> std::shared_ptr<OrderingGate> {
25 245 : const auto &height = block.value->height();
26 245 : auto gate = std::make_shared<ordering::OrderingGateImpl>(
27 245 : transport, height);
28 245 : log_->info("Creating Ordering Gate with initial height {}",
29 245 : height);
30 245 : transport->subscribe(gate);
31 245 : return gate;
32 245 : },
33 : [](expected::Error<std::string> &error)
34 : -> std::shared_ptr<OrderingGate> {
35 : // TODO 12.06.18 Akvinikym: handle the exception IR-1415
36 0 : throw std::runtime_error("Ordering Gate creation failed! "
37 0 : + error.error);
38 0 : });
39 0 : };
40 0 : }
41 :
42 : auto OrderingInit::createService(
43 : std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
44 : size_t max_size,
45 : std::chrono::milliseconds delay_milliseconds,
46 : std::shared_ptr<network::OrderingServiceTransport> transport,
47 : std::shared_ptr<ametsuchi::OsPersistentStateFactory> persistent_state) {
48 245 : auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
49 : shared_model::validation::DefaultProposalValidator>>();
50 245 : return std::make_shared<ordering::SinglePeerOrderingService>(
51 : peer_query_factory,
52 : max_size,
53 245 : rxcpp::observable<>::interval(delay_milliseconds,
54 245 : rxcpp::observe_on_new_thread()),
55 : transport,
56 : persistent_state,
57 245 : std::move(factory));
58 245 : }
59 :
60 : std::shared_ptr<OrderingGate> OrderingInit::initOrderingGate(
61 : std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
62 : size_t max_size,
63 : std::chrono::milliseconds delay_milliseconds,
64 : std::shared_ptr<ametsuchi::OsPersistentStateFactory> persistent_state,
65 : std::shared_ptr<ametsuchi::BlockQueryFactory> block_query_factory,
66 : std::shared_ptr<shared_model::interface::TransactionBatchFactory>
67 : transaction_batch_factory,
68 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
69 : async_call) {
70 245 : auto query = peer_query_factory->createPeerQuery();
71 245 : if (not query or not query.get()) {
72 0 : log_->error("Cannot get the peer query");
73 0 : }
74 245 : auto ledger_peers = query.get()->getLedgerPeers();
75 245 : if (not ledger_peers or ledger_peers.value().empty()) {
76 0 : log_->error(
77 : "Ledger don't have peers. Do you set correct genesis block?");
78 0 : }
79 245 : auto network_address = ledger_peers->front()->address();
80 245 : log_->info("Ordering gate is at {}", network_address);
81 245 : ordering_gate_transport =
82 : std::make_shared<iroha::ordering::OrderingGateTransportGrpc>(
83 : network_address, async_call);
84 :
85 245 : ordering_service_transport =
86 245 : std::make_shared<ordering::OrderingServiceTransportGrpc>(
87 245 : std::move(transaction_batch_factory), std::move(async_call));
88 245 : ordering_service = createService(peer_query_factory,
89 245 : max_size,
90 245 : delay_milliseconds,
91 245 : ordering_service_transport,
92 : persistent_state);
93 245 : ordering_service_transport->subscribe(ordering_service);
94 245 : ordering_gate = createGate(ordering_gate_transport, block_query_factory);
95 245 : return ordering_gate;
96 245 : }
97 : } // namespace network
98 : } // namespace iroha
|