Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include <utility>
7 :
8 : #include "multi_sig_transactions/mst_processor_impl.hpp"
9 :
10 : namespace iroha {
11 :
12 : FairMstProcessor::FairMstProcessor(
13 : std::shared_ptr<iroha::network::MstTransport> transport,
14 : std::shared_ptr<MstStorage> storage,
15 : std::shared_ptr<PropagationStrategy> strategy,
16 : std::shared_ptr<MstTimeProvider> time_provider)
17 253 : : MstProcessor(),
18 253 : transport_(std::move(transport)),
19 253 : storage_(std::move(storage)),
20 253 : strategy_(std::move(strategy)),
21 253 : time_provider_(std::move(time_provider)),
22 253 : propagation_subscriber_(strategy_->emitter().subscribe(
23 : [this](auto data) { this->onPropagate(data); })) {
24 253 : log_ = logger::log("FairMstProcessor");
25 253 : }
26 :
27 : FairMstProcessor::~FairMstProcessor() {
28 253 : propagation_subscriber_.unsubscribe();
29 253 : }
30 :
31 : // -------------------------| MstProcessor override |-------------------------
32 :
33 : auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch)
34 : -> decltype(propagateBatch(batch)) {
35 21 : auto state_update = storage_->updateOwnState(batch);
36 21 : completedBatchesNotify(*state_update.completed_state_);
37 21 : updatedBatchesNotify(*state_update.updated_state_);
38 21 : expiredBatchesNotify(
39 21 : storage_->getExpiredTransactions(time_provider_->getCurrentTime()));
40 21 : }
41 :
42 : auto FairMstProcessor::onStateUpdateImpl() const
43 : -> decltype(onStateUpdate()) {
44 496 : return state_subject_.get_observable();
45 0 : }
46 :
47 : auto FairMstProcessor::onPreparedBatchesImpl() const
48 : -> decltype(onPreparedBatches()) {
49 496 : return batches_subject_.get_observable();
50 0 : }
51 :
52 : auto FairMstProcessor::onExpiredBatchesImpl() const
53 : -> decltype(onExpiredBatches()) {
54 496 : return expired_subject_.get_observable();
55 0 : }
56 :
57 : // TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one
58 : void FairMstProcessor::completedBatchesNotify(ConstRefState state) const {
59 22 : if (not state.isEmpty()) {
60 4 : auto completed_batches = state.getBatches();
61 4 : std::for_each(completed_batches.begin(),
62 4 : completed_batches.end(),
63 : [this](const auto &batch) {
64 4 : batches_subject_.get_subscriber().on_next(batch);
65 4 : });
66 4 : }
67 22 : }
68 :
69 : void FairMstProcessor::updatedBatchesNotify(ConstRefState state) const {
70 22 : if (not state.isEmpty()) {
71 17 : state_subject_.get_subscriber().on_next(
72 17 : std::make_shared<MstState>(state));
73 17 : }
74 22 : }
75 :
76 : void FairMstProcessor::expiredBatchesNotify(ConstRefState state) const {
77 22 : if (not state.isEmpty()) {
78 1 : auto expired_batches = state.getBatches();
79 1 : std::for_each(expired_batches.begin(),
80 1 : expired_batches.end(),
81 : [this](const auto &batch) {
82 : expired_subject_.get_subscriber().on_next(batch);
83 1 : });
84 1 : }
85 22 : }
86 :
87 : // -------------------| MstTransportNotification override |-------------------
88 :
89 : void FairMstProcessor::onNewState(
90 : const shared_model::crypto::PublicKey &from,
91 : ConstRefState new_state) {
92 : log_->info("Applying new state");
93 1 : auto current_time = time_provider_->getCurrentTime();
94 :
95 1 : auto state_update = storage_->apply(from, new_state);
96 :
97 : // updated batches
98 1 : updatedBatchesNotify(*state_update.updated_state_);
99 1 : log_->info("New batches size: {}",
100 1 : state_update.updated_state_->getBatches().size());
101 :
102 : // completed batches
103 1 : completedBatchesNotify(*state_update.completed_state_);
104 :
105 : // expired batches
106 1 : expiredBatchesNotify(storage_->getDiffState(from, current_time));
107 1 : }
108 :
109 : // -----------------------------| private api |-----------------------------
110 :
111 : void FairMstProcessor::onPropagate(
112 : const PropagationStrategy::PropagationData &data) {
113 9 : auto current_time = time_provider_->getCurrentTime();
114 9 : auto size = data.size();
115 9 : std::for_each(data.begin(),
116 9 : data.end(),
117 : [this, ¤t_time, size](const auto &dst_peer) {
118 17 : auto diff = storage_->getDiffState(dst_peer->pubkey(),
119 17 : current_time);
120 17 : if (not diff.isEmpty()) {
121 4 : log_->info("Propagate new data[{}]", size);
122 4 : transport_->sendState(*dst_peer, diff);
123 4 : }
124 17 : });
125 9 : }
126 :
127 : } // namespace iroha
|