LCOV - code coverage report
Current view: top level - irohad/ordering/impl - ordering_gate_impl.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 49 54 90.7 %
Date: 2018-12-05 17:11:35 Functions: 22 23 95.7 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved.
       3             :  * http://soramitsu.co.jp
       4             :  *
       5             :  * Licensed under the Apache License, Version 2.0 (the "License");
       6             :  * you may not use this file except in compliance with the License.
       7             :  * You may obtain a copy of the License at
       8             :  *
       9             :  *        http://www.apache.org/licenses/LICENSE-2.0
      10             :  *
      11             :  * Unless required by applicable law or agreed to in writing, software
      12             :  * distributed under the License is distributed on an "AS IS" BASIS,
      13             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      14             :  * See the License for the specific language governing permissions and
      15             :  * limitations under the License.
      16             :  */
      17             : 
      18             : #include "ordering/impl/ordering_gate_impl.hpp"
      19             : 
      20             : #include <tuple>
      21             : #include <utility>
      22             : 
      23             : #include "interfaces/iroha_internal/block.hpp"
      24             : #include "interfaces/iroha_internal/proposal.hpp"
      25             : #include "interfaces/iroha_internal/transaction_batch.hpp"
      26             : 
      27             : namespace iroha {
      28             :   namespace ordering {
      29             : 
      30             :     bool ProposalComparator::operator()(
      31             :         const std::shared_ptr<shared_model::interface::Proposal> &lhs,
      32             :         const std::shared_ptr<shared_model::interface::Proposal> &rhs) const {
      33          22 :       return lhs->height() > rhs->height();
      34             :     }
      35             : 
      36             :     OrderingGateImpl::OrderingGateImpl(
      37             :         std::shared_ptr<iroha::network::OrderingGateTransport> transport,
      38             :         shared_model::interface::types::HeightType initial_height,
      39             :         bool run_async)
      40         252 :         : transport_(std::move(transport)),
      41         252 :           last_block_height_(initial_height),
      42         252 :           log_(logger::log("OrderingGate")),
      43         252 :           run_async_(run_async) {}
      44             : 
      45             :     void OrderingGateImpl::propagateBatch(
      46             :         std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
      47         718 :       if (batch->transactions().empty()) {
      48           0 :         log_->warn("trying to propagate empty batch");
      49           0 :         return;
      50             :       }
      51         718 :       log_->info("propagate batch, account_id: {}",
      52         718 :                  batch->transactions().front()->creatorAccountId());
      53             : 
      54         718 :       transport_->propagateBatch(batch);
      55         718 :     }
      56             : 
      57             :     rxcpp::observable<std::shared_ptr<shared_model::interface::Proposal>>
      58             :     OrderingGateImpl::on_proposal() {
      59         744 :       return proposals_.get_observable();
      60           0 :     }
      61             : 
      62             :     void OrderingGateImpl::setPcs(
      63             :         const iroha::network::PeerCommunicationService &pcs) {
      64         252 :       log_->info("setPcs");
      65             : 
      66             :       /// observable which contains heights of the top committed blocks
      67             :       auto top_block_height =
      68         252 :           pcs.on_commit()
      69         252 :               .transform(
      70             :                   [this](const synchronizer::SynchronizationEvent &sync_event) {
      71         712 :                     sync_event.synced_blocks.subscribe(
      72             :                         // take height of next block
      73             :                         [this](std::shared_ptr<shared_model::interface::Block>
      74             :                                    block_ptr) {
      75         712 :                           last_block_height_ = block_ptr->height();
      76         712 :                         });
      77         712 :                     return last_block_height_;
      78             :                   })
      79         252 :               .start_with(last_block_height_);
      80             : 
      81             :       /// merge_strategy - observable with another source of block heights
      82             :       auto subscribe = [&](auto merge_strategy) {
      83         245 :         pcs_subscriber_ = merge_strategy(net_proposals_.get_observable())
      84             :                               .subscribe([this](const auto &t) {
      85             :                                 // t is zip of two observables, there is
      86             :                                 // intentionally ignored first value (with stub
      87             :                                 // values) because it is required only for
      88             :                                 // synchronization
      89        1413 :                                 this->tryNextRound(std::get<1>(t));
      90        1413 :                               });
      91         245 :       };
      92             : 
      93         252 :       if (run_async_) {
      94             :         subscribe([&top_block_height](auto observable) {
      95         245 :           return observable.combine_latest(rxcpp::synchronize_new_thread(),
      96         245 :                                            top_block_height);
      97           0 :         });
      98         245 :       } else {
      99             :         subscribe([&top_block_height](auto observable) {
     100           7 :           return observable.combine_latest(top_block_height);
     101           0 :         });
     102             :       }
     103         252 :     }
     104             : 
     105             :     void OrderingGateImpl::onProposal(
     106             :         std::shared_ptr<shared_model::interface::Proposal> proposal) {
     107         730 :       log_->info("Received new proposal, height: {}", proposal->height());
     108         730 :       proposal_queue_.push(std::move(proposal));
     109         730 :       std::lock_guard<std::mutex> lock(proposal_mutex_);
     110             :       // intentionally pass stub value
     111         730 :       net_proposals_.get_subscriber().on_next(0);
     112             :     }
     113             : 
     114             :     void OrderingGateImpl::tryNextRound(
     115             :         shared_model::interface::types::HeightType last_block_height) {
     116        1441 :       log_->debug("TryNextRound");
     117        1441 :       std::shared_ptr<shared_model::interface::Proposal> next_proposal;
     118        2170 :       while (proposal_queue_.try_pop(next_proposal)) {
     119             :         // check for old proposal
     120         742 :         if (next_proposal->height() < last_block_height + 1) {
     121           7 :           log_->debug("Old proposal, discarding");
     122           7 :           continue;
     123             :         }
     124             :         // check for new proposal
     125         735 :         if (next_proposal->height() > last_block_height + 1) {
     126          13 :           log_->debug("Proposal newer than last block, keeping in queue");
     127          13 :           proposal_queue_.push(next_proposal);
     128          13 :           break;
     129             :         }
     130         722 :         log_->info("Pass the proposal to pipeline height {}",
     131         722 :                    next_proposal->height());
     132         722 :         proposals_.get_subscriber().on_next(next_proposal);
     133             :       }
     134        1441 :     }
     135             : 
     136             :     OrderingGateImpl::~OrderingGateImpl() {
     137         252 :       pcs_subscriber_.unsubscribe();
     138         252 :     }
     139             : 
     140             :   }  // namespace ordering
     141             : }  // namespace iroha

Generated by: LCOV version 1.13