Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "torii/query_service.hpp"
7 :
8 : #include "backend/protobuf/query_responses/proto_block_query_response.hpp"
9 : #include "backend/protobuf/query_responses/proto_query_response.hpp"
10 : #include "cryptography/default_hash_provider.hpp"
11 : #include "interfaces/iroha_internal/abstract_transport_factory.hpp"
12 : #include "validators/default_validator.hpp"
13 :
14 : namespace torii {
15 :
16 : QueryService::QueryService(
17 : std::shared_ptr<iroha::torii::QueryProcessor> query_processor,
18 : std::shared_ptr<QueryFactoryType> query_factory)
19 268 : : query_processor_{std::move(query_processor)},
20 268 : query_factory_{std::move(query_factory)},
21 268 : log_{logger::log("Query Service")} {}
22 :
23 : void QueryService::Find(iroha::protocol::Query const &request,
24 : iroha::protocol::QueryResponse &response) {
25 167 : shared_model::crypto::Hash hash;
26 167 : auto blobPayload = shared_model::proto::makeBlob(request.payload());
27 167 : hash = shared_model::crypto::DefaultHashProvider::makeHash(blobPayload);
28 :
29 167 : if (cache_.findItem(hash)) {
30 : // Query was already processed
31 1 : response.mutable_error_response()->set_reason(
32 : iroha::protocol::ErrorResponse::STATELESS_INVALID);
33 1 : return;
34 : }
35 :
36 166 : query_factory_->build(request).match(
37 : [this, &hash, &response](
38 : const iroha::expected::Value<
39 : std::unique_ptr<shared_model::interface::Query>> &query) {
40 : // Send query to iroha
41 143 : response = static_cast<shared_model::proto::QueryResponse &>(
42 132 : *query_processor_->queryHandle(*query.value))
43 142 : .getTransport();
44 143 : cache_.addItem(hash, response);
45 143 : },
46 : [&hash, &response](
47 : const iroha::expected::Error<QueryFactoryType::Error> &error) {
48 23 : response.set_query_hash(hash.hex());
49 23 : response.mutable_error_response()->set_reason(
50 : iroha::protocol::ErrorResponse::STATELESS_INVALID);
51 23 : response.mutable_error_response()->set_message(
52 23 : std::move(error.error.error));
53 23 : });
54 167 : }
55 :
56 : grpc::Status QueryService::Find(grpc::ServerContext *context,
57 : const iroha::protocol::Query *request,
58 : iroha::protocol::QueryResponse *response) {
59 164 : Find(*request, *response);
60 164 : return grpc::Status::OK;
61 : }
62 :
63 : grpc::Status QueryService::FetchCommits(
64 : grpc::ServerContext *context,
65 : const iroha::protocol::BlocksQuery *request,
66 : grpc::ServerWriter<iroha::protocol::BlockQueryResponse> *writer) {
67 2 : log_->debug("Fetching commits");
68 2 : shared_model::proto::TransportBuilder<
69 : shared_model::proto::BlocksQuery,
70 : shared_model::validation::DefaultSignedBlocksQueryValidator>()
71 2 : .build(*request)
72 2 : .match(
73 : [this, context, request, writer](
74 : const iroha::expected::Value<shared_model::proto::BlocksQuery>
75 : &query) {
76 1 : rxcpp::composite_subscription sub;
77 1 : query_processor_->blocksQueryHandle(query.value)
78 1 : .as_blocking()
79 1 : .subscribe(
80 : sub,
81 : [this, context, &sub, request, writer](
82 : const std::shared_ptr<
83 : shared_model::interface::BlockQueryResponse>
84 : response) {
85 1 : if (context->IsCancelled()) {
86 0 : log_->debug("Unsubscribed");
87 0 : sub.unsubscribe();
88 0 : } else {
89 1 : iroha::visit_in_place(
90 1 : response->get(),
91 : [this, writer, request](
92 : const shared_model::interface::BlockResponse
93 : &block_response) {
94 1 : log_->debug(
95 : "{} receives committed block",
96 1 : request->meta().creator_account_id());
97 1 : auto proto_block_response = static_cast<
98 : const shared_model::proto::BlockResponse &>(
99 1 : block_response);
100 1 : writer->Write(
101 1 : proto_block_response.getTransport());
102 : },
103 : [this, writer, request](
104 : const shared_model::interface::
105 : BlockErrorResponse
106 : &block_error_response) {
107 0 : log_->debug(
108 : "{} received error with message: {}",
109 0 : request->meta().creator_account_id(),
110 0 : block_error_response.message());
111 : auto proto_block_error_response =
112 : static_cast<const shared_model::proto::
113 : BlockErrorResponse &>(
114 0 : block_error_response);
115 0 : writer->WriteLast(
116 0 : proto_block_error_response.getTransport(),
117 0 : grpc::WriteOptions());
118 0 : });
119 : }
120 1 : });
121 1 : },
122 : [this, writer](const auto &error) {
123 1 : log_->debug("Stateless invalid: {}", error.error);
124 1 : iroha::protocol::BlockQueryResponse response;
125 1 : response.mutable_block_error_response()->set_message(
126 1 : std::move(error.error));
127 1 : writer->WriteLast(response, grpc::WriteOptions());
128 1 : });
129 :
130 2 : return grpc::Status::OK;
131 0 : }
132 :
133 : } // namespace torii
|