LCOV - code coverage report
Current view: top level - irohad/torii/impl - query_service.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 49 61 80.3 %
Date: 2018-12-05 17:11:35 Functions: 15 16 93.8 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "torii/query_service.hpp"
       7             : 
       8             : #include "backend/protobuf/query_responses/proto_block_query_response.hpp"
       9             : #include "backend/protobuf/query_responses/proto_query_response.hpp"
      10             : #include "cryptography/default_hash_provider.hpp"
      11             : #include "interfaces/iroha_internal/abstract_transport_factory.hpp"
      12             : #include "validators/default_validator.hpp"
      13             : 
      14             : namespace torii {
      15             : 
      16             :   QueryService::QueryService(
      17             :       std::shared_ptr<iroha::torii::QueryProcessor> query_processor,
      18             :       std::shared_ptr<QueryFactoryType> query_factory)
      19         268 :       : query_processor_{std::move(query_processor)},
      20         268 :         query_factory_{std::move(query_factory)},
      21         268 :         log_{logger::log("Query Service")} {}
      22             : 
      23             :   void QueryService::Find(iroha::protocol::Query const &request,
      24             :                           iroha::protocol::QueryResponse &response) {
      25         167 :     shared_model::crypto::Hash hash;
      26         167 :     auto blobPayload = shared_model::proto::makeBlob(request.payload());
      27         167 :     hash = shared_model::crypto::DefaultHashProvider::makeHash(blobPayload);
      28             : 
      29         167 :     if (cache_.findItem(hash)) {
      30             :       // Query was already processed
      31           1 :       response.mutable_error_response()->set_reason(
      32             :           iroha::protocol::ErrorResponse::STATELESS_INVALID);
      33           1 :       return;
      34             :     }
      35             : 
      36         166 :     query_factory_->build(request).match(
      37             :         [this, &hash, &response](
      38             :             const iroha::expected::Value<
      39             :                 std::unique_ptr<shared_model::interface::Query>> &query) {
      40             :           // Send query to iroha
      41         143 :           response = static_cast<shared_model::proto::QueryResponse &>(
      42         132 :                          *query_processor_->queryHandle(*query.value))
      43         142 :                          .getTransport();
      44         143 :           cache_.addItem(hash, response);
      45         143 :         },
      46             :         [&hash, &response](
      47             :             const iroha::expected::Error<QueryFactoryType::Error> &error) {
      48          23 :           response.set_query_hash(hash.hex());
      49          23 :           response.mutable_error_response()->set_reason(
      50             :               iroha::protocol::ErrorResponse::STATELESS_INVALID);
      51          23 :           response.mutable_error_response()->set_message(
      52          23 :               std::move(error.error.error));
      53          23 :         });
      54         167 :   }
      55             : 
      56             :   grpc::Status QueryService::Find(grpc::ServerContext *context,
      57             :                                   const iroha::protocol::Query *request,
      58             :                                   iroha::protocol::QueryResponse *response) {
      59         164 :     Find(*request, *response);
      60         164 :     return grpc::Status::OK;
      61             :   }
      62             : 
      63             :   grpc::Status QueryService::FetchCommits(
      64             :       grpc::ServerContext *context,
      65             :       const iroha::protocol::BlocksQuery *request,
      66             :       grpc::ServerWriter<iroha::protocol::BlockQueryResponse> *writer) {
      67           2 :     log_->debug("Fetching commits");
      68           2 :     shared_model::proto::TransportBuilder<
      69             :         shared_model::proto::BlocksQuery,
      70             :         shared_model::validation::DefaultSignedBlocksQueryValidator>()
      71           2 :         .build(*request)
      72           2 :         .match(
      73             :             [this, context, request, writer](
      74             :                 const iroha::expected::Value<shared_model::proto::BlocksQuery>
      75             :                     &query) {
      76           1 :               rxcpp::composite_subscription sub;
      77           1 :               query_processor_->blocksQueryHandle(query.value)
      78           1 :                   .as_blocking()
      79           1 :                   .subscribe(
      80             :                       sub,
      81             :                       [this, context, &sub, request, writer](
      82             :                           const std::shared_ptr<
      83             :                               shared_model::interface::BlockQueryResponse>
      84             :                               response) {
      85           1 :                         if (context->IsCancelled()) {
      86           0 :                           log_->debug("Unsubscribed");
      87           0 :                           sub.unsubscribe();
      88           0 :                         } else {
      89           1 :                           iroha::visit_in_place(
      90           1 :                               response->get(),
      91             :                               [this, writer, request](
      92             :                                   const shared_model::interface::BlockResponse
      93             :                                       &block_response) {
      94           1 :                                 log_->debug(
      95             :                                     "{} receives committed block",
      96           1 :                                     request->meta().creator_account_id());
      97           1 :                                 auto proto_block_response = static_cast<
      98             :                                     const shared_model::proto::BlockResponse &>(
      99           1 :                                     block_response);
     100           1 :                                 writer->Write(
     101           1 :                                     proto_block_response.getTransport());
     102             :                               },
     103             :                               [this, writer, request](
     104             :                                   const shared_model::interface::
     105             :                                       BlockErrorResponse
     106             :                                           &block_error_response) {
     107           0 :                                 log_->debug(
     108             :                                     "{} received error with message: {}",
     109           0 :                                     request->meta().creator_account_id(),
     110           0 :                                     block_error_response.message());
     111             :                                 auto proto_block_error_response =
     112             :                                     static_cast<const shared_model::proto::
     113             :                                                     BlockErrorResponse &>(
     114           0 :                                         block_error_response);
     115           0 :                                 writer->WriteLast(
     116           0 :                                     proto_block_error_response.getTransport(),
     117           0 :                                     grpc::WriteOptions());
     118           0 :                               });
     119             :                         }
     120           1 :                       });
     121           1 :             },
     122             :             [this, writer](const auto &error) {
     123           1 :               log_->debug("Stateless invalid: {}", error.error);
     124           1 :               iroha::protocol::BlockQueryResponse response;
     125           1 :               response.mutable_block_error_response()->set_message(
     126           1 :                   std::move(error.error));
     127           1 :               writer->WriteLast(response, grpc::WriteOptions());
     128           1 :             });
     129             : 
     130           2 :     return grpc::Status::OK;
     131           0 :   }
     132             : 
     133             : }  // namespace torii

Generated by: LCOV version 1.13