Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "synchronizer/impl/synchronizer_impl.hpp"
7 :
8 : #include <utility>
9 :
10 : #include "ametsuchi/block_query_factory.hpp"
11 : #include "ametsuchi/mutable_storage.hpp"
12 : #include "common/bind.hpp"
13 : #include "interfaces/iroha_internal/block.hpp"
14 :
15 : namespace iroha {
16 : namespace synchronizer {
17 :
18 : SynchronizerImpl::SynchronizerImpl(
19 : std::shared_ptr<network::ConsensusGate> consensus_gate,
20 : std::shared_ptr<validation::ChainValidator> validator,
21 : std::shared_ptr<ametsuchi::MutableFactory> mutable_factory,
22 : std::shared_ptr<ametsuchi::BlockQueryFactory> block_query_factory,
23 : std::shared_ptr<network::BlockLoader> block_loader)
24 0 : : validator_(std::move(validator)),
25 0 : mutable_factory_(std::move(mutable_factory)),
26 0 : block_query_factory_(std::move(block_query_factory)),
27 0 : block_loader_(std::move(block_loader)),
28 0 : log_(logger::log("synchronizer")) {
29 0 : consensus_gate->on_commit().subscribe(
30 0 : subscription_,
31 : [&](network::Commit commit) { this->process_commit(commit); });
32 0 : }
33 :
34 : SynchronizationEvent SynchronizerImpl::downloadMissingBlocks(
35 : std::shared_ptr<shared_model::interface::Block> commit_message,
36 : std::unique_ptr<ametsuchi::MutableStorage> storage,
37 : const shared_model::interface::types::HeightType height) {
38 0 : auto expected_height = commit_message->height();
39 :
40 : // while blocks are not loaded and not committed
41 0 : while (true) {
42 : // TODO andrei 17.10.18 IR-1763 Add delay strategy for loading blocks
43 0 : for (const auto &peer_signature : commit_message->signatures()) {
44 0 : auto network_chain = block_loader_->retrieveBlocks(
45 0 : height,
46 0 : shared_model::crypto::PublicKey(peer_signature.publicKey()));
47 :
48 0 : std::vector<std::shared_ptr<shared_model::interface::Block>> blocks;
49 0 : network_chain.as_blocking().subscribe(
50 : [&blocks](auto block) { blocks.push_back(block); });
51 0 : if (blocks.empty()) {
52 0 : log_->info("Downloaded an empty chain");
53 0 : continue;
54 : } else {
55 0 : log_->info("Successfully downloaded {} blocks", blocks.size());
56 : }
57 :
58 : auto chain =
59 0 : rxcpp::observable<>::iterate(blocks, rxcpp::identity_immediate());
60 0 : if (blocks.back()->height() >= expected_height
61 0 : and validator_->validateAndApply(chain, *storage)) {
62 0 : mutable_factory_->commit(std::move(storage));
63 :
64 0 : return {chain, SynchronizationOutcomeType::kCommit};
65 : }
66 0 : }
67 : }
68 0 : }
69 :
70 : void SynchronizerImpl::process_commit(network::Commit commit_message) {
71 0 : log_->info("processing commit");
72 :
73 0 : shared_model::interface::types::HeightType top_block_height{0};
74 0 : if (auto block_query = block_query_factory_->createBlockQuery()) {
75 0 : top_block_height = (*block_query)->getTopBlockHeight();
76 0 : } else {
77 0 : log_->error(
78 : "Unable to create block query and retrieve top block height");
79 0 : return;
80 : }
81 :
82 : const auto &block = commit_message.block;
83 :
84 0 : if (top_block_height >= block->height()) {
85 0 : log_->info(
86 : "Storage is already in synchronized state. Top block height is {}",
87 : top_block_height);
88 0 : return;
89 : }
90 :
91 0 : auto commit = rxcpp::observable<>::just(block);
92 :
93 : // if already voted for commit, try to apply prepared block
94 0 : if (commit_message.type == network::PeerVotedFor::kThisBlock) {
95 0 : bool block_applied = mutable_factory_->commitPrepared(*block);
96 0 : if (block_applied) {
97 0 : notifier_.get_subscriber().on_next(SynchronizationEvent{
98 0 : commit, SynchronizationOutcomeType::kCommit});
99 0 : return;
100 : }
101 0 : }
102 :
103 0 : auto mutable_storage_var = mutable_factory_->createMutableStorage();
104 0 : if (auto e =
105 0 : boost::get<expected::Error<std::string>>(&mutable_storage_var)) {
106 0 : log_->error("could not create mutable storage: {}", e->error);
107 0 : return;
108 : }
109 : auto storage =
110 0 : std::move(
111 0 : boost::get<
112 : expected::Value<std::unique_ptr<ametsuchi::MutableStorage>>>(
113 0 : mutable_storage_var))
114 0 : .value;
115 :
116 0 : SynchronizationEvent result;
117 :
118 0 : if (validator_->validateAndApply(commit, *storage)) {
119 0 : mutable_factory_->commit(std::move(storage));
120 :
121 0 : result = {commit, SynchronizationOutcomeType::kCommit};
122 0 : } else {
123 0 : result = downloadMissingBlocks(
124 0 : std::move(block), std::move(storage), top_block_height);
125 : }
126 :
127 0 : notifier_.get_subscriber().on_next(result);
128 0 : }
129 :
130 : rxcpp::observable<SynchronizationEvent>
131 : SynchronizerImpl::on_commit_chain() {
132 0 : return notifier_.get_observable();
133 0 : }
134 :
135 : SynchronizerImpl::~SynchronizerImpl() {
136 0 : subscription_.unsubscribe();
137 0 : }
138 :
139 : } // namespace synchronizer
140 : } // namespace iroha
|