Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved.
3 : * http://soramitsu.co.jp
4 : *
5 : * Licensed under the Apache License, Version 2.0 (the "License");
6 : * you may not use this file except in compliance with the License.
7 : * You may obtain a copy of the License at
8 : *
9 : * http://www.apache.org/licenses/LICENSE-2.0
10 : *
11 : * Unless required by applicable law or agreed to in writing, software
12 : * distributed under the License is distributed on an "AS IS" BASIS,
13 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : * See the License for the specific language governing permissions and
15 : * limitations under the License.
16 : */
17 :
18 : #include "ordering/impl/ordering_gate_impl.hpp"
19 :
20 : #include <tuple>
21 : #include <utility>
22 :
23 : #include "interfaces/iroha_internal/block.hpp"
24 : #include "interfaces/iroha_internal/proposal.hpp"
25 : #include "interfaces/iroha_internal/transaction_batch.hpp"
26 :
27 : namespace iroha {
28 : namespace ordering {
29 :
30 : bool ProposalComparator::operator()(
31 : const std::shared_ptr<shared_model::interface::Proposal> &lhs,
32 : const std::shared_ptr<shared_model::interface::Proposal> &rhs) const {
33 22 : return lhs->height() > rhs->height();
34 : }
35 :
36 : OrderingGateImpl::OrderingGateImpl(
37 : std::shared_ptr<iroha::network::OrderingGateTransport> transport,
38 : shared_model::interface::types::HeightType initial_height,
39 : bool run_async)
40 252 : : transport_(std::move(transport)),
41 252 : last_block_height_(initial_height),
42 252 : log_(logger::log("OrderingGate")),
43 252 : run_async_(run_async) {}
44 :
45 : void OrderingGateImpl::propagateBatch(
46 : std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
47 718 : if (batch->transactions().empty()) {
48 0 : log_->warn("trying to propagate empty batch");
49 0 : return;
50 : }
51 718 : log_->info("propagate batch, account_id: {}",
52 718 : batch->transactions().front()->creatorAccountId());
53 :
54 718 : transport_->propagateBatch(batch);
55 718 : }
56 :
57 : rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
58 : OrderingGateImpl::on_proposal() {
59 744 : return proposals_.get_observable();
60 0 : }
61 :
62 : void OrderingGateImpl::setPcs(
63 : const iroha::network::PeerCommunicationService &pcs) {
64 252 : log_->info("setPcs");
65 :
66 : /// observable which contains heights of the top committed blocks
67 : auto top_block_height =
68 252 : pcs.on_commit()
69 252 : .transform(
70 : [this](const synchronizer::SynchronizationEvent &sync_event) {
71 712 : sync_event.synced_blocks.subscribe(
72 : // take height of next block
73 : [this](std::shared_ptr<shared_model::interface::Block>
74 : block_ptr) {
75 712 : last_block_height_ = block_ptr->height();
76 712 : });
77 712 : return last_block_height_;
78 : })
79 252 : .start_with(last_block_height_);
80 :
81 : /// merge_strategy - observable with another source of block heights
82 : auto subscribe = [&](auto merge_strategy) {
83 245 : pcs_subscriber_ = merge_strategy(net_proposals_.get_observable())
84 : .subscribe([this](const auto &t) {
85 : // t is zip of two observables, there is
86 : // intentionally ignored first value (with stub
87 : // values) because it is required only for
88 : // synchronization
89 1413 : this->tryNextRound(std::get<1>(t));
90 1413 : });
91 245 : };
92 :
93 252 : if (run_async_) {
94 : subscribe([&top_block_height](auto observable) {
95 245 : return observable.combine_latest(rxcpp::synchronize_new_thread(),
96 245 : top_block_height);
97 0 : });
98 245 : } else {
99 : subscribe([&top_block_height](auto observable) {
100 7 : return observable.combine_latest(top_block_height);
101 0 : });
102 : }
103 252 : }
104 :
105 : void OrderingGateImpl::onProposal(
106 : std::shared_ptr<shared_model::interface::Proposal> proposal) {
107 730 : log_->info("Received new proposal, height: {}", proposal->height());
108 730 : proposal_queue_.push(std::move(proposal));
109 730 : std::lock_guard<std::mutex> lock(proposal_mutex_);
110 : // intentionally pass stub value
111 730 : net_proposals_.get_subscriber().on_next(0);
112 : }
113 :
114 : void OrderingGateImpl::tryNextRound(
115 : shared_model::interface::types::HeightType last_block_height) {
116 1441 : log_->debug("TryNextRound");
117 1441 : std::shared_ptr<shared_model::interface::Proposal> next_proposal;
118 2170 : while (proposal_queue_.try_pop(next_proposal)) {
119 : // check for old proposal
120 742 : if (next_proposal->height() < last_block_height + 1) {
121 7 : log_->debug("Old proposal, discarding");
122 7 : continue;
123 : }
124 : // check for new proposal
125 735 : if (next_proposal->height() > last_block_height + 1) {
126 13 : log_->debug("Proposal newer than last block, keeping in queue");
127 13 : proposal_queue_.push(next_proposal);
128 13 : break;
129 : }
130 722 : log_->info("Pass the proposal to pipeline height {}",
131 722 : next_proposal->height());
132 722 : proposals_.get_subscriber().on_next(next_proposal);
133 : }
134 1441 : }
135 :
136 : OrderingGateImpl::~OrderingGateImpl() {
137 252 : pcs_subscriber_.unsubscribe();
138 252 : }
139 :
140 : } // namespace ordering
141 : } // namespace iroha
|