LCOV - code coverage report
Current view: top level - irohad/multi_sig_transactions/impl - mst_processor_impl.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 59 62 95.2 %
Date: 2018-12-05 17:11:35 Functions: 21 22 95.5 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include <utility>
       7             : 
       8             : #include "multi_sig_transactions/mst_processor_impl.hpp"
       9             : 
      10             : namespace iroha {
      11             : 
      12             :   FairMstProcessor::FairMstProcessor(
      13             :       std::shared_ptr<iroha::network::MstTransport> transport,
      14             :       std::shared_ptr<MstStorage> storage,
      15             :       std::shared_ptr<PropagationStrategy> strategy,
      16             :       std::shared_ptr<MstTimeProvider> time_provider)
      17         253 :       : MstProcessor(),
      18         253 :         transport_(std::move(transport)),
      19         253 :         storage_(std::move(storage)),
      20         253 :         strategy_(std::move(strategy)),
      21         253 :         time_provider_(std::move(time_provider)),
      22         253 :         propagation_subscriber_(strategy_->emitter().subscribe(
      23             :             [this](auto data) { this->onPropagate(data); })) {
                           // Stores the four collaborators (moved from the by-value
                           // shared_ptr parameters) and subscribes to the strategy's
                           // emitter so that every emitted value is forwarded to
                           // onPropagate(). The subscription handle is kept in
                           // propagation_subscriber_.
                           //
                           // NOTE(review): the subscription is created through the member
                           // strategy_ (the parameter has already been moved from), so it
                           // relies on strategy_ being declared before
                           // propagation_subscriber_ in the class - confirm in the header.
                           // NOTE(review): the lambda captures `this` and is registered
                           // before log_ is assigned below; if the emitter can deliver a
                           // value synchronously on subscribe, onPropagate() would run
                           // with log_ not yet set - confirm this cannot happen.
      24         253 :     log_ = logger::log("FairMstProcessor");
      25         253 :   }
      26             : 
      27             :   FairMstProcessor::~FairMstProcessor() {
                           // Cancels the subscription created in the constructor. Its
                           // callback captures `this`, so presumably this is what keeps
                           // the strategy from calling into a destroyed processor.
      28         253 :     propagation_subscriber_.unsubscribe();
      29         253 :   }
      30             : 
      31             :   // -------------------------| MstProcessor override |-------------------------
      32             : 
      33             :   auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch)
      34             :       -> decltype(propagateBatch(batch)) {
                           // Merges `batch` into this node's own state through
                           // storage_->updateOwnState() and publishes the outcome in a
                           // fixed order: completed state, then updated state, then the
                           // state returned by storage_->getExpiredTransactions() for the
                           // current time.
                           //
                           // The body has no return statement, so the declared return type
                           // decltype(propagateBatch(batch)) is presumably void - confirm
                           // in the header.
                           // NOTE(review): completed_state_ and updated_state_ are
                           // dereferenced without a null check; this assumes
                           // updateOwnState() always populates both.
      35          21 :     auto state_update = storage_->updateOwnState(batch);
      36          21 :     completedBatchesNotify(*state_update.completed_state_);
      37          21 :     updatedBatchesNotify(*state_update.updated_state_);
      38          21 :     expiredBatchesNotify(
      39          21 :         storage_->getExpiredTransactions(time_provider_->getCurrentTime()));
      40          21 :   }
      41             : 
      42             :   auto FairMstProcessor::onStateUpdateImpl() const
      43             :       -> decltype(onStateUpdate()) {
                           // Exposes state_subject_ as an observable. Values are pushed
                           // into that subject by updatedBatchesNotify().
      44         496 :     return state_subject_.get_observable();
      45           0 :   }
      46             : 
      47             :   auto FairMstProcessor::onPreparedBatchesImpl() const
      48             :       -> decltype(onPreparedBatches()) {
                           // Exposes batches_subject_ as an observable. Values are pushed
                           // into that subject by completedBatchesNotify().
      49         496 :     return batches_subject_.get_observable();
      50           0 :   }
      51             : 
      52             :   auto FairMstProcessor::onExpiredBatchesImpl() const
      53             :       -> decltype(onExpiredBatches()) {
      54         496 :     return expired_subject_.get_observable();
      55           0 :   }
      56             : 
      57             :   // TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one
      58             :   void FairMstProcessor::completedBatchesNotify(ConstRefState state) const {
                           // Emits every batch contained in `state` on batches_subject_,
                           // one on_next() call per batch. An empty state emits nothing.
      59          22 :     if (not state.isEmpty()) {
                             // The result of getBatches() is held in a local so that both
                             // iterators below come from the same container (this matters
                             // if getBatches() returns by value - not visible here).
      60           4 :       auto completed_batches = state.getBatches();
      61           4 :       std::for_each(completed_batches.begin(),
      62           4 :                     completed_batches.end(),
      63             :                     [this](const auto &batch) {
      64           4 :                       batches_subject_.get_subscriber().on_next(batch);
      65           4 :                     });
      66           4 :     }
      67          22 :   }
      68             : 
      69             :   void FairMstProcessor::updatedBatchesNotify(ConstRefState state) const {
                           // Emits a single value on state_subject_: a shared_ptr to a new
                           // MstState constructed from `state`. Unlike the completed and
                           // expired notifications, the whole state is sent as one item
                           // rather than batch by batch. An empty state emits nothing.
      70          22 :     if (not state.isEmpty()) {
      71          17 :       state_subject_.get_subscriber().on_next(
      72          17 :           std::make_shared<MstState>(state));
      73          17 :     }
      74          22 :   }
      75             : 
      76             :   void FairMstProcessor::expiredBatchesNotify(ConstRefState state) const {
                           // Emits every batch contained in `state` on expired_subject_,
                           // one on_next() call per batch. An empty state emits nothing.
      77          22 :     if (not state.isEmpty()) {
                             // The result of getBatches() is held in a local so that both
                             // iterators below come from the same container (this matters
                             // if getBatches() returns by value - not visible here).
      78           1 :       auto expired_batches = state.getBatches();
      79           1 :       std::for_each(expired_batches.begin(),
      80           1 :                     expired_batches.end(),
      81             :                     [this](const auto &batch) {
      82             :                       expired_subject_.get_subscriber().on_next(batch);
      83           1 :                     });
      84           1 :     }
      85          22 :   }
      86             : 
      87             :   // -------------------| MstTransportNotification override |-------------------
      88             : 
      89             :   void FairMstProcessor::onNewState(
      90             :       const shared_model::crypto::PublicKey &from,
      91             :       ConstRefState new_state) {
                           // Handles a state received from the peer identified by `from`:
                           // applies it to the storage and then publishes, in this order,
                           // the updated state, the completed batches and the expired
                           // batches.
                           //
                           // @param from       public key of the peer that sent the state
                           // @param new_state  state received from that peer
                           //
                           // The current time is sampled once, before the state is
                           // applied, and reused for the expiry query at the end.
                           // NOTE(review): updated_state_ and completed_state_ are
                           // dereferenced without a null check; this assumes
                           // storage_->apply() always populates both.
      92             :     log_->info("Applying new state");
      93           1 :     auto current_time = time_provider_->getCurrentTime();
      94             : 
      95           1 :     auto state_update = storage_->apply(from, new_state);
      96             : 
      97             :     // updated batches
      98           1 :     updatedBatchesNotify(*state_update.updated_state_);
      99           1 :     log_->info("New batches size: {}",
     100           1 :                state_update.updated_state_->getBatches().size());
     101             : 
     102             :     // completed batches
     103           1 :     completedBatchesNotify(*state_update.completed_state_);
     104             : 
     105             :     // expired batches
                           // Expired batches are taken from getExpiredTransactions(), the
                           // same source propagateBatchImpl() uses. The per-peer diff from
                           // getDiffState() is what onPropagate() sends to other peers; it
                           // is not a list of expired batches and must not be announced
                           // as such.
     106           1 :     expiredBatchesNotify(storage_->getExpiredTransactions(current_time));
     107           1 :   }
     108             : 
     109             :   // -----------------------------| private api |-----------------------------
     110             : 
     111             :   void FairMstProcessor::onPropagate(
     112             :       const PropagationStrategy::PropagationData &data) {
                           // Callback for the propagation strategy's emitter (subscribed
                           // in the constructor). For every destination peer in `data` it
                           // asks the storage for the diff state keyed by that peer's
                           // public key and, when the diff is not empty, sends it to the
                           // peer through transport_->sendState().
                           //
                           // The current time is sampled once so that every peer in this
                           // round is evaluated against the same timestamp. The lambda
                           // captures it by reference, which is fine because
                           // std::for_each runs the lambda before this function returns.
                           // Note that `size` in the log message is the number of entries
                           // in `data` (destination peers), not the size of the diff.
     113           9 :     auto current_time = time_provider_->getCurrentTime();
     114           9 :     auto size = data.size();
     115           9 :     std::for_each(data.begin(),
     116           9 :                   data.end(),
     117             :                   [this, &current_time, size](const auto &dst_peer) {
     118          17 :                     auto diff = storage_->getDiffState(dst_peer->pubkey(),
     119          17 :                                                        current_time);
     120          17 :                     if (not diff.isEmpty()) {
     121           4 :                       log_->info("Propagate new data[{}]", size);
     122           4 :                       transport_->sendState(*dst_peer, diff);
     123           4 :                     }
     124          17 :                   });
     125           9 :   }
     126             : 
     127             : }  // namespace iroha

Generated by: LCOV version 1.13