Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "torii/processor/transaction_processor_impl.hpp"
7 :
8 : #include <boost/format.hpp>
9 :
10 : #include "interfaces/iroha_internal/block.hpp"
11 : #include "interfaces/iroha_internal/proposal.hpp"
12 : #include "interfaces/iroha_internal/transaction_batch.hpp"
13 : #include "interfaces/iroha_internal/transaction_sequence.hpp"
14 : #include "validation/stateful_validator_common.hpp"
15 :
16 : namespace iroha {
17 : namespace torii {
18 :
19 : using network::PeerCommunicationService;
20 :
21 : namespace {
22 : std::string composeErrorMessage(
23 : const validation::TransactionError &tx_hash_and_error) {
24 60 : const auto tx_hash = tx_hash_and_error.first.hex();
25 60 : const auto &cmd_error = tx_hash_and_error.second;
26 60 : if (not cmd_error.tx_passed_initial_validation) {
27 6 : return (boost::format(
28 : "Stateful validation error: transaction %s "
29 : "did not pass initial verification: "
30 : "checking '%s', error code '%d', query arguments: %s")
31 6 : % tx_hash % cmd_error.name % cmd_error.error_code
32 6 : % cmd_error.error_extra)
33 6 : .str();
34 : }
35 54 : return (boost::format(
36 : "Stateful validation error in transaction %s: "
37 : "command '%s' with index '%d' did not pass "
38 : "verification with code '%d', query arguments: %s")
39 54 : % tx_hash % cmd_error.name % cmd_error.index
40 54 : % cmd_error.error_code % cmd_error.error_extra)
41 54 : .str();
42 60 : }
43 : } // namespace
44 :
45 : TransactionProcessorImpl::TransactionProcessorImpl(
46 : std::shared_ptr<PeerCommunicationService> pcs,
47 : std::shared_ptr<MstProcessor> mst_processor,
48 : std::shared_ptr<iroha::torii::StatusBus> status_bus,
49 : std::shared_ptr<shared_model::interface::TxStatusFactory>
50 : status_factory)
51 269 : : pcs_(std::move(pcs)),
52 269 : mst_processor_(std::move(mst_processor)),
53 269 : status_bus_(std::move(status_bus)),
54 269 : status_factory_(std::move(status_factory)),
55 269 : log_(logger::log("TxProcessor")) {
56 : // process stateful validation results
57 269 : pcs_->on_verified_proposal().subscribe(
58 : [this](std::shared_ptr<validation::VerifiedProposalAndErrors>
59 : proposal_and_errors) {
60 : // notify about failed txs
61 714 : const auto &errors = proposal_and_errors->rejected_transactions;
62 714 : std::lock_guard<std::mutex> lock(notifier_mutex_);
63 774 : for (const auto &tx_error : errors) {
64 60 : log_->info(composeErrorMessage(tx_error));
65 60 : this->publishStatus(TxStatusType::kStatefulFailed,
66 60 : tx_error.first,
67 60 : tx_error.second);
68 : }
69 : // notify about success txs
70 1396 : for (const auto &successful_tx :
71 714 : proposal_and_errors->verified_proposal->transactions()) {
72 682 : log_->info("on stateful validation success: {}",
73 682 : successful_tx.hash().hex());
74 682 : this->publishStatus(TxStatusType::kStatefulValid,
75 682 : successful_tx.hash());
76 : }
77 714 : });
78 :
79 : // commit transactions
80 269 : pcs_->on_commit().subscribe(
81 : [this](synchronizer::SynchronizationEvent sync_event) {
82 : sync_event.synced_blocks.subscribe(
83 : // on next
84 : [this](auto model_block) {
85 709 : current_txs_hashes_.reserve(
86 709 : model_block->transactions().size());
87 709 : std::transform(model_block->transactions().begin(),
88 709 : model_block->transactions().end(),
89 709 : std::back_inserter(current_txs_hashes_),
90 : [](const auto &tx) { return tx.hash(); });
91 709 : },
92 : // on complete
93 : [this] {
94 708 : if (current_txs_hashes_.empty()) {
95 51 : log_->info("there are no transactions to be committed");
96 51 : } else {
97 657 : std::lock_guard<std::mutex> lock(notifier_mutex_);
98 1334 : for (const auto &tx_hash : current_txs_hashes_) {
99 677 : log_->info("on commit committed: {}", tx_hash.hex());
100 677 : this->publishStatus(TxStatusType::kCommitted, tx_hash);
101 : }
102 : current_txs_hashes_.clear();
103 657 : }
104 708 : });
105 709 : });
106 :
107 : mst_processor_->onStateUpdate().subscribe([this](auto &&state) {
108 8 : log_->info("MST state updated");
109 16 : for (auto &&batch : state->getBatches()) {
110 16 : for (auto &&tx : batch->transactions()) {
111 8 : this->publishStatus(TxStatusType::kMstPending, tx->hash());
112 : }
113 : }
114 8 : });
115 : mst_processor_->onPreparedBatches().subscribe([this](auto &&batch) {
116 3 : log_->info("MST batch prepared");
117 3 : this->publishEnoughSignaturesStatus(batch->transactions());
118 3 : this->pcs_->propagate_batch(batch);
119 3 : });
120 : mst_processor_->onExpiredBatches().subscribe([this](auto &&batch) {
121 1 : log_->info("MST batch {} is expired", batch->reducedHash().toString());
122 2 : for (auto &&tx : batch->transactions()) {
123 1 : this->publishStatus(TxStatusType::kMstExpired, tx->hash());
124 : }
125 1 : });
126 269 : }
127 :
128 : void TransactionProcessorImpl::batchHandle(
129 : std::shared_ptr<shared_model::interface::TransactionBatch>
130 : transaction_batch) const {
131 757 : log_->info("handle batch");
132 757 : if (transaction_batch->hasAllSignatures()) {
133 745 : log_->info("propagating batch to PCS");
134 745 : this->publishEnoughSignaturesStatus(transaction_batch->transactions());
135 745 : pcs_->propagate_batch(transaction_batch);
136 745 : } else {
137 12 : log_->info("propagating batch to MST");
138 12 : mst_processor_->propagateBatch(transaction_batch);
139 : }
140 757 : }
141 :
142 : void TransactionProcessorImpl::publishStatus(
143 : TxStatusType tx_status,
144 : const shared_model::crypto::Hash &hash,
145 : const validation::CommandError &cmd_error) const {
146 2185 : auto tx_error = cmd_error.name.empty()
147 2125 : ? shared_model::interface::TxStatusFactory::TransactionError{}
148 60 : : shared_model::interface::TxStatusFactory::TransactionError{
149 60 : cmd_error.name, cmd_error.index, cmd_error.error_code};
150 2185 : switch (tx_status) {
151 : case TxStatusType::kStatelessFailed: {
152 0 : status_bus_->publish(
153 0 : status_factory_->makeStatelessFail(hash, tx_error));
154 0 : return;
155 : };
156 : case TxStatusType::kStatelessValid: {
157 0 : status_bus_->publish(
158 0 : status_factory_->makeStatelessValid(hash, tx_error));
159 0 : return;
160 : };
161 : case TxStatusType::kStatefulFailed: {
162 60 : status_bus_->publish(
163 60 : status_factory_->makeStatefulFail(hash, tx_error));
164 60 : return;
165 : };
166 : case TxStatusType::kStatefulValid: {
167 682 : status_bus_->publish(
168 682 : status_factory_->makeStatefulValid(hash, tx_error));
169 682 : return;
170 : };
171 : case TxStatusType::kCommitted: {
172 677 : status_bus_->publish(status_factory_->makeCommitted(hash, tx_error));
173 677 : return;
174 : };
175 : case TxStatusType::kMstExpired: {
176 1 : status_bus_->publish(status_factory_->makeMstExpired(hash, tx_error));
177 1 : return;
178 : };
179 : case TxStatusType::kNotReceived: {
180 0 : status_bus_->publish(
181 0 : status_factory_->makeNotReceived(hash, tx_error));
182 0 : return;
183 : };
184 : case TxStatusType::kMstPending: {
185 8 : status_bus_->publish(status_factory_->makeMstPending(hash, tx_error));
186 8 : return;
187 : };
188 : case TxStatusType::kEnoughSignaturesCollected: {
189 757 : status_bus_->publish(
190 757 : status_factory_->makeEnoughSignaturesCollected(hash, tx_error));
191 757 : return;
192 : };
193 : }
194 2185 : }
195 :
196 : void TransactionProcessorImpl::publishEnoughSignaturesStatus(
197 : const shared_model::interface::types::SharedTxsCollectionType &txs)
198 : const {
199 1505 : for (const auto &tx : txs) {
200 757 : this->publishStatus(TxStatusType::kEnoughSignaturesCollected,
201 757 : tx->hash());
202 : }
203 748 : }
204 : } // namespace torii
205 : } // namespace iroha
|