Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/single_peer_ordering_service.hpp"
7 :
8 : #include <algorithm>
9 : #include <iterator>
10 :
11 : #include <boost/range/adaptor/indirected.hpp>
12 :
13 : #include "ametsuchi/ordering_service_persistent_state.hpp"
14 : #include "common/bind.hpp"
15 : #include "datetime/time.hpp"
16 : #include "interfaces/common_objects/peer.hpp"
17 : #include "interfaces/iroha_internal/transaction_batch_impl.hpp"
18 : #include "network/ordering_service_transport.hpp"
19 :
20 : namespace iroha {
21 : namespace ordering {
22 : SinglePeerOrderingService::SinglePeerOrderingService(
23 : std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
24 : size_t max_size,
25 : rxcpp::observable<TimeoutType> proposal_timeout,
26 : std::shared_ptr<network::OrderingServiceTransport> transport,
27 : std::shared_ptr<ametsuchi::OsPersistentStateFactory> persistent_state,
28 : std::unique_ptr<shared_model::interface::ProposalFactory> factory,
29 : bool is_async)
30 251 : : peer_query_factory_(peer_query_factory),
31 251 : max_size_(max_size),
32 251 : current_size_(0),
33 251 : transport_(transport),
34 251 : persistent_state_(persistent_state),
35 251 : factory_(std::move(factory)),
36 251 : proposal_height_(persistent_state_->createOsPersistentState() |
37 : [](const auto &state) {
38 251 : return state->loadProposalHeight().value();
39 : }),
40 251 : log_(logger::log("OrderingServiceImpl")) {
41 : // restore state of ordering service from persistent storage
42 251 : rxcpp::observable<ProposalEvent> timer =
43 : proposal_timeout.map([](auto) { return ProposalEvent::kTimerEvent; });
44 :
45 : auto subscribe = [&](auto merge_strategy) {
46 245 : handle_ = merge_strategy(rxcpp::observable<>::from(
47 245 : timer, transactions_.get_observable()))
48 : .subscribe([this](auto &&v) {
49 : auto check_queue = [&] {
50 2027 : switch (v) {
51 : case ProposalEvent::kTimerEvent:
52 245 : return not queue_.empty();
53 : case ProposalEvent::kBatchEvent:
54 2023 : return current_size_.load() >= max_size_;
55 : default:
56 0 : BOOST_ASSERT_MSG(false, "Unknown value");
57 : }
58 2027 : };
59 2027 : if (check_queue()) {
60 2006 : this->generateProposal();
61 2006 : }
62 2027 : });
63 245 : };
64 :
65 251 : if (is_async) {
66 : subscribe([](auto observable) {
67 245 : return observable.merge(rxcpp::synchronize_new_thread());
68 0 : });
69 245 : } else {
70 : subscribe([](auto observable) { return observable.merge(); });
71 : }
72 251 : }
73 :
74 : void SinglePeerOrderingService::onBatch(
75 : std::unique_ptr<shared_model::interface::TransactionBatch> batch) {
76 2731 : std::shared_lock<std::shared_timed_mutex> batch_prop_lock(
77 2731 : batch_prop_mutex_);
78 :
79 2734 : current_size_.fetch_add(batch->transactions().size());
80 2734 : queue_.push(std::move(batch));
81 2697 : log_->info("Queue size is {}", current_size_.load());
82 :
83 2740 : batch_prop_lock.unlock();
84 :
85 2738 : std::lock_guard<std::mutex> event_lock(event_mutex_);
86 2741 : transactions_.get_subscriber().on_next(ProposalEvent::kBatchEvent);
87 2741 : }
88 :
89 : void SinglePeerOrderingService::generateProposal() {
90 2716 : std::lock_guard<std::shared_timed_mutex> lock(batch_prop_mutex_);
91 2716 : log_->info("Start proposal generation");
92 : std::vector<std::shared_ptr<shared_model::interface::Transaction>> txs;
93 5457 : for (std::unique_ptr<shared_model::interface::TransactionBatch> batch;
94 5457 : txs.size() < max_size_ and queue_.try_pop(batch);) {
95 2741 : auto batch_size = batch->transactions().size();
96 : // TODO 29.08.2018 andrei IR-1667 Timestamp validation during proposal
97 : // generation
98 2741 : txs.insert(std::end(txs),
99 2741 : std::make_move_iterator(std::begin(batch->transactions())),
100 2741 : std::make_move_iterator(std::end(batch->transactions())));
101 2741 : current_size_ -= batch_size;
102 : }
103 :
104 2716 : auto tx_range = txs | boost::adaptors::indirected;
105 2716 : auto proposal = factory_->createProposal(
106 2716 : proposal_height_, iroha::time::now(), tx_range);
107 :
108 2716 : proposal.match(
109 : [this](expected::Value<
110 : std::unique_ptr<shared_model::interface::Proposal>> &v) {
111 : // Save proposal height to the persistent storage.
112 : // In case of restart it reloads state.
113 2716 : if (persistent_state_->createOsPersistentState() |
114 : [this](const auto &state) {
115 2715 : return state->saveProposalHeight(proposal_height_ + 1);
116 : }) {
117 714 : publishProposal(std::move(v.value));
118 714 : proposal_height_++;
119 714 : } else {
120 : // TODO(@l4l) 23/03/18: publish proposal independent of psql
121 : // status IR-1162
122 2002 : log_->warn(
123 : "Proposal height cannot be saved. Skipping proposal publish");
124 : }
125 2716 : },
126 : [this](expected::Error<std::string> &e) {
127 0 : log_->warn("Failed to initialize proposal: {}", e.error);
128 0 : });
129 2716 : }
130 :
131 : void SinglePeerOrderingService::publishProposal(
132 : std::unique_ptr<shared_model::interface::Proposal> proposal) {
133 714 : auto peers = peer_query_factory_->createPeerQuery() |
134 : [](const auto &query) { return query->getLedgerPeers(); };
135 714 : if (peers) {
136 714 : std::vector<std::string> addresses;
137 714 : std::transform(peers->begin(),
138 714 : peers->end(),
139 714 : std::back_inserter(addresses),
140 : [](auto &p) { return p->address(); });
141 714 : transport_->publishProposal(std::move(proposal), addresses);
142 714 : } else {
143 0 : log_->error("Cannot get the peer list");
144 : }
145 714 : }
146 :
147 : SinglePeerOrderingService::~SinglePeerOrderingService() {
148 251 : handle_.unsubscribe();
149 251 : }
150 : } // namespace ordering
151 : } // namespace iroha
|