Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * Copyright (c) Microsoft Open Technologies, Inc. All rights reserved.
4 : * SPDX-License-Identifier: Apache-2.0
5 : */
6 :
7 : #ifndef IROHA_TIMEOUT_HPP
8 : #define IROHA_TIMEOUT_HPP
9 :
10 : #include <rxcpp/operators/rx-timeout.hpp>
11 :
12 : namespace iroha {
13 :
14 : /**
15 : * This class is mostly the same as rxcpp::operators::timeout,
16 : * the only change is that it accepts a selector lambda which generates
17 : * a duration based on observable value instead of a fixed duration
18 : * Return an observable that terminates with timeout_error if a particular
19 : * timespan has passed without emitting another item from the source
20 : * observable
21 : * Timespan is generated with selector from the last received value
22 : * @tparam T value type
23 : * @tparam Selector the type of the transforming function
24 : * which returns time interval
25 : * @tparam Coordination the type of the scheduler
26 : */
27 : template <class T, class Selector, class Coordination>
28 : struct timeout {
29 : typedef rxcpp::util::decay_t<T> source_value_type;
30 : typedef rxcpp::util::decay_t<Coordination> coordination_type;
31 : typedef typename coordination_type::coordinator_type coordinator_type;
32 : typedef rxcpp::util::decay_t<Selector> select_type;
33 :
34 : struct timeout_values {
35 : timeout_values(select_type s, coordination_type c)
36 2 : : selector(std::move(s)), coordination(c) {}
37 :
38 : select_type selector;
39 : coordination_type coordination;
40 : };
41 : timeout_values initial;
42 :
43 : timeout(select_type s, coordination_type coordination)
44 2 : : initial(std::move(s), coordination) {}
45 :
46 : template <class Subscriber>
47 : struct timeout_observer {
48 : typedef timeout_observer<Subscriber> this_type;
49 : typedef rxcpp::util::decay_t<T> value_type;
50 : typedef rxcpp::util::decay_t<Subscriber> dest_type;
51 : typedef rxcpp::observer<T, this_type> observer_type;
52 :
53 : struct timeout_subscriber_values : public timeout_values {
54 : timeout_subscriber_values(rxcpp::composite_subscription cs,
55 : dest_type d,
56 : timeout_values v,
57 : coordinator_type c)
58 2 : : timeout_values(v),
59 2 : cs(std::move(cs)),
60 2 : dest(std::move(d)),
61 2 : coordinator(std::move(c)),
62 2 : worker(coordinator.get_worker()),
63 2 : index(0) {}
64 :
65 : rxcpp::composite_subscription cs;
66 : dest_type dest;
67 : coordinator_type coordinator;
68 : rxcpp::schedulers::worker worker;
69 : mutable std::size_t index;
70 : };
71 : typedef std::shared_ptr<timeout_subscriber_values> state_type;
72 : state_type state;
73 :
74 : timeout_observer(rxcpp::composite_subscription cs,
75 : dest_type d,
76 : timeout_values v,
77 : coordinator_type c)
78 2 : : state(std::make_shared<timeout_subscriber_values>(
79 2 : timeout_subscriber_values(
80 2 : std::move(cs), std::move(d), v, std::move(c)))) {
81 2 : auto localState = state;
82 :
83 : auto disposer = [=](const rxcpp::schedulers::schedulable &) {
84 2 : localState->cs.unsubscribe();
85 2 : localState->dest.unsubscribe();
86 2 : localState->worker.unsubscribe();
87 2 : };
88 2 : auto selectedDisposer = on_exception(
89 : [&]() { return localState->coordinator.act(disposer); },
90 2 : localState->dest);
91 2 : if (selectedDisposer.empty()) {
92 0 : return;
93 : }
94 :
95 2 : localState->dest.add(
96 : [=]() { localState->worker.schedule(selectedDisposer.get()); });
97 2 : localState->cs.add(
98 : [=]() { localState->worker.schedule(selectedDisposer.get()); });
99 2 : }
100 :
101 : static std::function<void(const rxcpp::schedulers::schedulable &)>
102 : produce_timeout(std::size_t id, state_type state) {
103 : auto produce = [id, state](const rxcpp::schedulers::schedulable &) {
104 1 : if (id != state->index)
105 0 : return;
106 :
107 1 : state->dest.on_error(std::make_exception_ptr(
108 1 : rxcpp::timeout_error("timeout has occurred")));
109 1 : };
110 :
111 3 : auto selectedProduce = on_exception(
112 : [&]() { return state->coordinator.act(produce); }, state->dest);
113 3 : if (selectedProduce.empty()) {
114 0 : return std::function<void(const rxcpp::schedulers::schedulable &)>();
115 : }
116 :
117 3 : return std::function<void(const rxcpp::schedulers::schedulable &)>(
118 3 : selectedProduce.get());
119 3 : }
120 :
121 : template <class Value>
122 : void on_next(Value &&v) const {
123 3 : auto localState = state;
124 :
125 3 : auto selected = on_exception(
126 : [&]() { return localState->selector(std::forward<Value>(v)); },
127 3 : localState->dest);
128 3 : if (selected.empty()) {
129 0 : return;
130 : }
131 :
132 : auto work = [v, localState, period = std::move(selected.get())](
133 : const rxcpp::schedulers::schedulable &) {
134 3 : auto new_id = ++localState->index;
135 3 : auto produce_time = localState->worker.now() + period;
136 :
137 3 : localState->dest.on_next(v);
138 3 : localState->worker.schedule(produce_time,
139 3 : produce_timeout(new_id, localState));
140 3 : };
141 : auto selectedWork =
142 : on_exception([&]() { return localState->coordinator.act(work); },
143 3 : localState->dest);
144 3 : if (selectedWork.empty()) {
145 0 : return;
146 : }
147 3 : localState->worker.schedule(selectedWork.get());
148 3 : }
149 :
150 : void on_error(std::exception_ptr e) const {
151 0 : auto localState = state;
152 : auto work = [e, localState](const rxcpp::schedulers::schedulable &) {
153 0 : localState->dest.on_error(e);
154 0 : };
155 : auto selectedWork =
156 : on_exception([&]() { return localState->coordinator.act(work); },
157 0 : localState->dest);
158 0 : if (selectedWork.empty()) {
159 0 : return;
160 : }
161 0 : localState->worker.schedule(selectedWork.get());
162 0 : }
163 :
164 : void on_completed() const {
165 1 : auto localState = state;
166 : auto work = [localState](const rxcpp::schedulers::schedulable &) {
167 1 : localState->dest.on_completed();
168 1 : };
169 : auto selectedWork =
170 : on_exception([&]() { return localState->coordinator.act(work); },
171 1 : localState->dest);
172 1 : if (selectedWork.empty()) {
173 0 : return;
174 : }
175 1 : localState->worker.schedule(selectedWork.get());
176 1 : }
177 :
178 : static rxcpp::subscriber<T, observer_type> make(dest_type d,
179 : timeout_values v) {
180 2 : auto cs = rxcpp::composite_subscription();
181 2 : auto coordinator = v.coordination.create_coordinator();
182 :
183 2 : return rxcpp::make_subscriber<T>(
184 : cs,
185 2 : observer_type(this_type(
186 2 : cs, std::move(d), std::move(v), std::move(coordinator))));
187 2 : }
188 : };
189 :
190 : template <class Subscriber>
191 : auto operator()(Subscriber dest) const
192 : -> decltype(timeout_observer<Subscriber>::make(std::move(dest),
193 : initial)) {
194 2 : return timeout_observer<Subscriber>::make(std::move(dest), initial);
195 0 : }
196 : };
197 :
198 : template <
199 : typename T,
200 : typename Selector,
201 : typename Coordination,
202 : class ResolvedSelector = rxcpp::util::decay_t<Selector>,
203 : class Duration = decltype(
204 : std::declval<ResolvedSelector>()((std::declval<std::decay_t<T>>()))),
205 : class Enabled = rxcpp::util::enable_if_all_true_type_t<
206 : rxcpp::is_coordination<Coordination>,
207 : rxcpp::util::is_duration<Duration>>,
208 : class Timeout =
209 : timeout<T, ResolvedSelector, rxcpp::util::decay_t<Coordination>>>
210 : static auto makeTimeout(Selector &&s, Coordination &&cn) {
211 2 : return Timeout(std::forward<Selector>(s), std::forward<Coordination>(cn));
212 0 : };
213 :
214 : } // namespace iroha
215 :
216 : #endif // IROHA_TIMEOUT_HPP
|