Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "ordering/impl/on_demand_connection_manager.hpp"
7 :
8 : #include <boost/range/combine.hpp>
9 : #include "interfaces/iroha_internal/proposal.hpp"
10 :
11 : using namespace iroha::ordering;
12 :
13 : OnDemandConnectionManager::OnDemandConnectionManager(
14 : std::shared_ptr<transport::OdOsNotificationFactory> factory,
15 : CurrentPeers initial_peers,
16 : rxcpp::observable<CurrentPeers> peers)
17 4 : : factory_(std::move(factory)),
18 : subscription_(peers.subscribe([this](const auto &peers) {
19 : // exclusive lock
20 0 : std::lock_guard<std::shared_timed_mutex> lock(mutex_);
21 :
22 0 : this->initializeConnections(peers);
23 4 : })) {
24 : // using start_with(initial_peers) results in deadlock
25 4 : initializeConnections(initial_peers);
26 4 : }
27 :
28 : void OnDemandConnectionManager::onBatches(consensus::Round round,
29 : CollectionType batches) {
30 : // shared lock
31 1 : std::shared_lock<std::shared_timed_mutex> lock(mutex_);
32 :
33 : const PeerType types[] = {kCurrentRoundRejectConsumer,
34 : kNextRoundRejectConsumer,
35 : kNextRoundCommitConsumer};
36 : /*
37 : * Transactions are always sent to the round after the next round (+2)
38 : * There are 3 possibilities - next reject in the current round, first reject
39 : * in the next round, and first commit in the round after the next round
40 : * This can be visualised as a diagram, where:
41 : * o - current round, x - next round, v - target round
42 : *
43 : * 0 1 2
44 : * 0 o x v
45 : * 1 x v .
46 : * 2 v . .
47 : */
48 1 : const consensus::Round rounds[] = {
49 1 : {round.block_round, round.reject_round + 2},
50 1 : {round.block_round + 1, 2},
51 1 : {round.block_round + 2, 1}};
52 :
53 4 : for (auto &&pair : boost::combine(types, rounds)) {
54 3 : connections_.peers[boost::get<0>(pair)]->onBatches(boost::get<1>(pair),
55 3 : batches);
56 : }
57 1 : }
58 :
59 : boost::optional<OnDemandConnectionManager::ProposalType>
60 : OnDemandConnectionManager::onRequestProposal(consensus::Round round) {
61 : // shared lock
62 2 : std::shared_lock<std::shared_timed_mutex> lock(mutex_);
63 :
64 2 : return connections_.peers[kIssuer]->onRequestProposal(round);
65 2 : }
66 :
67 : void OnDemandConnectionManager::initializeConnections(
68 : const CurrentPeers &peers) {
69 : auto create_assign = [this](auto &ptr, auto &peer) {
70 16 : ptr = factory_->create(*peer);
71 16 : };
72 :
73 20 : for (auto &&pair : boost::combine(connections_.peers, peers.peers)) {
74 16 : create_assign(boost::get<0>(pair), boost::get<1>(pair));
75 : }
76 4 : }
|