Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/on_demand_ordering_gate.hpp"
7 :
8 : #include <boost/range/adaptor/filtered.hpp>
9 : #include <boost/range/empty.hpp>
10 : #include "ametsuchi/tx_presence_cache.hpp"
11 : #include "common/visitor.hpp"
12 :
13 : using namespace iroha;
14 : using namespace iroha::ordering;
15 :
16 : OnDemandOrderingGate::OnDemandOrderingGate(
17 : std::shared_ptr<OnDemandOrderingService> ordering_service,
18 : std::shared_ptr<transport::OdOsNotification> network_client,
19 : rxcpp::observable<BlockRoundEventType> events,
20 : std::shared_ptr<cache::OrderingGateCache> cache,
21 : std::unique_ptr<shared_model::interface::UnsafeProposalFactory> factory,
22 : std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache,
23 : consensus::Round initial_round)
24 8 : : ordering_service_(std::move(ordering_service)),
25 8 : network_client_(std::move(network_client)),
26 : events_subscription_(events.subscribe([this](auto event) {
27 : // exclusive lock
28 7 : std::lock_guard<std::shared_timed_mutex> lock(mutex_);
29 :
30 7 : visit_in_place(event,
31 : [this](const BlockEvent &block_event) {
32 : // block committed, increment block round
33 5 : current_round_ = block_event.round;
34 5 : cache_->remove(block_event.hashes);
35 5 : },
36 : [this](const EmptyEvent &empty) {
37 : // no blocks committed, increment reject round
38 2 : current_round_ = {current_round_.block_round,
39 2 : current_round_.reject_round + 1};
40 2 : });
41 :
42 7 : auto batches = cache_->pop();
43 :
44 7 : cache_->addToBack(batches);
45 7 : network_client_->onBatches(current_round_,
46 7 : transport::OdOsNotification::CollectionType{
47 7 : batches.begin(), batches.end()});
48 :
49 : // notify our ordering service about new round
50 7 : ordering_service_->onCollaborationOutcome(current_round_);
51 :
52 : // request proposal for the current round
53 7 : auto proposal = network_client_->onRequestProposal(current_round_);
54 :
55 7 : auto final_proposal = this->processProposalRequest(std::move(proposal));
56 : // vote for the object received from the network
57 7 : proposal_notifier_.get_subscriber().on_next(std::move(final_proposal));
58 7 : })),
59 8 : cache_(std::move(cache)),
60 8 : proposal_factory_(std::move(factory)),
61 8 : tx_cache_(std::move(tx_cache)),
62 8 : current_round_(initial_round) {}
63 :
64 : void OnDemandOrderingGate::propagateBatch(
65 : std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
66 1 : std::shared_lock<std::shared_timed_mutex> lock(mutex_);
67 :
68 1 : cache_->addToBack({batch});
69 1 : network_client_->onBatches(
70 1 : current_round_, transport::OdOsNotification::CollectionType{batch});
71 1 : }
72 :
73 : rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
74 : OnDemandOrderingGate::on_proposal() {
75 5 : return proposal_notifier_.get_observable();
76 0 : }
77 :
78 : void OnDemandOrderingGate::setPcs(
79 : const iroha::network::PeerCommunicationService &pcs) {
80 0 : throw std::logic_error(
81 : "Method is deprecated. PCS observable should be set in ctor");
82 0 : }
83 :
84 : std::unique_ptr<shared_model::interface::Proposal>
85 : OnDemandOrderingGate::processProposalRequest(
86 : boost::optional<OnDemandOrderingService::ProposalType> &&proposal) const {
87 7 : if (not proposal) {
88 4 : return proposal_factory_->unsafeCreateProposal(
89 4 : current_round_.block_round, current_round_.reject_round, {});
90 : }
91 : // no need to check empty proposal
92 3 : if (boost::empty(proposal.value()->transactions())) {
93 2 : return std::move(proposal.value());
94 : }
95 1 : return removeReplays(std::move(**std::move(proposal)));
96 7 : }
97 :
98 : std::unique_ptr<shared_model::interface::Proposal>
99 : OnDemandOrderingGate::removeReplays(
100 : shared_model::interface::Proposal &&proposal) const {
101 : auto tx_is_not_processed = [this](const auto &tx) {
102 1 : auto tx_result = tx_cache_->check(tx.hash());
103 1 : if (not tx_result) {
104 : // TODO andrei 30.11.18 IR-51 Handle database error
105 0 : return false;
106 : }
107 1 : return iroha::visit_in_place(
108 1 : *tx_result,
109 : [](const ametsuchi::tx_cache_status_responses::Missing &) {
110 0 : return true;
111 : },
112 : [](const auto &status) {
113 : // TODO nickaleks 21.11.18: IR-1887 log replayed transactions
114 : // when log is added
115 1 : return false;
116 : });
117 1 : };
118 : auto unprocessed_txs =
119 1 : boost::adaptors::filter(proposal.transactions(), tx_is_not_processed);
120 :
121 1 : return proposal_factory_->unsafeCreateProposal(
122 1 : proposal.height(), proposal.createdTime(), unprocessed_txs);
123 1 : }
|