Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "torii/processor/query_processor_impl.hpp"
7 :
8 : #include <boost/range/size.hpp>
9 :
10 : #include "ametsuchi/wsv_query.hpp"
11 : #include "common/bind.hpp"
12 : #include "interfaces/queries/blocks_query.hpp"
13 : #include "interfaces/queries/query.hpp"
14 : #include "interfaces/query_responses/block_query_response.hpp"
15 : #include "interfaces/query_responses/query_response.hpp"
16 : #include "validation/utils.hpp"
17 :
18 : namespace iroha {
19 : namespace torii {
20 :
21 : QueryProcessorImpl::QueryProcessorImpl(
22 : std::shared_ptr<ametsuchi::Storage> storage,
23 : std::shared_ptr<ametsuchi::QueryExecutorFactory> qry_exec,
24 : std::shared_ptr<iroha::PendingTransactionStorage> pending_transactions,
25 : std::shared_ptr<shared_model::interface::QueryResponseFactory>
26 : response_factory)
27 268 : : storage_{std::move(storage)},
28 268 : qry_exec_{std::move(qry_exec)},
29 268 : pending_transactions_{std::move(pending_transactions)},
30 268 : response_factory_{std::move(response_factory)},
31 268 : log_{logger::log("QueryProcessorImpl")} {
32 268 : storage_->on_commit().subscribe(
33 : [this](std::shared_ptr<shared_model::interface::Block> block) {
34 : auto block_response =
35 714 : response_factory_->createBlockQueryResponse(clone(*block));
36 714 : blocks_query_subject_.get_subscriber().on_next(
37 714 : std::move(block_response));
38 714 : });
39 268 : }
40 :
41 : template <class Q>
42 : bool QueryProcessorImpl::checkSignatories(const Q &qry) {
43 143 : const auto &wsv_query = storage_->getWsvQuery();
44 :
45 143 : auto signatories = wsv_query->getSignatories(qry.creatorAccountId());
46 142 : const auto &sig = qry.signatures();
47 :
48 143 : return boost::size(sig) == 1
49 : and signatories | [&sig](const auto &signatories) {
50 143 : return validation::signaturesSubset(sig, signatories);
51 0 : };
52 143 : }
53 :
54 : template bool QueryProcessorImpl::checkSignatories<
55 : shared_model::interface::Query>(const shared_model::interface::Query &);
56 : template bool
57 : QueryProcessorImpl::checkSignatories<shared_model::interface::BlocksQuery>(
58 : const shared_model::interface::BlocksQuery &);
59 :
60 : std::unique_ptr<shared_model::interface::QueryResponse>
61 : QueryProcessorImpl::queryHandle(const shared_model::interface::Query &qry) {
62 143 : if (not checkSignatories(qry)) {
63 4 : return response_factory_->createErrorQueryResponse(
64 : shared_model::interface::QueryResponseFactory::ErrorQueryType::
65 : kStatefulFailed,
66 4 : "query signatories did not pass validation",
67 4 : qry.hash());
68 : }
69 :
70 139 : auto executor = qry_exec_->createQueryExecutor(pending_transactions_,
71 138 : response_factory_);
72 139 : if (not executor) {
73 0 : log_->error("Cannot create query executor");
74 0 : return nullptr;
75 : }
76 :
77 139 : return executor.value()->validateAndExecute(qry);
78 143 : }
79 :
80 : rxcpp::observable<
81 : std::shared_ptr<shared_model::interface::BlockQueryResponse>>
82 : QueryProcessorImpl::blocksQueryHandle(
83 : const shared_model::interface::BlocksQuery &qry) {
84 2 : if (not checkSignatories(qry)) {
85 0 : std::shared_ptr<shared_model::interface::BlockQueryResponse> response =
86 0 : response_factory_->createBlockQueryResponse(
87 0 : "query signatories did not pass validation");
88 0 : return rxcpp::observable<>::just(std::move(response));
89 0 : }
90 :
91 2 : auto exec = qry_exec_->createQueryExecutor(pending_transactions_,
92 : response_factory_);
93 : if (not exec or not(exec | [&qry](const auto &executor) {
94 2 : return executor->validate(qry);
95 : })) {
96 1 : std::shared_ptr<shared_model::interface::BlockQueryResponse> response =
97 1 : response_factory_->createBlockQueryResponse("stateful invalid");
98 1 : return rxcpp::observable<>::just(response);
99 1 : }
100 1 : return blocks_query_subject_.get_observable();
101 2 : }
102 :
103 : } // namespace torii
104 : } // namespace iroha
|