Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. 2018 All Rights Reserved.
3 : * http://soramitsu.co.jp
4 : *
5 : * Licensed under the Apache License, Version 2.0 (the "License");
6 : * you may not use this file except in compliance with the License.
7 : * You may obtain a copy of the License at
8 : *
9 : * http://www.apache.org/licenses/LICENSE-2.0
10 : *
11 : * Unless required by applicable law or agreed to in writing, software
12 : * distributed under the License is distributed on an "AS IS" BASIS,
13 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : * See the License for the specific language governing permissions and
15 : * limitations under the License.
16 : */
17 :
18 : #include "consensus/yac/yac.hpp"
19 :
20 : #include <utility>
21 :
22 : #include "common/bind.hpp"
23 : #include "common/visitor.hpp"
24 : #include "consensus/yac/cluster_order.hpp"
25 : #include "consensus/yac/storage/yac_proposal_storage.hpp"
26 : #include "consensus/yac/timer.hpp"
27 : #include "consensus/yac/yac_crypto_provider.hpp"
28 : #include "cryptography/public_key.hpp"
29 : #include "cryptography/signed.hpp"
30 : #include "interfaces/common_objects/peer.hpp"
31 :
32 : namespace iroha {
33 : namespace consensus {
34 : namespace yac {
35 :
36 : template <typename T>
37 : static std::string cryptoError(const T &votes) {
38 4 : std::string result =
39 4 : "Crypto verification failed for message.\n Votes: ";
40 : result += logger::to_string(votes, [](const auto &vote) {
41 4 : std::string result = "(Public key: ";
42 4 : result += vote.signature->publicKey().hex();
43 4 : result += ", Signature: ";
44 4 : result += vote.signature->signedData().hex();
45 4 : result += ")\n";
46 4 : return result;
47 4 : });
48 4 : return result;
49 4 : }
50 :
51 : std::shared_ptr<Yac> Yac::create(
52 : YacVoteStorage vote_storage,
53 : std::shared_ptr<YacNetwork> network,
54 : std::shared_ptr<YacCryptoProvider> crypto,
55 : std::shared_ptr<Timer> timer,
56 : ClusterOrdering order) {
57 271 : return std::make_shared<Yac>(
58 : vote_storage, network, crypto, timer, order);
59 : }
60 :
61 : Yac::Yac(YacVoteStorage vote_storage,
62 : std::shared_ptr<YacNetwork> network,
63 : std::shared_ptr<YacCryptoProvider> crypto,
64 : std::shared_ptr<Timer> timer,
65 : ClusterOrdering order)
66 271 : : vote_storage_(std::move(vote_storage)),
67 271 : network_(std::move(network)),
68 271 : crypto_(std::move(crypto)),
69 271 : timer_(std::move(timer)),
70 271 : cluster_order_(order) {
71 271 : log_ = logger::log("YAC");
72 271 : }
73 :
74 : // ------|Hash gate|------
75 :
76 : void Yac::vote(YacHash hash, ClusterOrdering order) {
77 716 : log_->info("Order for voting: {}",
78 716 : logger::to_string(order.getPeers(),
79 : [](auto val) { return val->address(); }));
80 :
81 716 : cluster_order_ = order;
82 : auto vote = crypto_->getVote(hash);
83 : // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
84 : // separate entity
85 716 : votingStep(vote);
86 716 : }
87 :
88 : rxcpp::observable<Answer> Yac::onOutcome() {
89 255 : return notifier_.get_observable();
90 0 : }
91 :
92 : // ------|Network notifications|------
93 :
94 : void Yac::onState(std::vector<VoteMessage> state) {
95 1469 : std::lock_guard<std::mutex> guard(mutex_);
96 1469 : if (crypto_->verify(state)) {
97 1463 : applyState(state);
98 1465 : } else {
99 4 : log_->warn(cryptoError(state));
100 : }
101 1469 : }
102 :
103 : // ------|Private interface|------
104 :
105 : void Yac::votingStep(VoteMessage vote) {
106 736 : auto committed = vote_storage_.isCommitted(vote.hash.vote_round);
107 736 : if (committed) {
108 1 : return;
109 : }
110 :
111 735 : log_->info("Vote for round ({}, {}), hash ({}, {})",
112 : vote.hash.vote_round.block_round,
113 735 : vote.hash.vote_round.reject_round,
114 735 : vote.hash.vote_hashes.proposal_hash,
115 735 : vote.hash.vote_hashes.block_hash);
116 :
117 735 : network_->sendState(cluster_order_.currentLeader(), {vote});
118 735 : cluster_order_.switchToNext();
119 735 : if (cluster_order_.hasNext()) {
120 : timer_->invokeAfterDelay([this, vote] { this->votingStep(vote); });
121 20 : }
122 736 : }
123 :
124 : void Yac::closeRound() {
125 718 : timer_->deny();
126 718 : }
127 :
128 : boost::optional<std::shared_ptr<shared_model::interface::Peer>>
129 : Yac::findPeer(const VoteMessage &vote) {
130 3 : auto peers = cluster_order_.getPeers();
131 : auto it =
132 : std::find_if(peers.begin(), peers.end(), [&](const auto &peer) {
133 17 : return peer->pubkey() == vote.signature->publicKey();
134 : });
135 3 : return it != peers.end() ? boost::make_optional(std::move(*it))
136 2 : : boost::none;
137 3 : }
138 :
139 : // ------|Apply data|------
140 :
141 : void Yac::applyState(const std::vector<VoteMessage> &state) {
142 : auto answer =
143 1465 : vote_storage_.store(state, cluster_order_.getNumberOfPeers());
144 :
145 : // TODO 10.06.2018 andrei: IR-1407 move YAC propagation strategy to a
146 : // separate entity
147 :
148 : answer | [&](const auto &answer) {
149 1465 : auto &proposal_round = state.at(0).hash.vote_round;
150 :
151 : /*
152 : * It is possible that a new peer with an outdated peers list may
153 : * collect an outcome from a smaller number of peers which are
154 : * included in set of `f` peers in the system. The new peer will not
155 : * accept our message with valid supermajority because he cannot apply
156 : * votes from unknown peers.
157 : */
158 1437 : if (state.size() > 1) {
159 : // some peer has already collected commit/reject, so it is sent
160 11 : if (vote_storage_.getProcessingState(proposal_round)
161 11 : == ProposalState::kNotSentNotProcessed) {
162 6 : vote_storage_.nextProcessingState(proposal_round);
163 6 : log_->info(
164 : "Received supermajority of votes for ({}, {}), skip "
165 : "propagation",
166 6 : proposal_round.block_round,
167 6 : proposal_round.reject_round);
168 6 : }
169 11 : }
170 :
171 1439 : auto processing_state =
172 1439 : vote_storage_.getProcessingState(proposal_round);
173 :
174 : auto votes = [](const auto &state) { return state.votes; };
175 :
176 1439 : switch (processing_state) {
177 : case ProposalState::kNotSentNotProcessed:
178 714 : vote_storage_.nextProcessingState(proposal_round);
179 714 : log_->info("Propagate state ({}, {}) to whole network",
180 714 : proposal_round.block_round,
181 714 : proposal_round.reject_round);
182 714 : this->propagateState(visit_in_place(answer, votes));
183 714 : break;
184 : case ProposalState::kSentNotProcessed:
185 718 : vote_storage_.nextProcessingState(proposal_round);
186 718 : log_->info("Pass outcome for ({}, {}) to pipeline",
187 718 : proposal_round.block_round,
188 718 : proposal_round.reject_round);
189 718 : this->closeRound();
190 718 : notifier_.get_subscriber().on_next(answer);
191 718 : break;
192 : case ProposalState::kSentProcessed:
193 5 : if (state.size() == 1) {
194 : this->findPeer(state.at(0)) | [&](const auto &from) {
195 1 : log_->info("Propagate state ({}, {}) directly to {}",
196 3 : proposal_round.block_round,
197 1 : proposal_round.reject_round,
198 1 : from->address());
199 1 : this->propagateStateDirectly(*from,
200 3 : visit_in_place(answer, votes));
201 1 : };
202 3 : }
203 5 : break;
204 : }
205 1437 : };
206 1465 : }
207 :
208 : // ------|Propagation|------
209 :
210 : void Yac::propagateState(const std::vector<VoteMessage> &msg) {
211 1447 : for (const auto &peer : cluster_order_.getPeers()) {
212 733 : propagateStateDirectly(*peer, msg);
213 : }
214 714 : }
215 :
216 : void Yac::propagateStateDirectly(const shared_model::interface::Peer &to,
217 : const std::vector<VoteMessage> &msg) {
218 734 : network_->sendState(to, msg);
219 734 : }
220 :
221 : } // namespace yac
222 : } // namespace consensus
223 : } // namespace iroha
|