Line data Source code
1 : /**
2 : * Copyright Soramitsu Co., Ltd. All Rights Reserved.
3 : * SPDX-License-Identifier: Apache-2.0
4 : */
5 :
6 : #include "main/application.hpp"
7 :
8 : #include "ametsuchi/impl/postgres_ordering_service_persistent_state.hpp"
9 : #include "ametsuchi/impl/tx_presence_cache_impl.hpp"
10 : #include "ametsuchi/impl/wsv_restorer_impl.hpp"
11 : #include "backend/protobuf/common_objects/proto_common_objects_factory.hpp"
12 : #include "backend/protobuf/proto_block_json_converter.hpp"
13 : #include "backend/protobuf/proto_permission_to_string.hpp"
14 : #include "backend/protobuf/proto_proposal_factory.hpp"
15 : #include "backend/protobuf/proto_query_response_factory.hpp"
16 : #include "backend/protobuf/proto_transport_factory.hpp"
17 : #include "backend/protobuf/proto_tx_status_factory.hpp"
18 : #include "common/bind.hpp"
19 : #include "consensus/yac/impl/supermajority_checker_impl.hpp"
20 : #include "interfaces/iroha_internal/transaction_batch_factory_impl.hpp"
21 : #include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp"
22 : #include "interfaces/permission_to_string.hpp"
23 : #include "multi_sig_transactions/gossip_propagation_strategy.hpp"
24 : #include "multi_sig_transactions/mst_processor_impl.hpp"
25 : #include "multi_sig_transactions/mst_propagation_strategy_stub.hpp"
26 : #include "multi_sig_transactions/mst_time_provider_impl.hpp"
27 : #include "multi_sig_transactions/storage/mst_storage_impl.hpp"
28 : #include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
29 : #include "multi_sig_transactions/transport/mst_transport_stub.hpp"
30 : #include "torii/impl/command_service_impl.hpp"
31 : #include "torii/impl/status_bus_impl.hpp"
32 : #include "validators/field_validator.hpp"
33 : #include "validators/protobuf/proto_query_validator.hpp"
34 : #include "validators/protobuf/proto_transaction_validator.hpp"
35 :
36 : using namespace iroha;
37 : using namespace iroha::ametsuchi;
38 : using namespace iroha::simulator;
39 : using namespace iroha::validation;
40 : using namespace iroha::network;
41 : using namespace iroha::synchronizer;
42 : using namespace iroha::torii;
43 : using namespace iroha::consensus::yac;
44 :
45 : using namespace std::chrono_literals;
46 :
47 : /**
48 : * Configuring iroha daemon
49 : */
50 : Irohad::Irohad(const std::string &block_store_dir,
51 : const std::string &pg_conn,
52 : const std::string &listen_ip,
53 : size_t torii_port,
54 : size_t internal_port,
55 : size_t max_proposal_size,
56 : std::chrono::milliseconds proposal_delay,
57 : std::chrono::milliseconds vote_delay,
58 : const shared_model::crypto::Keypair &keypair,
59 : const boost::optional<GossipPropagationStrategyParams>
60 : &opt_mst_gossip_params)
61 0 : : block_store_dir_(block_store_dir),
62 0 : pg_conn_(pg_conn),
63 0 : listen_ip_(listen_ip),
64 0 : torii_port_(torii_port),
65 0 : internal_port_(internal_port),
66 0 : max_proposal_size_(max_proposal_size),
67 0 : proposal_delay_(proposal_delay),
68 0 : vote_delay_(vote_delay),
69 0 : is_mst_supported_(opt_mst_gossip_params),
70 0 : opt_mst_gossip_params_(opt_mst_gossip_params),
71 0 : keypair(keypair) {
72 0 : log_ = logger::log("IROHAD");
73 0 : log_->info("created");
74 : // Initializing storage at this point in order to insert genesis block before
75 : // initialization of iroha daemon
76 0 : initStorage();
77 0 : }
78 :
79 : /**
80 : * Initializing iroha daemon
81 : */
82 : void Irohad::init() {
83 : // Recover WSV from the existing ledger to be sure it is consistent
84 0 : initWsvRestorer();
85 0 : restoreWsv();
86 :
87 0 : initCryptoProvider();
88 0 : initBatchParser();
89 0 : initValidators();
90 0 : initNetworkClient();
91 0 : initFactories();
92 : initOrderingGate();
93 0 : initSimulator();
94 0 : initConsensusCache();
95 0 : initBlockLoader();
96 0 : initPersistentCache();
97 0 : initConsensusGate();
98 0 : initSynchronizer();
99 0 : initPeerCommunicationService();
100 0 : initStatusBus();
101 0 : initMstProcessor();
102 : initPendingTxsStorage();
103 :
104 : // Torii
105 0 : initTransactionCommandService();
106 0 : initQueryService();
107 0 : }
108 :
109 : /**
110 : * Dropping iroha daemon storage
111 : */
112 : void Irohad::dropStorage() {
113 0 : storage->reset();
114 0 : storage->createOsPersistentState() |
115 : [](const auto &state) { state->resetState(); };
116 0 : }
117 :
118 : /**
119 : * Initializing iroha daemon storage
120 : */
121 : void Irohad::initStorage() {
122 0 : common_objects_factory_ =
123 0 : std::make_shared<shared_model::proto::ProtoCommonObjectsFactory<
124 : shared_model::validation::FieldValidator>>();
125 : auto perm_converter =
126 0 : std::make_shared<shared_model::proto::ProtoPermissionToString>();
127 : auto block_converter =
128 0 : std::make_shared<shared_model::proto::ProtoBlockJsonConverter>();
129 0 : auto storageResult = StorageImpl::create(block_store_dir_,
130 0 : pg_conn_,
131 0 : common_objects_factory_,
132 0 : std::move(block_converter),
133 0 : perm_converter);
134 0 : storageResult.match(
135 : [&](expected::Value<std::shared_ptr<ametsuchi::StorageImpl>> &_storage) {
136 0 : storage = _storage.value;
137 0 : },
138 : [&](expected::Error<std::string> &error) { log_->error(error.error); });
139 :
140 0 : log_->info("[Init] => storage", logger::logBool(storage));
141 0 : }
142 :
143 : void Irohad::resetOrderingService() {
144 0 : if (not(storage->createOsPersistentState() |
145 : [](const auto &state) { return state->resetState(); }))
146 0 : log_->error("cannot reset ordering service storage");
147 0 : }
148 :
149 : bool Irohad::restoreWsv() {
150 0 : return wsv_restorer_->restoreWsv(*storage).match(
151 : [](iroha::expected::Value<void> v) { return true; },
152 : [&](iroha::expected::Error<std::string> &error) {
153 0 : log_->error(error.error);
154 0 : return false;
155 : });
156 0 : }
157 :
158 : /**
159 : * Initializing crypto provider
160 : */
161 : void Irohad::initCryptoProvider() {
162 0 : crypto_signer_ =
163 0 : std::make_shared<shared_model::crypto::CryptoModelSigner<>>(keypair);
164 :
165 0 : log_->info("[Init] => crypto provider");
166 0 : }
167 :
168 : void Irohad::initBatchParser() {
169 0 : batch_parser =
170 0 : std::make_shared<shared_model::interface::TransactionBatchParserImpl>();
171 :
172 0 : log_->info("[Init] => transaction batch parser");
173 0 : }
174 :
175 : /**
176 : * Initializing validators
177 : */
178 : void Irohad::initValidators() {
179 0 : auto factory = std::make_unique<shared_model::proto::ProtoProposalFactory<
180 : shared_model::validation::DefaultProposalValidator>>();
181 0 : stateful_validator =
182 0 : std::make_shared<StatefulValidatorImpl>(std::move(factory), batch_parser);
183 0 : chain_validator = std::make_shared<ChainValidatorImpl>(
184 0 : std::make_shared<consensus::yac::SupermajorityCheckerImpl>());
185 :
186 0 : log_->info("[Init] => validators");
187 0 : }
188 :
189 : /**
190 : * Initializing network client
191 : */
192 : void Irohad::initNetworkClient() {
193 0 : async_call_ =
194 0 : std::make_shared<network::AsyncGrpcClient<google::protobuf::Empty>>();
195 0 : }
196 :
197 : void Irohad::initFactories() {
198 : // transaction factories
199 0 : transaction_batch_factory_ =
200 0 : std::make_shared<shared_model::interface::TransactionBatchFactoryImpl>();
201 :
202 : std::unique_ptr<shared_model::validation::AbstractValidator<
203 : shared_model::interface::Transaction>>
204 0 : transaction_validator =
205 0 : std::make_unique<shared_model::validation::
206 : DefaultOptionalSignedTransactionValidator>();
207 : std::unique_ptr<
208 : shared_model::validation::AbstractValidator<iroha::protocol::Transaction>>
209 0 : proto_transaction_validator = std::make_unique<
210 : shared_model::validation::ProtoTransactionValidator>();
211 0 : transaction_factory =
212 0 : std::make_shared<shared_model::proto::ProtoTransportFactory<
213 : shared_model::interface::Transaction,
214 : shared_model::proto::Transaction>>(
215 0 : std::move(transaction_validator),
216 0 : std::move(proto_transaction_validator));
217 :
218 : // query factories
219 0 : query_response_factory_ =
220 0 : std::make_shared<shared_model::proto::ProtoQueryResponseFactory>();
221 :
222 : std::unique_ptr<shared_model::validation::AbstractValidator<
223 : shared_model::interface::Query>>
224 0 : query_validator = std::make_unique<
225 : shared_model::validation::DefaultSignedQueryValidator>();
226 : std::unique_ptr<
227 : shared_model::validation::AbstractValidator<iroha::protocol::Query>>
228 0 : proto_query_validator =
229 0 : std::make_unique<shared_model::validation::ProtoQueryValidator>();
230 0 : query_factory = std::make_shared<
231 : shared_model::proto::ProtoTransportFactory<shared_model::interface::Query,
232 : shared_model::proto::Query>>(
233 0 : std::move(query_validator), std::move(proto_query_validator));
234 :
235 0 : log_->info("[Init] => factories");
236 0 : }
237 :
238 : /**
239 : * Initializing ordering gate
240 : */
241 : void Irohad::initOrderingGate() {
242 0 : ordering_gate = ordering_init.initOrderingGate(storage,
243 0 : max_proposal_size_,
244 0 : proposal_delay_,
245 0 : storage,
246 0 : storage,
247 0 : transaction_batch_factory_,
248 0 : async_call_);
249 0 : log_->info("[Init] => init ordering gate - [{}]",
250 0 : logger::logBool(ordering_gate));
251 0 : }
252 :
253 : /**
254 : * Initializing iroha verified proposal creator and block creator
255 : */
256 : void Irohad::initSimulator() {
257 0 : auto block_factory = std::make_unique<shared_model::proto::ProtoBlockFactory>(
258 : // Block factory in simulator uses UnsignedBlockValidator because it is
259 : // not required to check signatures of block here, as they will be
260 : // checked when supermajority of peers will sign the block. It is also
261 : // not required to validate signatures of transactions here because they
262 : // are validated in the ordering gate, where they are received from the
263 : // ordering service.
264 0 : std::make_unique<
265 : shared_model::validation::DefaultUnsignedBlockValidator>());
266 0 : simulator = std::make_shared<Simulator>(ordering_gate,
267 0 : stateful_validator,
268 0 : storage,
269 0 : storage,
270 0 : crypto_signer_,
271 0 : std::move(block_factory));
272 :
273 0 : log_->info("[Init] => init simulator");
274 0 : }
275 :
276 : /**
277 : * Initializing consensus block cache
278 : */
279 : void Irohad::initConsensusCache() {
280 0 : consensus_result_cache_ = std::make_shared<consensus::ConsensusResultCache>();
281 :
282 0 : log_->info("[Init] => init consensus block cache");
283 0 : }
284 :
285 : /**
286 : * Initializing block loader
287 : */
288 : void Irohad::initBlockLoader() {
289 0 : block_loader =
290 0 : loader_init.initBlockLoader(storage, storage, consensus_result_cache_);
291 :
292 0 : log_->info("[Init] => block loader");
293 0 : }
294 :
295 : /**
296 : * Initializing persistent cache
297 : */
298 : void Irohad::initPersistentCache() {
299 0 : persistent_cache = std::make_shared<TxPresenceCacheImpl>(storage);
300 :
301 0 : log_->info("[Init] => persistent cache");
302 0 : }
303 :
304 : /**
305 : * Initializing consensus gate
306 : */
307 : void Irohad::initConsensusGate() {
308 0 : consensus_gate = yac_init.initConsensusGate(storage,
309 0 : simulator,
310 0 : block_loader,
311 0 : keypair,
312 0 : consensus_result_cache_,
313 0 : vote_delay_,
314 0 : async_call_,
315 0 : common_objects_factory_);
316 :
317 0 : log_->info("[Init] => consensus gate");
318 0 : }
319 :
320 : /**
321 : * Initializing synchronizer
322 : */
323 : void Irohad::initSynchronizer() {
324 0 : synchronizer = std::make_shared<SynchronizerImpl>(
325 0 : consensus_gate, chain_validator, storage, storage, block_loader);
326 :
327 0 : log_->info("[Init] => synchronizer");
328 0 : }
329 :
330 : /**
331 : * Initializing peer communication service
332 : */
333 : void Irohad::initPeerCommunicationService() {
334 0 : pcs = std::make_shared<PeerCommunicationServiceImpl>(
335 0 : ordering_gate, synchronizer, simulator);
336 :
337 0 : pcs->on_proposal().subscribe(
338 : [this](auto) { log_->info("~~~~~~~~~| PROPOSAL ^_^ |~~~~~~~~~ "); });
339 :
340 0 : pcs->on_commit().subscribe(
341 : [this](auto) { log_->info("~~~~~~~~~| COMMIT =^._.^= |~~~~~~~~~ "); });
342 :
343 : // complete initialization of ordering gate
344 0 : ordering_gate->setPcs(*pcs);
345 :
346 0 : log_->info("[Init] => pcs");
347 0 : }
348 :
349 : void Irohad::initStatusBus() {
350 0 : status_bus_ = std::make_shared<StatusBusImpl>();
351 0 : log_->info("[Init] => Tx status bus");
352 0 : }
353 :
354 : void Irohad::initMstProcessor() {
355 0 : auto mst_completer = std::make_shared<DefaultCompleter>();
356 0 : auto mst_storage = std::make_shared<MstStorageStateImpl>(mst_completer);
357 0 : std::shared_ptr<iroha::PropagationStrategy> mst_propagation;
358 0 : if (is_mst_supported_) {
359 0 : mst_transport = std::make_shared<iroha::network::MstTransportGrpc>(
360 0 : async_call_,
361 0 : transaction_factory,
362 0 : batch_parser,
363 0 : transaction_batch_factory_,
364 0 : persistent_cache,
365 0 : keypair.publicKey());
366 0 : mst_propagation = std::make_shared<GossipPropagationStrategy>(
367 0 : storage, rxcpp::observe_on_new_thread(), *opt_mst_gossip_params_);
368 0 : } else {
369 0 : mst_propagation = std::make_shared<iroha::PropagationStrategyStub>();
370 0 : mst_transport = std::make_shared<iroha::network::MstTransportStub>();
371 : }
372 :
373 0 : auto mst_time = std::make_shared<MstTimeProviderImpl>();
374 0 : auto fair_mst_processor = std::make_shared<FairMstProcessor>(
375 0 : mst_transport, mst_storage, mst_propagation, mst_time);
376 0 : mst_processor = fair_mst_processor;
377 0 : mst_transport->subscribe(fair_mst_processor);
378 0 : log_->info("[Init] => MST processor");
379 0 : }
380 :
381 : void Irohad::initPendingTxsStorage() {
382 0 : pending_txs_storage_ = std::make_shared<PendingTransactionStorageImpl>(
383 0 : mst_processor->onStateUpdate(),
384 0 : mst_processor->onPreparedBatches(),
385 0 : mst_processor->onExpiredBatches());
386 0 : log_->info("[Init] => pending transactions storage");
387 0 : }
388 :
389 : /**
390 : * Initializing transaction command service
391 : */
392 : void Irohad::initTransactionCommandService() {
393 : auto status_factory =
394 0 : std::make_shared<shared_model::proto::ProtoTxStatusFactory>();
395 0 : auto tx_processor = std::make_shared<TransactionProcessorImpl>(
396 0 : pcs, mst_processor, status_bus_, status_factory);
397 0 : command_service = std::make_shared<::torii::CommandServiceImpl>(
398 0 : tx_processor, storage, status_bus_, status_factory);
399 0 : command_service_transport =
400 0 : std::make_shared<::torii::CommandServiceTransportGrpc>(
401 0 : command_service,
402 0 : status_bus_,
403 0 : std::chrono::seconds(1),
404 0 : 2 * proposal_delay_,
405 : status_factory,
406 0 : transaction_factory,
407 0 : batch_parser,
408 0 : transaction_batch_factory_);
409 :
410 0 : log_->info("[Init] => command service");
411 0 : }
412 :
413 : /**
414 : * Initializing query command service
415 : */
416 : void Irohad::initQueryService() {
417 0 : auto query_processor = std::make_shared<QueryProcessorImpl>(
418 0 : storage, storage, pending_txs_storage_, query_response_factory_);
419 :
420 0 : query_service =
421 0 : std::make_shared<::torii::QueryService>(query_processor, query_factory);
422 :
423 0 : log_->info("[Init] => query service");
424 0 : }
425 :
426 : void Irohad::initWsvRestorer() {
427 0 : wsv_restorer_ = std::make_shared<iroha::ametsuchi::WsvRestorerImpl>();
428 0 : }
429 :
430 : /**
431 : * Run iroha daemon
432 : */
433 : Irohad::RunResult Irohad::run() {
434 : using iroha::expected::operator|;
435 :
436 : // Initializing torii server
437 0 : torii_server = std::make_unique<ServerRunner>(
438 0 : listen_ip_ + ":" + std::to_string(torii_port_), false);
439 :
440 : // Initializing internal server
441 0 : internal_server = std::make_unique<ServerRunner>(
442 0 : listen_ip_ + ":" + std::to_string(internal_port_), false);
443 :
444 : // Run torii server
445 0 : return (torii_server->append(command_service_transport)
446 0 : .append(query_service)
447 0 : .run()
448 0 : |
449 : [&](const auto &port) {
450 0 : log_->info("Torii server bound on port {}", port);
451 0 : if (is_mst_supported_) {
452 0 : internal_server->append(
453 0 : std::static_pointer_cast<MstTransportGrpc>(mst_transport));
454 0 : }
455 : // Run internal server
456 0 : return internal_server
457 0 : ->append(ordering_init.ordering_gate_transport)
458 0 : .append(ordering_init.ordering_service_transport)
459 0 : .append(yac_init.consensus_network)
460 0 : .append(loader_init.service)
461 0 : .run();
462 0 : })
463 0 : .match(
464 : [&](const auto &port) -> RunResult {
465 0 : log_->info("Internal server bound on port {}", port.value);
466 0 : log_->info("===> iroha initialized");
467 0 : return {};
468 : },
469 : [&](const expected::Error<std::string> &e) -> RunResult {
470 0 : log_->error(e.error);
471 0 : return e;
472 : });
473 0 : }
474 :
475 : Irohad::~Irohad() {
476 : // TODO andrei 17.09.18: IR-1710 Verify that all components' destructors are
477 : // called in irohad destructor
478 0 : storage->freeConnections();
479 0 : }
|