LCOV - code coverage report
Current view: top level - irohad/consensus/yac/impl - yac.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 94 95 98.9 %
Date: 2018-12-05 17:11:35 Functions: 31 31 100.0 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved.
       3             :  * http://soramitsu.co.jp
       4             :  *
       5             :  * Licensed under the Apache License, Version 2.0 (the "License");
       6             :  * you may not use this file except in compliance with the License.
       7             :  * You may obtain a copy of the License at
       8             :  *
       9             :  *        http://www.apache.org/licenses/LICENSE-2.0
      10             :  *
      11             :  * Unless required by applicable law or agreed to in writing, software
      12             :  * distributed under the License is distributed on an "AS IS" BASIS,
      13             :  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      14             :  * See the License for the specific language governing permissions and
      15             :  * limitations under the License.
      16             :  */
      17             : 
      18             : #include "consensus/yac/yac.hpp"
      19             : 
      20             : #include <utility>
      21             : 
      22             : #include "common/bind.hpp"
      23             : #include "common/visitor.hpp"
      24             : #include "consensus/yac/cluster_order.hpp"
      25             : #include "consensus/yac/storage/yac_proposal_storage.hpp"
      26             : #include "consensus/yac/timer.hpp"
      27             : #include "consensus/yac/yac_crypto_provider.hpp"
      28             : #include "cryptography/public_key.hpp"
      29             : #include "cryptography/signed.hpp"
      30             : #include "interfaces/common_objects/peer.hpp"
      31             : 
      32             : namespace iroha {
      33             :   namespace consensus {
      34             :     namespace yac {
      35             : 
      36             :       template <typename T>
      37             :       static std::string cryptoError(const T &votes) {
      38           4 :         std::string result =
      39           4 :             "Crypto verification failed for message.\n Votes: ";
      40             :         result += logger::to_string(votes, [](const auto &vote) {
      41           4 :           std::string result = "(Public key: ";
      42           4 :           result += vote.signature->publicKey().hex();
      43           4 :           result += ", Signature: ";
      44           4 :           result += vote.signature->signedData().hex();
      45           4 :           result += ")\n";
      46           4 :           return result;
      47           4 :         });
      48           4 :         return result;
      49           4 :       }
      50             : 
      51             :       std::shared_ptr<Yac> Yac::create(
      52             :           YacVoteStorage vote_storage,
      53             :           std::shared_ptr<YacNetwork> network,
      54             :           std::shared_ptr<YacCryptoProvider> crypto,
      55             :           std::shared_ptr<Timer> timer,
      56             :           ClusterOrdering order) {
      57         271 :         return std::make_shared<Yac>(
      58             :             vote_storage, network, crypto, timer, order);
      59             :       }
      60             : 
      61             :       Yac::Yac(YacVoteStorage vote_storage,
      62             :                std::shared_ptr<YacNetwork> network,
      63             :                std::shared_ptr<YacCryptoProvider> crypto,
      64             :                std::shared_ptr<Timer> timer,
      65             :                ClusterOrdering order)
      66         271 :           : vote_storage_(std::move(vote_storage)),
      67         271 :             network_(std::move(network)),
      68         271 :             crypto_(std::move(crypto)),
      69         271 :             timer_(std::move(timer)),
      70         271 :             cluster_order_(order) {
      71         271 :         log_ = logger::log("YAC");
      72         271 :       }
      73             : 
      74             :       // ------|Hash gate|------
      75             : 
      76             :       void Yac::vote(YacHash hash, ClusterOrdering order) {
      77         716 :         log_->info("Order for voting: {}",
      78         716 :                    logger::to_string(order.getPeers(),
      79             :                                      [](auto val) { return val->address(); }));
      80             : 
      81         716 :         cluster_order_ = order;
      82             :         auto vote = crypto_->getVote(hash);
      83             :         // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
      84             :         // separate entity
      85         716 :         votingStep(vote);
      86         716 :       }
      87             : 
      88             :       rxcpp::observable<Answer> Yac::onOutcome() {
      89         255 :         return notifier_.get_observable();
      90           0 :       }
      91             : 
      92             :       // ------|Network notifications|------
      93             : 
      94             :       void Yac::onState(std::vector<VoteMessage> state) {
      95        1469 :         std::lock_guard<std::mutex> guard(mutex_);
      96        1469 :         if (crypto_->verify(state)) {
      97        1463 :           applyState(state);
      98        1465 :         } else {
      99           4 :           log_->warn(cryptoError(state));
     100             :         }
     101        1469 :       }
     102             : 
     103             :       // ------|Private interface|------
     104             : 
     105             :       void Yac::votingStep(VoteMessage vote) {
     106         736 :         auto committed = vote_storage_.isCommitted(vote.hash.vote_round);
     107         736 :         if (committed) {
     108           1 :           return;
     109             :         }
     110             : 
     111         735 :         log_->info("Vote for round ({}, {}), hash ({}, {})",
     112             :                    vote.hash.vote_round.block_round,
     113         735 :                    vote.hash.vote_round.reject_round,
     114         735 :                    vote.hash.vote_hashes.proposal_hash,
     115         735 :                    vote.hash.vote_hashes.block_hash);
     116             : 
     117         735 :         network_->sendState(cluster_order_.currentLeader(), {vote});
     118         735 :         cluster_order_.switchToNext();
     119         735 :         if (cluster_order_.hasNext()) {
     120             :           timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); });
     121          20 :         }
     122         736 :       }
     123             : 
     124             :       void Yac::closeRound() {
     125         718 :         timer_->deny();
     126         718 :       }
     127             : 
     128             :       boost::optional<std::shared_ptr<shared_model::interface::Peer>>
     129             :       Yac::findPeer(const VoteMessage &vote) {
     130           3 :         auto peers = cluster_order_.getPeers();
     131             :         auto it =
     132             :             std::find_if(peers.begin(), peers.end(), [&](const auto &peer) {
     133          17 :               return peer->pubkey() == vote.signature->publicKey();
     134             :             });
     135           3 :         return it != peers.end() ? boost::make_optional(std::move(*it))
     136           2 :                                  : boost::none;
     137           3 :       }
     138             : 
     139             :       // ------|Apply data|------
     140             : 
     141             :       void Yac::applyState(const std::vector<VoteMessage> &state) {
     142             :         auto answer =
     143        1465 :             vote_storage_.store(state, cluster_order_.getNumberOfPeers());
     144             : 
     145             :         // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
     146             :         // separate entity
     147             : 
     148             :         answer | [&](const auto &answer) {
     149        1465 :           auto &proposal_round = state.at(0).hash.vote_round;
     150             : 
     151             :           /*
     152             :            * It is possible that a new peer with an outdated peers list may
     153             :            * collect an outcome from a smaller number of peers which are
     154             :            * included in set of `f` peers in the system. The new peer will not
     155             :            * accept our message with valid supermajority because he cannot apply
     156             :            * votes from unknown peers.
     157             :            */
     158        1437 :           if (state.size() > 1) {
     159             :             // some peer has already collected commit/reject, so it is sent
     160          11 :             if (vote_storage_.getProcessingState(proposal_round)
     161          11 :                 == ProposalState::kNotSentNotProcessed) {
     162           6 :               vote_storage_.nextProcessingState(proposal_round);
     163           6 :               log_->info(
     164             :                   "Received supermajority of votes for ({}, {}), skip "
     165             :                   "propagation",
     166           6 :                   proposal_round.block_round,
     167           6 :                   proposal_round.reject_round);
     168           6 :             }
     169          11 :           }
     170             : 
     171        1439 :           auto processing_state =
     172        1439 :               vote_storage_.getProcessingState(proposal_round);
     173             : 
     174             :           auto votes = [](const auto &state) { return state.votes; };
     175             : 
     176        1439 :           switch (processing_state) {
     177             :             case ProposalState::kNotSentNotProcessed:
     178         714 :               vote_storage_.nextProcessingState(proposal_round);
     179         714 :               log_->info("Propagate state ({}, {}) to whole network",
     180         714 :                          proposal_round.block_round,
     181         714 :                          proposal_round.reject_round);
     182         714 :               this->propagateState(visit_in_place(answer, votes));
     183         714 :               break;
     184             :             case ProposalState::kSentNotProcessed:
     185         718 :               vote_storage_.nextProcessingState(proposal_round);
     186         718 :               log_->info("Pass outcome for ({}, {}) to pipeline",
     187         718 :                          proposal_round.block_round,
     188         718 :                          proposal_round.reject_round);
     189         718 :               this->closeRound();
     190         718 :               notifier_.get_subscriber().on_next(answer);
     191         718 :               break;
     192             :             case ProposalState::kSentProcessed:
     193           5 :               if (state.size() == 1) {
     194             :                 this->findPeer(state.at(0)) | [&](const auto &from) {
     195           1 :                   log_->info("Propagate state ({}, {}) directly to {}",
     196           3 :                              proposal_round.block_round,
     197           1 :                              proposal_round.reject_round,
     198           1 :                              from->address());
     199           1 :                   this->propagateStateDirectly(*from,
     200           3 :                                                visit_in_place(answer, votes));
     201           1 :                 };
     202           3 :               }
     203           5 :               break;
     204             :           }
     205        1437 :         };
     206        1465 :       }
     207             : 
     208             :       // ------|Propagation|------
     209             : 
     210             :       void Yac::propagateState(const std::vector<VoteMessage> &msg) {
     211        1447 :         for (const auto &peer : cluster_order_.getPeers()) {
     212         733 :           propagateStateDirectly(*peer, msg);
     213             :         }
     214         714 :       }
     215             : 
     216             :       void Yac::propagateStateDirectly(const shared_model::interface::Peer &to,
     217             :                                        const std::vector<VoteMessage> &msg) {
     218         734 :         network_->sendState(to, msg);
     219         734 :       }
     220             : 
     221             :     }  // namespace yac
     222             :   }    // namespace consensus
     223             : }  // namespace iroha

Generated by: LCOV version 1.13