LCOV - code coverage report
Current view: top level - irohad/consensus/yac/impl - yac_gate_impl.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 59 67 88.1 %
Date: 2018-12-05 17:11:35 Functions: 45 46 97.8 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "consensus/yac/impl/yac_gate_impl.hpp"
       7             : 
       8             : #include "common/visitor.hpp"
       9             : #include "consensus/yac/cluster_order.hpp"
      10             : #include "consensus/yac/messages.hpp"
      11             : #include "consensus/yac/storage/yac_common.hpp"
      12             : #include "consensus/yac/yac_hash_provider.hpp"
      13             : #include "consensus/yac/yac_peer_orderer.hpp"
      14             : #include "cryptography/public_key.hpp"
      15             : #include "interfaces/common_objects/signature.hpp"
      16             : #include "network/block_loader.hpp"
      17             : #include "simulator/block_creator.hpp"
      18             : 
      19             : namespace iroha {
      20             :   namespace consensus {
      21             :     namespace yac {
      22             : 
      23             :       YacGateImpl::YacGateImpl(
      24             :           std::shared_ptr<HashGate> hash_gate,
      25             :           std::shared_ptr<YacPeerOrderer> orderer,
      26             :           std::shared_ptr<YacHashProvider> hash_provider,
      27             :           std::shared_ptr<simulator::BlockCreator> block_creator,
      28             :           std::shared_ptr<network::BlockLoader> block_loader,
      29             :           std::shared_ptr<consensus::ConsensusResultCache>
      30             :               consensus_result_cache)
      31         251 :           : hash_gate_(std::move(hash_gate)),
      32         251 :             orderer_(std::move(orderer)),
      33         251 :             hash_provider_(std::move(hash_provider)),
      34         251 :             block_creator_(std::move(block_creator)),
      35         251 :             block_loader_(std::move(block_loader)),
      36         251 :             consensus_result_cache_(std::move(consensus_result_cache)),
      37         251 :             log_(logger::log("YacGate")) {
      38         251 :         block_creator_->on_block().subscribe(
      39             :             [this](auto block) { this->vote(block); });
      40         251 :       }
      41             : 
      42             :       void YacGateImpl::vote(
      43             :           std::shared_ptr<shared_model::interface::Block> block) {
      44         714 :         auto hash = hash_provider_->makeHash(*block);
      45         714 :         log_->info("vote for block ({}, {})",
      46         714 :                    hash.vote_hashes.proposal_hash,
      47         714 :                    block->hash().toString());
      48         714 :         auto order = orderer_->getOrdering(hash);
      49         714 :         if (not order) {
      50           1 :           log_->error("ordering doesn't provide peers => pass round");
      51           1 :           return;
      52             :         }
      53         713 :         current_block_ = std::make_pair(hash, block);
      54         713 :         hash_gate_->vote(hash, *order);
      55             : 
      56             :         // insert the block we voted for to the consensus cache
      57         713 :         consensus_result_cache_->insert(block);
      58         714 :       }
      59             : 
      60             :       rxcpp::observable<network::Commit> YacGateImpl::on_commit() {
      61             :         return hash_gate_->onOutcome().flat_map([this](auto message) {
      62             :           // TODO 10.06.2018 andrei: IR-497 Work on reject case
      63         713 :           auto commit_message = boost::get<CommitMessage>(message);
      64             :           // map commit to block if it is present or loaded from other peer
      65         713 :           return rxcpp::observable<>::create<network::Commit>(
      66             :               [this, commit_message](auto subscriber) {
      67         713 :                 const auto hash = getHash(commit_message.votes);
      68         713 :                 if (not hash) {
      69           0 :                   log_->info("Invalid commit message, hashes are different");
      70           0 :                   subscriber.on_completed();
      71           0 :                   return;
      72             :                 }
      73             :                 // if node has voted for the committed block
      74         713 :                 if (hash == current_block_.first) {
      75             :                   // append signatures of other nodes
      76         710 :                   this->copySignatures(commit_message);
      77         710 :                   log_->info("consensus: commit top block: height {}, hash {}",
      78         710 :                              current_block_.second->height(),
      79         710 :                              current_block_.second->hash().hex());
      80         710 :                   subscriber.on_next(
      81         710 :                       network::Commit{current_block_.second,
      82             :                                       network::PeerVotedFor::kThisBlock});
      83         710 :                   subscriber.on_completed();
      84         710 :                   return;
      85             :                 }
      86             :                 // node has voted for another block - load committed block
      87             :                 const auto model_hash =
      88           3 :                     hash_provider_->toModelHash(hash.value());
      89             :                 // iterate over peers who voted for the committed block
      90             :                 // TODO [IR-1753] Akvinikym 11.10.18: add exponential backoff
      91             :                 // for each peer iteration and shuffle peers order
      92             :                 rxcpp::observable<>::iterate(commit_message.votes)
      93             :                     // allow other peers to apply commit
      94             :                     .flat_map([this, model_hash](auto vote) {
      95             :                       // map vote to block if it can be loaded
      96           4 :                       return rxcpp::observable<>::create<network::Commit>(
      97             :                           [this, model_hash, vote](auto subscriber) {
      98           4 :                             auto block = block_loader_->retrieveBlock(
      99           4 :                                 vote.signature->publicKey(),
     100           4 :                                 shared_model::crypto::Hash(model_hash));
     101             :                             // if load is successful
     102             :                             if (block) {
     103             :                               // update the cache with block consensus voted for
     104           3 :                               consensus_result_cache_->insert(*block);
     105           3 :                               subscriber.on_next(network::Commit{
     106           3 :                                   *block, network::PeerVotedFor::kOtherBlock});
     107           3 :                             } else {
     108           1 :                               log_->error(
     109             :                                   "Could not get block from block loader");
     110             :                             }
     111           4 :                             subscriber.on_completed();
     112             :                           });
     113           0 :                     })
     114             :                     // need only the first
     115           3 :                     .first()
     116           3 :                     .retry()
     117           3 :                     .subscribe(
     118             :                         // if load is successful from at least one node
     119             :                         [subscriber](auto block) {
     120           3 :                           subscriber.on_next(block);
     121           3 :                           subscriber.on_completed();
     122           3 :                         },
     123             :                         // if load has failed, no peers provided the block
     124             :                         [this, subscriber](std::exception_ptr) {
     125           0 :                           log_->error("Cannot load committed block");
     126           0 :                           subscriber.on_completed();
     127           0 :                         });
     128         713 :               });
     129         713 :         });
     130           0 :       }
     131             : 
     132             :       void YacGateImpl::copySignatures(const CommitMessage &commit) {
     133        1422 :         for (const auto &vote : commit.votes) {
     134         712 :           auto sig = vote.hash.block_signature;
     135         712 :           current_block_.second->addSignature(sig->signedData(),
     136         712 :                                               sig->publicKey());
     137         712 :         }
     138         710 :       }
     139             :     }  // namespace yac
     140             :   }    // namespace consensus
     141             : }  // namespace iroha

Generated by: LCOV version 1.13