Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved.
3 : * http://soramitsu.co.jp
4 : *
5 : * Licensed under the Apache License, Version 2.0 (the "License");
6 : * you may not use this file except in compliance with the License.
7 : * You may obtain a copy of the License at
8 : *
9 : * http://www.apache.org/licenses/LICENSE-2.0
10 : *
11 : * Unless required by applicable law or agreed to in writing, software
12 : * distributed under the License is distributed on an "AS IS" BASIS,
13 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : * See the License for the specific language governing permissions and
15 : * limitations under the License.
16 : */
17 :
18 : #ifndef IROHA_ASYNC_GRPC_CLIENT_HPP
19 : #define IROHA_ASYNC_GRPC_CLIENT_HPP
20 :
21 : #include <thread>
22 :
23 : #include <google/protobuf/empty.pb.h>
24 : #include <grpc++/grpc++.h>
25 : #include <grpcpp/impl/codegen/async_unary_call.h>
26 : #include "logger/logger.hpp"
27 :
28 : namespace iroha {
29 : namespace network {
30 :
31 : /**
32 : * Asynchronous gRPC client which does no processing of server responses
33 : * @tparam Response type of server response
34 : */
35 : template <typename Response>
36 : class AsyncGrpcClient {
37 : public:
38 : AsyncGrpcClient()
39 499 : : thread_(&AsyncGrpcClient::asyncCompleteRpc, this),
40 499 : log_(logger::log("AsyncGrpcClient")) {}
41 :
42 : /**
43 : * Listen to gRPC server responses
44 : */
45 : void asyncCompleteRpc() {
46 : void *got_tag;
47 499 : auto ok = false;
48 3356 : while (cq_.Next(&got_tag, &ok)) {
49 2857 : auto call = static_cast<AsyncClientCall *>(got_tag);
50 2857 : if (not call->status.ok()) {
51 2 : log_->warn("RPC failed: {}", call->status.error_message());
52 2 : }
53 2857 : delete call;
54 : }
55 499 : }
56 :
57 : ~AsyncGrpcClient() {
58 499 : cq_.Shutdown();
59 499 : if (thread_.joinable()) {
60 499 : thread_.join();
61 499 : }
62 499 : }
63 :
64 : grpc::CompletionQueue cq_;
65 : std::thread thread_;
66 : logger::Logger log_;
67 :
68 : /**
69 : * State and data information of gRPC call
70 : */
71 : struct AsyncClientCall {
72 : Response reply;
73 :
74 : grpc::ClientContext context;
75 :
76 : grpc::Status status;
77 :
78 : std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<Response>>
79 : response_reader;
80 : };
81 :
82 : /**
83 : * Universal method to perform all needed sends
84 : * @tparam lambda which must return unique pointer to
85 : * ClientAsyncResponseReader<Response> object
86 : */
87 : template <typename F>
88 : void Call(F &&lambda) {
89 2858 : auto call = new AsyncClientCall;
90 2858 : call->response_reader = lambda(&call->context, &cq_);
91 2858 : call->response_reader->Finish(&call->reply, &call->status, call);
92 2858 : }
93 : };
94 : } // namespace network
95 : } // namespace iroha
96 :
97 : #endif // IROHA_ASYNC_GRPC_CLIENT_HPP
|