Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "simulator/impl/simulator.hpp"
7 :
8 : #include <boost/range/adaptor/map.hpp>
9 : #include <boost/range/adaptor/transformed.hpp>
10 :
11 : #include "common/bind.hpp"
12 : #include "interfaces/iroha_internal/block.hpp"
13 : #include "interfaces/iroha_internal/proposal.hpp"
14 :
15 : namespace iroha {
16 : namespace simulator {
17 :
18 : Simulator::Simulator(
19 : std::shared_ptr<network::OrderingGate> ordering_gate,
20 : std::shared_ptr<validation::StatefulValidator> statefulValidator,
21 : std::shared_ptr<ametsuchi::TemporaryFactory> factory,
22 : std::shared_ptr<ametsuchi::BlockQueryFactory> block_query_factory,
23 : std::shared_ptr<shared_model::crypto::CryptoModelSigner<>>
24 : crypto_signer,
25 : std::unique_ptr<shared_model::interface::UnsafeBlockFactory>
26 : block_factory)
27 250 : : validator_(std::move(statefulValidator)),
28 250 : ametsuchi_factory_(std::move(factory)),
29 250 : block_query_factory_(block_query_factory),
30 250 : crypto_signer_(std::move(crypto_signer)),
31 250 : block_factory_(std::move(block_factory)),
32 250 : log_(logger::log("Simulator")) {
33 250 : ordering_gate->on_proposal().subscribe(
34 250 : proposal_subscription_,
35 : [this](std::shared_ptr<shared_model::interface::Proposal> proposal) {
36 709 : this->process_proposal(*proposal);
37 709 : });
38 :
39 250 : notifier_.get_observable().subscribe(
40 250 : verified_proposal_subscription_,
41 : [this](std::shared_ptr<iroha::validation::VerifiedProposalAndErrors>
42 : verified_proposal_and_errors) {
43 710 : this->process_verified_proposal(verified_proposal_and_errors);
44 710 : });
45 250 : }
46 :
47 : Simulator::~Simulator() {
48 250 : proposal_subscription_.unsubscribe();
49 250 : verified_proposal_subscription_.unsubscribe();
50 250 : }
51 :
52 : rxcpp::observable<
53 : std::shared_ptr<iroha::validation::VerifiedProposalAndErrors>>
54 : Simulator::on_verified_proposal() {
55 494 : return notifier_.get_observable();
56 0 : }
57 :
58 : void Simulator::process_proposal(
59 : const shared_model::interface::Proposal &proposal) {
60 713 : log_->info("process proposal");
61 :
62 : // Get last block from local ledger
63 713 : auto block_query_opt = block_query_factory_->createBlockQuery();
64 713 : if (not block_query_opt) {
65 1 : log_->error("could not create block query");
66 1 : return;
67 : }
68 :
69 712 : auto block_var = block_query_opt.value()->getTopBlock();
70 712 : if (auto e = boost::get<expected::Error<std::string>>(&block_var)) {
71 1 : log_->warn("Could not fetch last block: " + e->error);
72 1 : return;
73 : }
74 :
75 711 : last_block =
76 711 : boost::get<
77 : expected::Value<std::shared_ptr<shared_model::interface::Block>>>(
78 711 : &block_var)
79 711 : ->value;
80 :
81 711 : if (last_block->height() + 1 != proposal.height()) {
82 : log_->warn("Last block height: {}, proposal height: {}",
83 1 : last_block->height(),
84 1 : proposal.height());
85 1 : return;
86 : }
87 :
88 710 : auto temporary_wsv_var = ametsuchi_factory_->createTemporaryWsv();
89 710 : if (auto e =
90 710 : boost::get<expected::Error<std::string>>(&temporary_wsv_var)) {
91 0 : log_->error("could not create temporary storage: {}", e->error);
92 : return;
93 : }
94 :
95 710 : auto storage = std::move(
96 710 : boost::get<expected::Value<std::unique_ptr<ametsuchi::TemporaryWsv>>>(
97 710 : &temporary_wsv_var)
98 710 : ->value);
99 :
100 : std::shared_ptr<iroha::validation::VerifiedProposalAndErrors>
101 710 : validated_proposal_and_errors =
102 : validator_->validate(proposal, *storage);
103 710 : ametsuchi_factory_->prepareBlock(std::move(storage));
104 :
105 710 : notifier_.get_subscriber().on_next(
106 710 : std::move(validated_proposal_and_errors));
107 713 : }
108 :
109 : void Simulator::process_verified_proposal(
110 : const std::shared_ptr<iroha::validation::VerifiedProposalAndErrors>
111 : &verified_proposal_and_errors) {
112 : log_->info("process verified proposal");
113 :
114 710 : auto height = block_query_factory_->createBlockQuery() |
115 : [&](const auto &block_query) {
116 710 : return block_query->getTopBlockHeight() + 1;
117 : };
118 710 : if (not height) {
119 0 : log_->error("Unable to query top block height");
120 0 : return;
121 : }
122 710 : const auto &proposal = verified_proposal_and_errors->verified_proposal;
123 710 : const auto &rejected_tx_hashes =
124 710 : verified_proposal_and_errors->rejected_transactions
125 710 : | boost::adaptors::map_keys;
126 710 : std::shared_ptr<shared_model::interface::Block> block =
127 710 : block_factory_->unsafeCreateBlock(height,
128 710 : last_block->hash(),
129 710 : proposal->createdTime(),
130 710 : proposal->transactions(),
131 710 : rejected_tx_hashes);
132 710 : crypto_signer_->sign(*block);
133 710 : block_notifier_.get_subscriber().on_next(block);
134 710 : }
135 :
136 : rxcpp::observable<std::shared_ptr<shared_model::interface::Block>>
137 : Simulator::on_block() {
138 248 : return block_notifier_.get_observable();
139 0 : }
140 :
141 : } // namespace simulator
142 : } // namespace iroha
|