LCOV - code coverage report
Current view: top level - libs/common - timeout.hpp (source / functions) Hit Total Coverage
Test: coverage_cleared.info Lines: 60 76 78.9 %
Date: 2018-12-05 17:11:35 Functions: 79 88 89.8 %

          Line data    Source code
       1             : /**
       2             :  * Copyright Soramitsu Co., Ltd. All Rights Reserved.
       3             :  * Copyright (c) Microsoft Open Technologies, Inc. All rights reserved.
       4             :  * SPDX-License-Identifier: Apache-2.0
       5             :  */
       6             : 
       7             : #ifndef IROHA_TIMEOUT_HPP
       8             : #define IROHA_TIMEOUT_HPP
       9             : 
      10             : #include <rxcpp/operators/rx-timeout.hpp>
      11             : 
      12             : namespace iroha {
      13             : 
      14             :   /**
      15             :    * This class is mostly the same as rxcpp::operators::timeout;
      16             :    * the only change is that it accepts a selector lambda which generates
      17             :    * a duration based on the observable value instead of a fixed duration.
      18             :    * Returns an observable that terminates with timeout_error if a particular
      19             :    * timespan has passed without emitting another item from the source
      20             :    * observable.
      21             :    * The timespan is generated by the selector from the last received value.
      22             :    * @tparam T value type
      23             :    * @tparam Selector the type of the transforming function
      24             :    * which returns the time interval
      25             :    * @tparam Coordination the type of the coordination used for scheduling
      26             :    */
      27             :   template <class T, class Selector, class Coordination>
      28             :   struct timeout {
      29             :     typedef rxcpp::util::decay_t<T> source_value_type;
      30             :     typedef rxcpp::util::decay_t<Coordination> coordination_type;
      31             :     typedef typename coordination_type::coordinator_type coordinator_type;
      32             :     typedef rxcpp::util::decay_t<Selector> select_type;
      33             : 
      34             :     struct timeout_values {  // selector + coordination; base of every subscription's state
      35             :       timeout_values(select_type s, coordination_type c)
      36           2 :           : selector(std::move(s)), coordination(c) {}
      37             : 
      38             :       select_type selector;  // maps a received value to the timeout period
      39             :       coordination_type coordination;  // source of the coordinator created in make()
      40             :     };
      41             :     timeout_values initial;  // handed to make() on every operator() call
      42             : 
      43             :     timeout(select_type s, coordination_type coordination)
      44           2 :         : initial(std::move(s), coordination) {}
      45             : 
      46             :     template <class Subscriber>
      47             :     struct timeout_observer {  // forwards events to dest through the coordinator's worker
      48             :       typedef timeout_observer<Subscriber> this_type;
      49             :       typedef rxcpp::util::decay_t<T> value_type;
      50             :       typedef rxcpp::util::decay_t<Subscriber> dest_type;
      51             :       typedef rxcpp::observer<T, this_type> observer_type;
      52             : 
      53             :       struct timeout_subscriber_values : public timeout_values {  // per-subscription state
      54             :         timeout_subscriber_values(rxcpp::composite_subscription cs,
      55             :                                   dest_type d,
      56             :                                   timeout_values v,
      57             :                                   coordinator_type c)
      58           2 :             : timeout_values(v),  // NOTE(review): copies the by-value parameter; std::move(v) would avoid it
      59           2 :               cs(std::move(cs)),  // the argument is the parameter cs, which hides the member here
      60           2 :               dest(std::move(d)),
      61           2 :               coordinator(std::move(c)),
      62           2 :               worker(coordinator.get_worker()),  // relies on coordinator being declared before worker
      63           2 :               index(0) {}
      64             : 
      65             :         rxcpp::composite_subscription cs;
      66             :         dest_type dest;
      67             :         coordinator_type coordinator;
      68             :         rxcpp::schedulers::worker worker;
      69             :         mutable std::size_t index;  // bumped on every on_next; timeouts carrying an older id do nothing
      70             :       };
      71             :       typedef std::shared_ptr<timeout_subscriber_values> state_type;
      72             :       state_type state;  // shared with every lambda scheduled by this observer
      73             : 
      74             :       timeout_observer(rxcpp::composite_subscription cs,
      75             :                        dest_type d,
      76             :                        timeout_values v,
      77             :                        coordinator_type c)
      78           2 :           : state(std::make_shared<timeout_subscriber_values>(
      79           2 :                 timeout_subscriber_values(  // NOTE(review): builds a temporary; make_shared could take the arguments directly
      80           2 :                     std::move(cs), std::move(d), v, std::move(c)))) {
      81           2 :         auto localState = state;  // local copy so the [=] lambdas capture the shared_ptr itself
      82             : 
      83             :         auto disposer = [=](const rxcpp::schedulers::schedulable &) {  // unsubscribes cs, dest and worker
      84           2 :           localState->cs.unsubscribe();
      85           2 :           localState->dest.unsubscribe();
      86           2 :           localState->worker.unsubscribe();
      87           2 :         };
      88           2 :         auto selectedDisposer = on_exception(  // on_exception is not defined in this file; presumably reports a throw to dest and yields empty - confirm
      89             :             [&]() { return localState->coordinator.act(disposer); },
      90           2 :             localState->dest);
      91           2 :         if (selectedDisposer.empty()) {
      92           0 :           return;  // nothing to register without a disposer
      93             :         }
      94             : 
      95           2 :         localState->dest.add(  // presumably invoked when dest is unsubscribed - confirm against rxcpp
      96             :             [=]() { localState->worker.schedule(selectedDisposer.get()); });
      97           2 :         localState->cs.add(  // same disposer registered on cs
      98             :             [=]() { localState->worker.schedule(selectedDisposer.get()); });
      99           2 :       }
     100             : 
     101             :       static std::function<void(const rxcpp::schedulers::schedulable &)>
     102             :       produce_timeout(std::size_t id, state_type state) {  // builds the action that reports the timeout armed by on_next number id
     103             :         auto produce = [id, state](const rxcpp::schedulers::schedulable &) {
     104           1 :           if (id != state->index)  // index moved on: a newer on_next superseded this timeout
     105           0 :             return;
     106             : 
     107           1 :           state->dest.on_error(std::make_exception_ptr(
     108           1 :               rxcpp::timeout_error("timeout has occurred")));
     109           1 :         };
     110             : 
     111           3 :         auto selectedProduce = on_exception(
     112             :             [&]() { return state->coordinator.act(produce); }, state->dest);
     113           3 :         if (selectedProduce.empty()) {
     114           0 :           return std::function<void(const rxcpp::schedulers::schedulable &)>();  // empty function object
     115             :         }
     116             : 
     117           3 :         return std::function<void(const rxcpp::schedulers::schedulable &)>(
     118           3 :             selectedProduce.get());
     119           3 :       }
     120             : 
     121             :       template <class Value>
     122             :       void on_next(Value &&v) const {  // forwards v to dest and arms a new timeout from selector(v)
     123           3 :         auto localState = state;
     124             : 
     125           3 :         auto selected = on_exception(
     126             :             [&]() { return localState->selector(std::forward<Value>(v)); },  // NOTE(review): may move from v, yet v is copied into work below
     127           3 :             localState->dest);
     128           3 :         if (selected.empty()) {
     129           0 :           return;
     130             :         }
     131             : 
     132             :         auto work = [v, localState, period = std::move(selected.get())](  // v is copied here, after the forward above
     133             :                         const rxcpp::schedulers::schedulable &) {
     134           3 :           auto new_id = ++localState->index;  // makes every previously scheduled timeout stale
     135           3 :           auto produce_time = localState->worker.now() + period;
     136             : 
     137           3 :           localState->dest.on_next(v);
     138           3 :           localState->worker.schedule(produce_time,  // timeout action becomes due at produce_time
     139           3 :                                       produce_timeout(new_id, localState));
     140           3 :         };
     141             :         auto selectedWork =
     142             :             on_exception([&]() { return localState->coordinator.act(work); },
     143           3 :                          localState->dest);
     144           3 :         if (selectedWork.empty()) {
     145           0 :           return;
     146             :         }
     147           3 :         localState->worker.schedule(selectedWork.get());
     148           3 :       }
     149             : 
     150             :       void on_error(std::exception_ptr e) const {  // relays e to dest on the worker; index is left untouched
     151           0 :         auto localState = state;
     152             :         auto work = [e, localState](const rxcpp::schedulers::schedulable &) {
     153           0 :           localState->dest.on_error(e);
     154           0 :         };
     155             :         auto selectedWork =
     156             :             on_exception([&]() { return localState->coordinator.act(work); },
     157           0 :                          localState->dest);
     158           0 :         if (selectedWork.empty()) {
     159           0 :           return;
     160             :         }
     161           0 :         localState->worker.schedule(selectedWork.get());
     162           0 :       }
     163             : 
     164             :       void on_completed() const {  // relays completion to dest on the worker; index is left untouched
     165           1 :         auto localState = state;
     166             :         auto work = [localState](const rxcpp::schedulers::schedulable &) {
     167           1 :           localState->dest.on_completed();
     168           1 :         };
     169             :         auto selectedWork =
     170             :             on_exception([&]() { return localState->coordinator.act(work); },
     171           1 :                          localState->dest);
     172           1 :         if (selectedWork.empty()) {
     173           0 :           return;
     174             :         }
     175           1 :         localState->worker.schedule(selectedWork.get());
     176           1 :       }
     177             : 
     178             :       static rxcpp::subscriber<T, observer_type> make(dest_type d,  // wraps d in a subscriber backed by a new timeout_observer
     179             :                                                       timeout_values v) {
     180           2 :         auto cs = rxcpp::composite_subscription();
     181           2 :         auto coordinator = v.coordination.create_coordinator();  // read from v before v is moved below
     182             : 
     183           2 :         return rxcpp::make_subscriber<T>(
     184             :             cs,  // copied: the same cs is also given to the observer
     185           2 :             observer_type(this_type(
     186           2 :                 cs, std::move(d), std::move(v), std::move(coordinator))));
     187           2 :       }
     188             :     };
     189             : 
     190             :     template <class Subscriber>
     191             :     auto operator()(Subscriber dest) const  // returns the subscriber produced by timeout_observer::make
     192             :         -> decltype(timeout_observer<Subscriber>::make(std::move(dest),
     193             :                                                        initial)) {
     194           2 :       return timeout_observer<Subscriber>::make(std::move(dest), initial);  // initial is copied: make takes it by value
     195           0 :     }
     196             :   };
     197             : 
     198             :   template <  // factory: builds a timeout<> from a selector and a coordination
     199             :       typename T,  // not deducible from the arguments; callers name it explicitly
     200             :       typename Selector,
     201             :       typename Coordination,
     202             :       class ResolvedSelector = rxcpp::util::decay_t<Selector>,
     203             :       class Duration = decltype(  // type the selector returns when invoked with a decayed T
     204             :           std::declval<ResolvedSelector>()((std::declval<std::decay_t<T>>()))),
     205             :       class Enabled = rxcpp::util::enable_if_all_true_type_t<  // constraint built from the two traits below
     206             :           rxcpp::is_coordination<Coordination>,
     207             :           rxcpp::util::is_duration<Duration>>,
     208             :       class Timeout =
     209             :           timeout<T, ResolvedSelector, rxcpp::util::decay_t<Coordination>>>
     210             :   static auto makeTimeout(Selector &&s, Coordination &&cn) {  // NOTE(review): static gives this header-defined template internal linkage
     211           2 :     return Timeout(std::forward<Selector>(s), std::forward<Coordination>(cn));
     212           0 :   };  // NOTE(review): the ';' after the function body is redundant
     213             : 
     214             : }  // namespace iroha
     215             : 
     216             : #endif  // IROHA_TIMEOUT_HPP

Generated by: LCOV version 1.13