Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp"
7 :
8 : #include "interfaces/transaction.hpp"
9 : #include "multi_sig_transactions/state/mst_state.hpp"
10 :
11 : namespace iroha {
12 :
13 : PendingTransactionStorageImpl::PendingTransactionStorageImpl(
14 : StateObservable updated_batches,
15 : BatchObservable prepared_batch,
16 251 : BatchObservable expired_batch) {
17 251 : updated_batches_subscription_ =
18 : updated_batches.subscribe([this](const SharedState &batches) {
19 16 : this->updatedBatchesHandler(batches);
20 16 : });
21 251 : prepared_batch_subscription_ =
22 : prepared_batch.subscribe([this](const SharedBatch &preparedBatch) {
23 3 : this->removeBatch(preparedBatch);
24 3 : });
25 251 : expired_batch_subscription_ =
26 : expired_batch.subscribe([this](const SharedBatch &expiredBatch) {
27 1 : this->removeBatch(expiredBatch);
28 1 : });
29 251 : }
30 :
31 : PendingTransactionStorageImpl::~PendingTransactionStorageImpl() {
32 251 : updated_batches_subscription_.unsubscribe();
33 251 : prepared_batch_subscription_.unsubscribe();
34 251 : expired_batch_subscription_.unsubscribe();
35 251 : }
36 :
37 : PendingTransactionStorageImpl::SharedTxsCollectionType
38 : PendingTransactionStorageImpl::getPendingTransactions(
39 : const AccountIdType &account_id) const {
40 13 : std::shared_lock<std::shared_timed_mutex> lock(mutex_);
41 13 : auto creator_it = storage_.index.find(account_id);
42 13 : if (storage_.index.end() != creator_it) {
43 13 : auto &batch_hashes = creator_it->second;
44 13 : SharedTxsCollectionType result;
45 13 : auto &batches = storage_.batches;
46 26 : for (const auto &batch_hash : batch_hashes) {
47 13 : auto batch_it = batches.find(batch_hash);
48 13 : if (batches.end() != batch_it) {
49 13 : auto &txs = batch_it->second->transactions();
50 13 : result.insert(result.end(), txs.begin(), txs.end());
51 13 : }
52 : }
53 13 : return result;
54 13 : }
55 0 : return {};
56 13 : }
57 :
58 : std::set<PendingTransactionStorageImpl::AccountIdType>
59 : PendingTransactionStorageImpl::batchCreators(const TransactionBatch &batch) {
60 18 : std::set<AccountIdType> creators;
61 41 : for (const auto &transaction : batch.transactions()) {
62 23 : creators.insert(transaction->creatorAccountId());
63 : }
64 18 : return creators;
65 18 : }
66 :
67 : void PendingTransactionStorageImpl::updatedBatchesHandler(
68 : const SharedState &updated_batches) {
69 16 : std::unique_lock<std::shared_timed_mutex> lock(mutex_);
70 34 : for (auto &batch : updated_batches->getBatches()) {
71 18 : auto hash = batch->reducedHash();
72 18 : auto it = storage_.batches.find(hash);
73 18 : if (storage_.batches.end() == it) {
74 31 : for (const auto &creator : batchCreators(*batch)) {
75 17 : storage_.index[creator].insert(hash);
76 : }
77 14 : }
78 18 : storage_.batches[hash] = batch;
79 18 : }
80 16 : }
81 :
82 : void PendingTransactionStorageImpl::removeBatch(const SharedBatch &batch) {
83 4 : auto creators = batchCreators(*batch);
84 4 : auto hash = batch->reducedHash();
85 4 : std::unique_lock<std::shared_timed_mutex> lock(mutex_);
86 4 : auto &batches = storage_.batches;
87 4 : auto batch_it = batches.find(hash);
88 4 : if (batches.end() != batch_it) {
89 4 : batches.erase(batch_it);
90 4 : }
91 8 : for (const auto &creator : creators) {
92 : auto &index = storage_.index;
93 4 : auto index_it = index.find(creator);
94 4 : if (index.end() != index_it) {
95 4 : auto &creator_set = index_it->second;
96 4 : auto creator_it = creator_set.find(hash);
97 4 : if (creator_set.end() != creator_it) {
98 4 : creator_set.erase(creator_it);
99 4 : }
100 4 : }
101 : }
102 : }
103 :
104 : } // namespace iroha
|