Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "consensus/yac/impl/yac_gate_impl.hpp"
7 :
8 : #include "common/visitor.hpp"
9 : #include "consensus/yac/cluster_order.hpp"
10 : #include "consensus/yac/messages.hpp"
11 : #include "consensus/yac/storage/yac_common.hpp"
12 : #include "consensus/yac/yac_hash_provider.hpp"
13 : #include "consensus/yac/yac_peer_orderer.hpp"
14 : #include "cryptography/public_key.hpp"
15 : #include "interfaces/common_objects/signature.hpp"
16 : #include "network/block_loader.hpp"
17 : #include "simulator/block_creator.hpp"
18 :
19 : namespace iroha {
20 : namespace consensus {
21 : namespace yac {
22 :
23 : YacGateImpl::YacGateImpl(
24 : std::shared_ptr<HashGate> hash_gate,
25 : std::shared_ptr<YacPeerOrderer> orderer,
26 : std::shared_ptr<YacHashProvider> hash_provider,
27 : std::shared_ptr<simulator::BlockCreator> block_creator,
28 : std::shared_ptr<network::BlockLoader> block_loader,
29 : std::shared_ptr<consensus::ConsensusResultCache>
30 : consensus_result_cache)
31 251 : : hash_gate_(std::move(hash_gate)),
32 251 : orderer_(std::move(orderer)),
33 251 : hash_provider_(std::move(hash_provider)),
34 251 : block_creator_(std::move(block_creator)),
35 251 : block_loader_(std::move(block_loader)),
36 251 : consensus_result_cache_(std::move(consensus_result_cache)),
37 251 : log_(logger::log("YacGate")) {
38 251 : block_creator_->on_block().subscribe(
39 : [this](auto block) { this->vote(block); });
40 251 : }
41 :
42 : void YacGateImpl::vote(
43 : std::shared_ptr<shared_model::interface::Block> block) {
44 714 : auto hash = hash_provider_->makeHash(*block);
45 714 : log_->info("vote for block ({}, {})",
46 714 : hash.vote_hashes.proposal_hash,
47 714 : block->hash().toString());
48 714 : auto order = orderer_->getOrdering(hash);
49 714 : if (not order) {
50 1 : log_->error("ordering doesn't provide peers => pass round");
51 1 : return;
52 : }
53 713 : current_block_ = std::make_pair(hash, block);
54 713 : hash_gate_->vote(hash, *order);
55 :
56 : // insert the block we voted for to the consensus cache
57 713 : consensus_result_cache_->insert(block);
58 714 : }
59 :
60 : rxcpp::observable<network::Commit> YacGateImpl::on_commit() {
61 : return hash_gate_->onOutcome().flat_map([this](auto message) {
62 : // TODO 10.06.2018 andrei: IR-497 Work on reject case
63 713 : auto commit_message = boost::get<CommitMessage>(message);
64 : // map commit to block if it is present or loaded from other peer
65 713 : return rxcpp::observable<>::create<network::Commit>(
66 : [this, commit_message](auto subscriber) {
67 713 : const auto hash = getHash(commit_message.votes);
68 713 : if (not hash) {
69 0 : log_->info("Invalid commit message, hashes are different");
70 0 : subscriber.on_completed();
71 0 : return;
72 : }
73 : // if node has voted for the committed block
74 713 : if (hash == current_block_.first) {
75 : // append signatures of other nodes
76 710 : this->copySignatures(commit_message);
77 710 : log_->info("consensus: commit top block: height {}, hash {}",
78 710 : current_block_.second->height(),
79 710 : current_block_.second->hash().hex());
80 710 : subscriber.on_next(
81 710 : network::Commit{current_block_.second,
82 : network::PeerVotedFor::kThisBlock});
83 710 : subscriber.on_completed();
84 710 : return;
85 : }
86 : // node has voted for another block - load committed block
87 : const auto model_hash =
88 3 : hash_provider_->toModelHash(hash.value());
89 : // iterate over peers who voted for the committed block
90 : // TODO [IR-1753] Akvinikym 11.10.18: add exponential backoff
91 : // for each peer iteration and shuffle peers order
92 : rxcpp::observable<>::iterate(commit_message.votes)
93 : // allow other peers to apply commit
94 : .flat_map([this, model_hash](auto vote) {
95 : // map vote to block if it can be loaded
96 4 : return rxcpp::observable<>::create<network::Commit>(
97 : [this, model_hash, vote](auto subscriber) {
98 4 : auto block = block_loader_->retrieveBlock(
99 4 : vote.signature->publicKey(),
100 4 : shared_model::crypto::Hash(model_hash));
101 : // if load is successful
102 : if (block) {
103 : // update the cache with block consensus voted for
104 3 : consensus_result_cache_->insert(*block);
105 3 : subscriber.on_next(network::Commit{
106 3 : *block, network::PeerVotedFor::kOtherBlock});
107 3 : } else {
108 1 : log_->error(
109 : "Could not get block from block loader");
110 : }
111 4 : subscriber.on_completed();
112 : });
113 0 : })
114 : // need only the first
115 3 : .first()
116 3 : .retry()
117 3 : .subscribe(
118 : // if load is successful from at least one node
119 : [subscriber](auto block) {
120 3 : subscriber.on_next(block);
121 3 : subscriber.on_completed();
122 3 : },
123 : // if load has failed, no peers provided the block
124 : [this, subscriber](std::exception_ptr) {
125 0 : log_->error("Cannot load committed block");
126 0 : subscriber.on_completed();
127 0 : });
128 713 : });
129 713 : });
130 0 : }
131 :
132 : void YacGateImpl::copySignatures(const CommitMessage &commit) {
133 1422 : for (const auto &vote : commit.votes) {
134 712 : auto sig = vote.hash.block_signature;
135 712 : current_block_.second->addSignature(sig->signedData(),
136 712 : sig->publicKey());
137 712 : }
138 710 : }
139 : } // namespace yac
140 : } // namespace consensus
141 : } // namespace iroha
|