Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. 2017 All Rights Reserved.
3 : * http://soramitsu.co.jp
4 : *
5 : * Licensed under the Apache License, Version 2.0 (the "License");
6 : * you may not use this file except in compliance with the License.
7 : * You may obtain a copy of the License at
8 : *
9 : * http://www.apache.org/licenses/LICENSE-2.0
10 : *
11 : * Unless required by applicable law or agreed to in writing, software
12 : * distributed under the License is distributed on an "AS IS" BASIS,
13 : * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 : * See the License for the specific language governing permissions and
15 : * limitations under the License.
16 : */
17 :
18 : #include "consensus/yac/transport/impl/network_impl.hpp"
19 :
20 : #include <grpc++/grpc++.h>
21 : #include <memory>
22 :
23 : #include "consensus/yac/messages.hpp"
24 : #include "consensus/yac/storage/yac_common.hpp"
25 : #include "consensus/yac/transport/yac_pb_converters.hpp"
26 : #include "interfaces/common_objects/peer.hpp"
27 : #include "logger/logger.hpp"
28 : #include "network/impl/grpc_channel_builder.hpp"
29 : #include "yac.pb.h"
30 :
31 : namespace iroha {
32 : namespace consensus {
33 : namespace yac {
34 : // ----------| Public API |----------
35 :
36 : NetworkImpl::NetworkImpl(
37 : std::shared_ptr<network::AsyncGrpcClient<google::protobuf::Empty>>
38 : async_call)
39 494 : : async_call_(async_call) {}
40 :
41 : void NetworkImpl::subscribe(
42 : std::shared_ptr<YacNetworkNotifications> handler) {
43 248 : handler_ = handler;
44 248 : }
45 :
46 : void NetworkImpl::sendState(const shared_model::interface::Peer &to,
47 : const std::vector<VoteMessage> &state) {
48 1425 : createPeerConnection(to);
49 :
50 1425 : proto::State request;
51 2854 : for (const auto &vote : state) {
52 1429 : auto pb_vote = request.add_votes();
53 1429 : *pb_vote = PbConverters::serializeVote(vote);
54 : }
55 :
56 : async_call_->Call([&](auto context, auto cq) {
57 1425 : return peers_.at(to.address())->AsyncSendState(context, request, cq);
58 : });
59 :
60 1425 : async_call_->log_->info(
61 1425 : "Send votes bundle[size={}] to {}", state.size(), to.address());
62 1425 : }
63 :
64 : grpc::Status NetworkImpl::SendState(
65 : ::grpc::ServerContext *context,
66 : const ::iroha::consensus::yac::proto::State *request,
67 : ::google::protobuf::Empty *response) {
68 1425 : std::vector<VoteMessage> state;
69 2854 : for (const auto &pb_vote : request->votes()) {
70 1429 : auto vote = *PbConverters::deserializeVote(pb_vote);
71 1428 : state.push_back(vote);
72 1429 : }
73 1425 : if (not sameKeys(state)) {
74 0 : async_call_->log_->info(
75 : "Votes are stateless invalid: proposals are different, or empty "
76 : "collection");
77 0 : return grpc::Status::CANCELLED;
78 : }
79 :
80 1425 : async_call_->log_->info(
81 1425 : "Receive votes[size={}] from {}", state.size(), context->peer());
82 :
83 1425 : handler_.lock()->onState(state);
84 1425 : return grpc::Status::OK;
85 1425 : }
86 :
87 : void NetworkImpl::createPeerConnection(
88 : const shared_model::interface::Peer &peer) {
89 1425 : if (peers_.count(peer.address()) == 0) {
90 235 : peers_[peer.address()] =
91 235 : network::createClient<proto::Yac>(peer.address());
92 : }
93 1425 : }
94 :
95 : } // namespace yac
96 : } // namespace consensus
97 : } // namespace iroha
|