LCOV - code coverage report
Current view: top level - irohad/ordering/impl - single_peer_ordering_service.cpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 66 71 93.0 %
Date: 2018-12-05 17:11:35 Functions: 25 27 92.6 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * SPDX-License-Identifier: Apache-2.0
       4             :  */
       5             : 
       6             : #include "ordering/impl/single_peer_ordering_service.hpp"
       7             : 
       8             : #include <algorithm>
       9             : #include <iterator>
      10             : 
      11             : #include <boost/range/adaptor/indirected.hpp>
      12             : 
      13             : #include "ametsuchi/ordering_service_persistent_state.hpp"
      14             : #include "common/bind.hpp"
      15             : #include "datetime/time.hpp"
      16             : #include "interfaces/common_objects/peer.hpp"
      17             : #include "interfaces/iroha_internal/transaction_batch_impl.hpp"
      18             : #include "network/ordering_service_transport.hpp"
      19             : 
      20             : namespace iroha {
      21             :   namespace ordering {
      22             :     SinglePeerOrderingService::SinglePeerOrderingService(
      23             :         std::shared_ptr<ametsuchi::PeerQueryFactory> peer_query_factory,
      24             :         size_t max_size,
      25             :         rxcpp::observable<TimeoutType> proposal_timeout,
      26             :         std::shared_ptr<network::OrderingServiceTransport> transport,
      27             :         std::shared_ptr<ametsuchi::OsPersistentStateFactory> persistent_state,
      28             :         std::unique_ptr<shared_model::interface::ProposalFactory> factory,
      29             :         bool is_async)
      30         251 :         : peer_query_factory_(peer_query_factory),
      31         251 :           max_size_(max_size),
      32         251 :           current_size_(0),
      33         251 :           transport_(transport),
      34         251 :           persistent_state_(persistent_state),
      35         251 :           factory_(std::move(factory)),
      36         251 :           proposal_height_(persistent_state_->createOsPersistentState() |
      37             :                            [](const auto &state) {
      38         251 :                              return state->loadProposalHeight().value();
      39             :                            }),
      40         251 :           log_(logger::log("OrderingServiceImpl")) {
      41             :       // restore state of ordering service from persistent storage
      42         251 :       rxcpp::observable<ProposalEvent> timer =
      43             :           proposal_timeout.map([](auto) { return ProposalEvent::kTimerEvent; });
      44             : 
      45             :       auto subscribe = [&](auto merge_strategy) {
      46         245 :         handle_ = merge_strategy(rxcpp::observable<>::from(
      47         245 :                                      timer, transactions_.get_observable()))
      48             :                       .subscribe([this](auto &&v) {
      49             :                         auto check_queue = [&] {
      50        2027 :                           switch (v) {
      51             :                             case ProposalEvent::kTimerEvent:
      52         245 :                               return not queue_.empty();
      53             :                             case ProposalEvent::kBatchEvent:
      54        2023 :                               return current_size_.load() >= max_size_;
      55             :                             default:
      56           0 :                               BOOST_ASSERT_MSG(false, "Unknown value");
      57             :                           }
      58        2027 :                         };
      59        2027 :                         if (check_queue()) {
      60        2006 :                           this->generateProposal();
      61        2006 :                         }
      62        2027 :                       });
      63         245 :       };
      64             : 
      65         251 :       if (is_async) {
      66             :         subscribe([](auto observable) {
      67         245 :           return observable.merge(rxcpp::synchronize_new_thread());
      68           0 :         });
      69         245 :       } else {
      70             :         subscribe([](auto observable) { return observable.merge(); });
      71             :       }
      72         251 :     }
      73             : 
      74             :     void SinglePeerOrderingService::onBatch(
      75             :         std::unique_ptr<shared_model::interface::TransactionBatch> batch) {
      76        2731 :       std::shared_lock<std::shared_timed_mutex> batch_prop_lock(
      77        2731 :           batch_prop_mutex_);
      78             : 
      79        2734 :       current_size_.fetch_add(batch->transactions().size());
      80        2734 :       queue_.push(std::move(batch));
      81        2697 :       log_->info("Queue size is {}", current_size_.load());
      82             : 
      83        2740 :       batch_prop_lock.unlock();
      84             : 
      85        2738 :       std::lock_guard<std::mutex> event_lock(event_mutex_);
      86        2741 :       transactions_.get_subscriber().on_next(ProposalEvent::kBatchEvent);
      87        2741 :     }
      88             : 
      89             :     void SinglePeerOrderingService::generateProposal() {
      90        2716 :       std::lock_guard<std::shared_timed_mutex> lock(batch_prop_mutex_);
      91        2716 :       log_->info("Start proposal generation");
      92             :       std::vector<std::shared_ptr<shared_model::interface::Transaction>> txs;
      93        5457 :       for (std::unique_ptr<shared_model::interface::TransactionBatch> batch;
      94        5457 :            txs.size() < max_size_ and queue_.try_pop(batch);) {
      95        2741 :         auto batch_size = batch->transactions().size();
      96             :         // TODO 29.08.2018 andrei IR-1667 Timestamp validation during proposal
      97             :         // generation
      98        2741 :         txs.insert(std::end(txs),
      99        2741 :                    std::make_move_iterator(std::begin(batch->transactions())),
     100        2741 :                    std::make_move_iterator(std::end(batch->transactions())));
     101        2741 :         current_size_ -= batch_size;
     102             :       }
     103             : 
     104        2716 :       auto tx_range = txs | boost::adaptors::indirected;
     105        2716 :       auto proposal = factory_->createProposal(
     106        2716 :           proposal_height_, iroha::time::now(), tx_range);
     107             : 
     108        2716 :       proposal.match(
     109             :           [this](expected::Value<
     110             :                  std::unique_ptr<shared_model::interface::Proposal>> &v) {
     111             :             // Save proposal height to the persistent storage.
     112             :             // In case of restart it reloads state.
     113        2716 :             if (persistent_state_->createOsPersistentState() |
     114             :                 [this](const auto &state) {
     115        2715 :                   return state->saveProposalHeight(proposal_height_ + 1);
     116             :                 }) {
     117         714 :               publishProposal(std::move(v.value));
     118         714 :               proposal_height_++;
     119         714 :             } else {
     120             :               // TODO(@l4l) 23/03/18: publish proposal independent of psql
     121             :               // status IR-1162
     122        2002 :               log_->warn(
     123             :                   "Proposal height cannot be saved. Skipping proposal publish");
     124             :             }
     125        2716 :           },
     126             :           [this](expected::Error<std::string> &e) {
     127           0 :             log_->warn("Failed to initialize proposal: {}", e.error);
     128           0 :           });
     129        2716 :     }
     130             : 
     131             :     void SinglePeerOrderingService::publishProposal(
     132             :         std::unique_ptr<shared_model::interface::Proposal> proposal) {
     133         714 :       auto peers = peer_query_factory_->createPeerQuery() |
     134             :           [](const auto &query) { return query->getLedgerPeers(); };
     135         714 :       if (peers) {
     136         714 :         std::vector<std::string> addresses;
     137         714 :         std::transform(peers->begin(),
     138         714 :                        peers->end(),
     139         714 :                        std::back_inserter(addresses),
     140             :                        [](auto &p) { return p->address(); });
     141         714 :         transport_->publishProposal(std::move(proposal), addresses);
     142         714 :       } else {
     143           0 :         log_->error("Cannot get the peer list");
     144             :       }
     145         714 :     }
     146             : 
     147             :     SinglePeerOrderingService::~SinglePeerOrderingService() {
     148         251 :       handle_.unsubscribe();
     149         251 :     }
     150             :   }  // namespace ordering
     151             : }  // namespace iroha

Generated by: LCOV version 1.13