Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/on_demand_ordering_service_impl.hpp"
7 :
8 : #include <unordered_set>
9 :
10 : #include <boost/range/adaptor/filtered.hpp>
11 : #include <boost/range/adaptor/indirected.hpp>
12 : #include <boost/range/adaptor/transformed.hpp>
13 : #include <boost/range/algorithm/for_each.hpp>
14 : #include <boost/range/size.hpp>
15 : #include "ametsuchi/tx_presence_cache.hpp"
16 : #include "ametsuchi/tx_presence_cache_utils.hpp"
17 : #include "common/visitor.hpp"
18 : #include "datetime/time.hpp"
19 : #include "interfaces/iroha_internal/proposal.hpp"
20 : #include "interfaces/iroha_internal/transaction_batch.hpp"
21 : #include "interfaces/transaction.hpp"
22 :
23 : using namespace iroha::ordering;
24 :
25 : /**
26 : * First round after successful committing block
27 : */
28 : const iroha::consensus::RejectRoundType kFirstRound = 1;
29 :
30 : OnDemandOrderingServiceImpl::OnDemandOrderingServiceImpl(
31 : size_t transaction_limit,
32 : std::unique_ptr<shared_model::interface::UnsafeProposalFactory>
33 : proposal_factory,
34 : std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
35 : size_t number_of_proposals,
36 : const consensus::Round &initial_round)
37 10 : : transaction_limit_(transaction_limit),
38 10 : number_of_proposals_(number_of_proposals),
39 10 : proposal_factory_(std::move(proposal_factory)),
40 10 : tx_cache_(std::move(tx_cache)),
41 10 : log_(logger::log("OnDemandOrderingServiceImpl")) {
42 10 : onCollaborationOutcome(initial_round);
43 10 : }
44 :
45 : // -------------------------| OnDemandOrderingService |-------------------------
46 :
47 : void OnDemandOrderingServiceImpl::onCollaborationOutcome(
48 : consensus::Round round) {
49 37 : log_->info("onCollaborationOutcome => round[{}, {}]",
50 37 : round.block_round,
51 37 : round.reject_round);
52 : // exclusive write lock
53 37 : std::lock_guard<std::shared_timed_mutex> guard(lock_);
54 37 : log_->info("onCollaborationOutcome => write lock is acquired");
55 :
56 37 : packNextProposals(round);
57 37 : tryErase();
58 37 : }
59 :
60 : // ----------------------------| OdOsNotification |-----------------------------
61 :
62 : void OnDemandOrderingServiceImpl::onBatches(consensus::Round round,
63 : CollectionType batches) {
64 : // read lock
65 26 : std::shared_lock<std::shared_timed_mutex> guard(lock_);
66 26 : log_->info("onBatches => collection size = {}, round[{}, {}]",
67 26 : batches.size(),
68 26 : round.block_round,
69 26 : round.reject_round);
70 :
71 : auto unprocessed_batches =
72 : boost::adaptors::filter(batches, [this](const auto &batch) {
73 66 : log_->info("check batch {} for already processed transactions",
74 66 : batch->reducedHash().hex());
75 66 : return not this->batchAlreadyProcessed(*batch);
76 : });
77 26 : auto it = current_proposals_.find(round);
78 26 : if (it != current_proposals_.end()) {
79 25 : std::for_each(unprocessed_batches.begin(),
80 25 : unprocessed_batches.end(),
81 : [&it](auto &obj) { it->second.push(std::move(obj)); });
82 : log_->info("onTransactions => collection is inserted");
83 25 : }
84 26 : }
85 :
86 : boost::optional<OnDemandOrderingServiceImpl::ProposalType>
87 : OnDemandOrderingServiceImpl::onRequestProposal(consensus::Round round) {
88 : // read lock
89 29 : std::shared_lock<std::shared_timed_mutex> guard(lock_);
90 29 : auto proposal = proposal_map_.find(round);
91 29 : if (proposal != proposal_map_.end()) {
92 : return clone(*proposal->second);
93 : } else {
94 13 : return boost::none;
95 : }
96 29 : }
97 :
98 : // ---------------------------------| Private |---------------------------------
99 :
100 : void OnDemandOrderingServiceImpl::packNextProposals(
101 : const consensus::Round &round) {
102 : auto close_round = [this](consensus::Round round) {
103 64 : auto it = current_proposals_.find(round);
104 64 : if (it != current_proposals_.end()) {
105 44 : if (not it->second.empty()) {
106 25 : proposal_map_.emplace(round, emitProposal(round));
107 25 : log_->info("packNextProposal: data has been fetched for round[{}, {}]",
108 25 : round.block_round,
109 25 : round.reject_round);
110 25 : round_queue_.push(round);
111 25 : }
112 : current_proposals_.erase(it);
113 44 : }
114 64 : };
115 :
116 : /*
117 : * The possible cases can be visualised as a diagram, where:
118 : * o - current round, x - next round, v - target round
119 : *
120 : * 0 1 2
121 : * 0 o x v
122 : * 1 x v .
123 : * 2 v . .
124 : *
125 : * Reject case:
126 : *
127 : * 0 1 2 3
128 : * 0 . o x v
129 : * 1 x v . .
130 : * 2 v . . .
131 : *
132 : * (0,1) - current round. Round (0,2) is closed for transactions.
133 : * Round (0,3) is now receiving transactions.
134 : * Rounds (1,) and (2,) do not change.
135 : *
136 : * Commit case:
137 : *
138 : * 0 1 2
139 : * 0 . . .
140 : * 1 o x v
141 : * 2 x v .
142 : * 3 v . .
143 : *
144 : * (1,0) - current round. The diagram is similar to the initial case.
145 : */
146 :
147 : // close next reject round
148 37 : close_round({round.block_round, round.reject_round + 1});
149 :
150 37 : if (round.reject_round == kFirstRound) {
151 : // new block round
152 27 : close_round({round.block_round + 1, round.reject_round});
153 :
154 : // remove current queues
155 27 : current_proposals_.clear();
156 : // initialize the 3 diagonal rounds from the commit case diagram
157 108 : for (uint32_t i = 0; i <= 2; ++i) {
158 81 : current_proposals_[{round.block_round + i, round.reject_round + 2 - i}];
159 81 : }
160 27 : } else {
161 : // new reject round
162 10 : current_proposals_[{round.block_round, round.reject_round + 2}];
163 : }
164 37 : }
165 :
166 : OnDemandOrderingServiceImpl::ProposalType
167 : OnDemandOrderingServiceImpl::emitProposal(const consensus::Round &round) {
168 25 : log_->info("Mutable proposal generation, round[{}, {}]",
169 25 : round.block_round,
170 25 : round.reject_round);
171 :
172 25 : TransactionBatchType batch;
173 25 : std::vector<std::shared_ptr<shared_model::interface::Transaction>> collection;
174 25 : std::unordered_set<std::string> inserted;
175 :
176 : // outer method should guarantee availability of at least one transaction in
177 : // queue, also, code shouldn't fetch all transactions from queue. The rest
178 : // will be lost.
179 25 : auto ¤t_proposal = current_proposals_[round];
180 70 : while (current_proposal.try_pop(batch)
181 70 : and collection.size() < transaction_limit_
182 46 : and inserted.insert(batch->reducedHash().hex()).second) {
183 45 : collection.insert(
184 45 : std::end(collection),
185 45 : std::make_move_iterator(std::begin(batch->transactions())),
186 45 : std::make_move_iterator(std::end(batch->transactions())));
187 : }
188 25 : log_->info("Number of transactions in proposal = {}", collection.size());
189 :
190 25 : auto txs = collection | boost::adaptors::indirected;
191 25 : return proposal_factory_->unsafeCreateProposal(
192 25 : round.block_round, iroha::time::now(), txs);
193 25 : }
194 :
195 : void OnDemandOrderingServiceImpl::tryErase() {
196 37 : if (round_queue_.size() >= number_of_proposals_) {
197 12 : auto &round = round_queue_.front();
198 12 : proposal_map_.erase(round);
199 12 : log_->info("tryErase: erased round[{}, {}]",
200 12 : round.block_round,
201 12 : round.reject_round);
202 12 : round_queue_.pop();
203 12 : }
204 37 : }
205 :
206 : bool OnDemandOrderingServiceImpl::batchAlreadyProcessed(
207 : const shared_model::interface::TransactionBatch &batch) {
208 66 : auto tx_statuses = tx_cache_->check(batch);
209 66 : if (not tx_statuses) {
210 : // TODO andrei 30.11.18 IR-51 Handle database error
211 0 : log_->warn("Check tx presence database error. Batch: {}", batch.toString());
212 0 : return true;
213 : }
214 : // if any transaction is commited or rejected, batch was already processed
215 : // Note: any_of returns false for empty sequence
216 66 : return std::any_of(
217 : tx_statuses->begin(), tx_statuses->end(), [this](const auto &tx_status) {
218 66 : if (iroha::ametsuchi::isAlreadyProcessed(tx_status)) {
219 2 : log_->warn("Duplicate transaction: {}",
220 2 : iroha::ametsuchi::getHash(tx_status).hex());
221 2 : return true;
222 : }
223 64 : return false;
224 66 : });
225 66 : }
|