LCOV - code coverage report
Current view: top level - irohad/synchronizer/impl - synchronizer_impl.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 0 67 0.0 %
Date: 2018-12-05 17:11:35 Functions: 0 14 0.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "synchronizer/impl/synchronizer_impl.hpp"
       7             : 
       8             : #include <utility>
       9             : 
      10             : #include "ametsuchi/block_query_factory.hpp"
      11             : #include "ametsuchi/mutable_storage.hpp"
      12             : #include "common/bind.hpp"
      13             : #include "interfaces/iroha_internal/block.hpp"
      14             : 
      15             : namespace iroha {
      16             :   namespace synchronizer {
      17             : 
      18             :     SynchronizerImpl::SynchronizerImpl(
      19             :         std::shared_ptr<network::ConsensusGate> consensus_gate,
      20             :         std::shared_ptr<validation::ChainValidator> validator,
      21             :         std::shared_ptr<ametsuchi::MutableFactory> mutable_factory,
      22             :         std::shared_ptr<ametsuchi::BlockQueryFactory> block_query_factory,
      23             :         std::shared_ptr<network::BlockLoader> block_loader)
      24           0 :         : validator_(std::move(validator)),
      25           0 :           mutable_factory_(std::move(mutable_factory)),
      26           0 :           block_query_factory_(std::move(block_query_factory)),
      27           0 :           block_loader_(std::move(block_loader)),
      28           0 :           log_(logger::log("synchronizer")) {
      29           0 :       consensus_gate->on_commit().subscribe(
      30           0 :           subscription_,
      31             :           [&](network::Commit commit) { this->process_commit(commit); });
      32           0 :     }
      33             : 
      34             :     SynchronizationEvent SynchronizerImpl::downloadMissingBlocks(
      35             :         std::shared_ptr<shared_model::interface::Block> commit_message,
      36             :         std::unique_ptr<ametsuchi::MutableStorage> storage,
      37             :         const shared_model::interface::types::HeightType height) {
      38           0 :       auto expected_height = commit_message->height();
      39             : 
      40             :       // while blocks are not loaded and not committed
      41           0 :       while (true) {
      42             :         // TODO andrei 17.10.18 IR-1763 Add delay strategy for loading blocks
      43           0 :         for (const auto &peer_signature : commit_message->signatures()) {
      44           0 :           auto network_chain = block_loader_->retrieveBlocks(
      45           0 :               height,
      46           0 :               shared_model::crypto::PublicKey(peer_signature.publicKey()));
      47             : 
      48           0 :           std::vector<std::shared_ptr<shared_model::interface::Block>> blocks;
      49           0 :           network_chain.as_blocking().subscribe(
      50             :               [&blocks](auto block) { blocks.push_back(block); });
      51           0 :           if (blocks.empty()) {
      52           0 :             log_->info("Downloaded an empty chain");
      53           0 :             continue;
      54             :           } else {
      55           0 :             log_->info("Successfully downloaded {} blocks", blocks.size());
      56             :           }
      57             : 
      58             :           auto chain =
      59           0 :               rxcpp::observable<>::iterate(blocks, rxcpp::identity_immediate());
      60           0 :           if (blocks.back()->height() >= expected_height
      61           0 :               and validator_->validateAndApply(chain, *storage)) {
      62           0 :             mutable_factory_->commit(std::move(storage));
      63             : 
      64           0 :             return {chain, SynchronizationOutcomeType::kCommit};
      65             :           }
      66           0 :         }
      67             :       }
      68           0 :     }
      69             : 
      70             :     void SynchronizerImpl::process_commit(network::Commit commit_message) {
      71           0 :       log_->info("processing commit");
      72             : 
      73           0 :       shared_model::interface::types::HeightType top_block_height{0};
      74           0 :       if (auto block_query = block_query_factory_->createBlockQuery()) {
      75           0 :         top_block_height = (*block_query)->getTopBlockHeight();
      76           0 :       } else {
      77           0 :         log_->error(
      78             :             "Unable to create block query and retrieve top block height");
      79           0 :         return;
      80             :       }
      81             : 
      82             :       const auto &block = commit_message.block;
      83             : 
      84           0 :       if (top_block_height >= block->height()) {
      85           0 :         log_->info(
      86             :             "Storage is already in synchronized state. Top block height is {}",
      87             :             top_block_height);
      88           0 :         return;
      89             :       }
      90             : 
      91           0 :       auto commit = rxcpp::observable<>::just(block);
      92             : 
      93             :       // if already voted for commit, try to apply prepared block
      94           0 :       if (commit_message.type == network::PeerVotedFor::kThisBlock) {
      95           0 :         bool block_applied = mutable_factory_->commitPrepared(*block);
      96           0 :         if (block_applied) {
      97           0 :           notifier_.get_subscriber().on_next(SynchronizationEvent{
      98           0 :               commit, SynchronizationOutcomeType::kCommit});
      99           0 :           return;
     100             :         }
     101           0 :       }
     102             : 
     103           0 :       auto mutable_storage_var = mutable_factory_->createMutableStorage();
     104           0 :       if (auto e =
     105           0 :               boost::get<expected::Error<std::string>>(&mutable_storage_var)) {
     106           0 :         log_->error("could not create mutable storage: {}", e->error);
     107           0 :         return;
     108             :       }
     109             :       auto storage =
     110           0 :           std::move(
     111           0 :               boost::get<
     112             :                   expected::Value<std::unique_ptr<ametsuchi::MutableStorage>>>(
     113           0 :                   mutable_storage_var))
     114           0 :               .value;
     115             : 
     116           0 :       SynchronizationEvent result;
     117             : 
     118           0 :       if (validator_->validateAndApply(commit, *storage)) {
     119           0 :         mutable_factory_->commit(std::move(storage));
     120             : 
     121           0 :         result = {commit, SynchronizationOutcomeType::kCommit};
     122           0 :       } else {
     123           0 :         result = downloadMissingBlocks(
     124           0 :             std::move(block), std::move(storage), top_block_height);
     125             :       }
     126             : 
     127           0 :       notifier_.get_subscriber().on_next(result);
     128           0 :     }
     129             : 
     130             :     rxcpp::observable<SynchronizationEvent>
     131             :     SynchronizerImpl::on_commit_chain() {
     132           0 :       return notifier_.get_observable();
     133           0 :     }
     134             : 
     135             :     SynchronizerImpl::~SynchronizerImpl() {
     136           0 :       subscription_.unsubscribe();
     137           0 :     }
     138             : 
     139             :   }  // namespace synchronizer
     140             : }  // namespace iroha

Generated by: LCOV version 1.13