Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "torii/impl/command_service_impl.hpp"
7 :
8 : #include <thread>
9 :
10 : #include "ametsuchi/block_query.hpp"
11 : #include "common/byteutils.hpp"
12 : #include "common/is_any.hpp"
13 : #include "common/visitor.hpp"
14 : #include "interfaces/iroha_internal/transaction_batch.hpp"
15 :
16 : namespace torii {
17 :
18 : CommandServiceImpl::CommandServiceImpl(
19 : std::shared_ptr<iroha::torii::TransactionProcessor> tx_processor,
20 : std::shared_ptr<iroha::ametsuchi::Storage> storage,
21 : std::shared_ptr<iroha::torii::StatusBus> status_bus,
22 : std::shared_ptr<shared_model::interface::TxStatusFactory> status_factory)
23 261 : : tx_processor_(std::move(tx_processor)),
24 261 : storage_(std::move(storage)),
25 261 : status_bus_(std::move(status_bus)),
26 261 : cache_(std::make_shared<CacheType>()),
27 261 : status_factory_(std::move(status_factory)),
28 261 : log_(logger::log("CommandServiceImpl")) {
29 : // Notifier for all clients
30 : status_bus_->statuses().subscribe([this](auto response) {
31 : // find response for this tx in cache; if status of received response
32 : // isn't "greater" than cached one, dismiss received one
33 2944 : auto tx_hash = response->transactionHash();
34 2944 : auto cached_tx_state = cache_->findItem(tx_hash);
35 2944 : if (cached_tx_state
36 2944 : and response->comparePriorities(**cached_tx_state)
37 2145 : != shared_model::interface::TransactionResponse::
38 : PrioritiesComparisonResult::kGreater) {
39 747 : return;
40 : }
41 2197 : cache_->addItem(tx_hash, response);
42 2944 : });
43 261 : }
44 :
45 : void CommandServiceImpl::handleTransactionBatch(
46 : std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
47 739 : processBatch(batch);
48 739 : }
49 :
50 : std::shared_ptr<shared_model::interface::TransactionResponse>
51 : CommandServiceImpl::getStatus(const shared_model::crypto::Hash &request) {
52 813 : auto cached = cache_->findItem(request);
53 813 : if (cached) {
54 797 : return cached.value();
55 : }
56 :
57 16 : auto block_query = storage_->getBlockQuery();
58 16 : if (not block_query) {
59 : // TODO andrei 30.11.18 IR-51 Handle database error
60 0 : log_->warn("Could not create block query. Tx: {}", request.hex());
61 0 : return status_factory_->makeNotReceived(request);
62 : }
63 :
64 16 : auto status = block_query->checkTxPresence(request);
65 16 : if (not status) {
66 : // TODO andrei 30.11.18 IR-51 Handle database error
67 11 : log_->warn("Check tx presence database error. Tx: {}", request.hex());
68 11 : return status_factory_->makeNotReceived(request);
69 : }
70 :
71 5 : return iroha::visit_in_place(
72 5 : *status,
73 : [this,
74 5 : &request](const iroha::ametsuchi::tx_cache_status_responses::Missing &)
75 : -> std::shared_ptr<shared_model::interface::TransactionResponse> {
76 5 : log_->warn("Asked non-existing tx: {}", request.hex());
77 5 : return status_factory_->makeNotReceived(request);
78 0 : },
79 : [this, &request](const auto &) {
80 : std::shared_ptr<shared_model::interface::TransactionResponse>
81 0 : response = status_factory_->makeCommitted(request);
82 : cache_->addItem(request, response);
83 0 : return response;
84 0 : });
85 813 : }
86 :
87 : /**
88 : * Statuses considered final for streaming. Observable stops value emission
89 : * after receiving a value of one of the following types
90 : * @tparam T concrete response type
91 : */
92 : template <typename T>
93 : constexpr bool FinalStatusValue =
94 : iroha::is_any<std::decay_t<T>,
95 : shared_model::interface::StatelessFailedTxResponse,
96 : shared_model::interface::StatefulFailedTxResponse,
97 : shared_model::interface::CommittedTxResponse,
98 : shared_model::interface::MstExpiredResponse>::value;
99 :
100 : rxcpp::observable<
101 : std::shared_ptr<shared_model::interface::TransactionResponse>>
102 : CommandServiceImpl::getStatusStream(const shared_model::crypto::Hash &hash) {
103 : using ResponsePtrType =
104 : std::shared_ptr<shared_model::interface::TransactionResponse>;
105 : auto initial_status = cache_->findItem(hash).value_or([&] {
106 2 : log_->debug("tx is not received: {}", hash.toString());
107 2 : return status_factory_->makeNotReceived(hash);
108 0 : }());
109 2 : return status_bus_
110 2 : ->statuses()
111 : // prepend initial status
112 : .start_with(initial_status)
113 : // select statuses with requested hash
114 2 : .filter(
115 : [&](auto response) { return response->transactionHash() == hash; })
116 : // successfully complete the observable if final status is received.
117 : // final status is included in the observable
118 : .template lift<ResponsePtrType>([](rxcpp::subscriber<ResponsePtrType>
119 : dest) {
120 2 : return rxcpp::make_subscriber<ResponsePtrType>(
121 : dest, [=](ResponsePtrType response) {
122 3 : dest.on_next(response);
123 3 : iroha::visit_in_place(
124 3 : response->get(),
125 : [dest](const auto &resp)
126 : -> std::enable_if_t<FinalStatusValue<decltype(resp)>> {
127 1 : dest.on_completed();
128 1 : },
129 : [](const auto &resp)
130 : -> std::enable_if_t<
131 2 : not FinalStatusValue<decltype(resp)>>{});
132 3 : });
133 0 : });
134 2 : }
135 :
136 : void CommandServiceImpl::pushStatus(
137 : const std::string &who,
138 : std::shared_ptr<shared_model::interface::TransactionResponse> response) {
139 744 : log_->debug("{}: adding item to cache: {}", who, response->toString());
140 744 : status_bus_->publish(response);
141 744 : }
142 :
143 : void CommandServiceImpl::processBatch(
144 : std::shared_ptr<shared_model::interface::TransactionBatch> batch) {
145 739 : tx_processor_->batchHandle(batch);
146 739 : const auto &txs = batch->transactions();
147 : std::for_each(txs.begin(), txs.end(), [this](const auto &tx) {
148 744 : const auto &tx_hash = tx->hash();
149 744 : auto found = cache_->findItem(tx_hash);
150 : // StatlessValid status goes only after EnoughSignaturesCollectedResponse
151 : // So doesn't skip publishing status after it
152 744 : if (found
153 744 : and iroha::visit_in_place(
154 701 : found.value()->get(),
155 : [](const shared_model::interface::
156 697 : EnoughSignaturesCollectedResponse &) { return false; },
157 : [](auto &) { return true; })
158 701 : and tx->quorum() < 2) {
159 0 : log_->warn("Found transaction {} in cache, ignoring", tx_hash.hex());
160 0 : return;
161 : }
162 :
163 744 : this->pushStatus("ToriiBatchProcessor",
164 744 : status_factory_->makeStatelessValid(tx_hash));
165 744 : });
166 739 : }
167 :
168 : } // namespace torii
|