Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2016-2021 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU Affero General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
12 :
13 : You should have received a copy of the GNU Affero General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file taler-exchange-transfer.c
18 : * @brief Process that actually finalizes outgoing transfers with the wire gateway / bank
19 : * @author Christian Grothoff
20 : */
21 : #include "taler/platform.h"
22 : #include <gnunet/gnunet_util_lib.h>
23 : #include <jansson.h>
24 : #include <pthread.h>
25 : #include "taler/taler_exchangedb_lib.h"
26 : #include "taler/taler_exchangedb_plugin.h"
27 : #include "taler/taler_json_lib.h"
28 : #include "taler/taler_bank_service.h"
29 :
/**
 * Default size of a work shard. Used as the initial value of
 * `shard_size` below, which can be overridden with the "-S" option.
 */
34 : #define DEFAULT_BATCH_SIZE 32
35 :
36 : /**
37 : * How often will we retry a request (given certain
38 : * HTTP status codes) before giving up?
39 : */
40 : #define MAX_RETRIES 3
41 :
/**
 * Information about our work shard.
 *
 * A shard covers the row range [@e shard_start, @e shard_end).
 * Within a shard, work proceeds in batches: each batch starts at
 * @e batch_start, and once a batch has been committed the next one
 * starts right after @e batch_end.
 */
struct Shard
{

  /**
   * Time when we started to work on this shard.
   * Used to compute how long the shard took once it is complete.
   */
  struct GNUNET_TIME_Absolute shard_start_time;

  /**
   * Offset the shard begins at.
   */
  uint64_t shard_start;

  /**
   * Exclusive offset where the shard ends.
   */
  uint64_t shard_end;

  /**
   * Offset where our current batch begins.
   */
  uint64_t batch_start;

  /**
   * Highest row processed in the current batch.
   */
  uint64_t batch_end;

};
74 :
75 :
/**
 * State we keep for one wire transfer that was started from
 * #wire_prepare_cb().  All transfers belonging to the batch that
 * is currently being processed are linked in a DLL (see
 * #wpd_head and #wpd_tail).  The prepared wire data is stored
 * in the @e buf_size bytes allocated directly after this struct.
 */
struct WirePrepareData
{

  /**
   * All transfers done in the same transaction
   * are kept in a DLL.
   */
  struct WirePrepareData *next;

  /**
   * All transfers done in the same transaction
   * are kept in a DLL.
   */
  struct WirePrepareData *prev;

  /**
   * Wire execution handle, NULL while no request
   * to the bank is pending for this transfer.
   */
  struct TALER_BANK_TransferHandle *eh;

  /**
   * Wire account used for this preparation.
   */
  const struct TALER_EXCHANGEDB_AccountInfo *wa;

  /**
   * Row ID of the transfer.
   */
  unsigned long long row_id;

  /**
   * Number of bytes allocated after this struct
   * with the prewire data.
   */
  size_t buf_size;

  /**
   * How often did we retry so far?
   * Compared against #MAX_RETRIES.
   */
  unsigned int retries;

};
124 :
125 :
/**
 * The exchange's configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Our database plugin.
 */
static struct TALER_EXCHANGEDB_Plugin *db_plugin;

/**
 * Next task to run, if any.
 */
static struct GNUNET_SCHEDULER_Task *task;

/**
 * Head of the DLL with the transfers that are currently
 * being executed.  NULL if no transfer is active.
 */
static struct WirePrepareData *wpd_head;

/**
 * Tail of the DLL with the transfers that are currently
 * being executed.  NULL if no transfer is active.
 */
static struct WirePrepareData *wpd_tail;

/**
 * Information about our work shard.
 */
static struct Shard *shard;

/**
 * Handle to the context for interacting with the bank / wire gateway.
 */
static struct GNUNET_CURL_Context *ctx;

/**
 * Randomized back-off we use on serialization errors.
 * Reset to zero whenever a commit succeeds.
 */
static struct GNUNET_TIME_Relative serialization_delay;

/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;

/**
 * Value to return from main(). 0 on success, non-zero on errors.
 */
static int global_ret;

/**
 * #GNUNET_YES if we are in test mode and should exit when idle.
 */
static int test_mode;

/**
 * How long should we sleep when idle before trying to find more work?
 * Also used for how long we wait to grab a shard before trying it again.
 * The value should be set to a bit above the average time it takes to
 * process a shard.
 */
static struct GNUNET_TIME_Relative transfer_idle_sleep_interval;

/**
 * How long did we take to finish the last shard?
 */
static struct GNUNET_TIME_Relative shard_delay;

/**
 * Size of the shards.  Passed to the database when requesting
 * a new shard; can be changed with the "-S" option.
 */
static unsigned int shard_size = DEFAULT_BATCH_SIZE;

/**
 * How many workers should we plan our scheduling with?
 * If 0, no randomized delay is used when selecting a shard.
 */
static unsigned int max_workers = 0;
205 :
206 :
207 : /**
208 : * Clean up all active bank interactions.
209 : */
210 : static void
211 60 : cleanup_wpd (void)
212 : {
213 : struct WirePrepareData *wpd;
214 :
215 60 : while (NULL != (wpd = wpd_head))
216 : {
217 0 : GNUNET_CONTAINER_DLL_remove (wpd_head,
218 : wpd_tail,
219 : wpd);
220 0 : if (NULL != wpd->eh)
221 : {
222 0 : TALER_BANK_transfer_cancel (wpd->eh);
223 0 : wpd->eh = NULL;
224 : }
225 0 : GNUNET_free (wpd);
226 : }
227 60 : }
228 :
229 :
/**
 * We're being aborted with CTRL-C (or SIGTERM). Shut down.
 *
 * Cancels the pending scheduler task (if any), aborts all active
 * bank interactions, rolls back any open database transaction and
 * then releases the database plugin, the wire accounts and the
 * CURL context.
 *
 * @param cls closure, unused
 */
static void
shutdown_task (void *cls)
{
  (void) cls;
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Running shutdown\n");
  if (NULL != task)
  {
    GNUNET_SCHEDULER_cancel (task);
    task = NULL;
  }
  /* abort transfers first; they refer to the CURL context
     that is destroyed further below */
  cleanup_wpd ();
  GNUNET_free (shard);
  db_plugin->rollback (db_plugin->cls); /* just in case */
  TALER_EXCHANGEDB_plugin_unload (db_plugin);
  db_plugin = NULL;
  TALER_EXCHANGEDB_unload_accounts ();
  cfg = NULL;
  if (NULL != ctx)
  {
    GNUNET_CURL_fini (ctx);
    ctx = NULL;
  }
  if (NULL != rc)
  {
    GNUNET_CURL_gnunet_rc_destroy (rc);
    rc = NULL;
  }
}
264 :
265 :
266 : /**
267 : * Parse the configuration for taler-exchange-transfer.
268 : *
269 : * @return #GNUNET_OK on success
270 : */
271 : static enum GNUNET_GenericReturnValue
272 60 : parse_transfer_config (void)
273 : {
274 60 : if (GNUNET_OK !=
275 60 : GNUNET_CONFIGURATION_get_value_time (cfg,
276 : "exchange",
277 : "TRANSFER_IDLE_SLEEP_INTERVAL",
278 : &transfer_idle_sleep_interval))
279 : {
280 0 : GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
281 : "exchange",
282 : "TRANSFER_IDLE_SLEEP_INTERVAL");
283 0 : return GNUNET_SYSERR;
284 : }
285 60 : if (NULL ==
286 60 : (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg,
287 : false)))
288 : {
289 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
290 : "Failed to initialize DB subsystem\n");
291 0 : return GNUNET_SYSERR;
292 : }
293 60 : if (GNUNET_OK !=
294 60 : TALER_EXCHANGEDB_load_accounts (cfg,
295 : TALER_EXCHANGEDB_ALO_DEBIT
296 : | TALER_EXCHANGEDB_ALO_AUTHDATA))
297 : {
298 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
299 : "No wire accounts configured for debit!\n");
300 0 : TALER_EXCHANGEDB_plugin_unload (db_plugin);
301 0 : db_plugin = NULL;
302 0 : return GNUNET_SYSERR;
303 : }
304 60 : return GNUNET_OK;
305 : }
306 :
307 :
308 : /**
309 : * Perform a database commit. If it fails, print a warning.
310 : *
311 : * @return status of commit
312 : */
313 : static enum GNUNET_DB_QueryStatus
314 74 : commit_or_warn (void)
315 : {
316 : enum GNUNET_DB_QueryStatus qs;
317 :
318 74 : qs = db_plugin->commit (db_plugin->cls);
319 74 : if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
320 : {
321 74 : serialization_delay = GNUNET_TIME_UNIT_ZERO;
322 74 : return qs;
323 : }
324 0 : GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
325 : ? GNUNET_ERROR_TYPE_INFO
326 : : GNUNET_ERROR_TYPE_ERROR,
327 : "Failed to commit database transaction!\n");
328 0 : return qs;
329 : }
330 :
331 :
332 : /**
333 : * Execute the wire transfers that we have committed to
334 : * do.
335 : *
336 : * @param cls NULL
337 : */
338 : static void
339 : run_transfers (void *cls);
340 :
341 :
342 : static void
343 0 : run_transfers_delayed (void *cls)
344 : {
345 : (void) cls;
346 0 : shard->shard_start_time = GNUNET_TIME_absolute_get ();
347 0 : run_transfers (NULL);
348 0 : }
349 :
350 :
351 : /**
352 : * Select shard to process.
353 : *
354 : * @param cls NULL
355 : */
356 : static void
357 : select_shard (void *cls);
358 :
359 :
360 : /**
361 : * We are done with the current batch. Commit
362 : * and move on.
363 : */
364 : static void
365 74 : batch_done (void)
366 : {
367 : /* batch done */
368 74 : GNUNET_assert (NULL == wpd_head);
369 74 : switch (commit_or_warn ())
370 : {
371 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
372 : /* try again */
373 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
374 : "Serialization failure, trying again immediately!\n");
375 0 : GNUNET_assert (NULL == task);
376 0 : task = GNUNET_SCHEDULER_add_now (&run_transfers,
377 : NULL);
378 0 : return;
379 0 : case GNUNET_DB_STATUS_HARD_ERROR:
380 0 : GNUNET_break (0);
381 0 : global_ret = EXIT_FAILURE;
382 0 : GNUNET_SCHEDULER_shutdown ();
383 0 : return;
384 74 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
385 74 : shard->batch_start = shard->batch_end + 1;
386 74 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
387 : "Batch complete\n");
388 : /* continue with #run_transfers(), just to guard
389 : against the unlikely case that there are more. */
390 74 : GNUNET_assert (NULL == task);
391 74 : task = GNUNET_SCHEDULER_add_now (&run_transfers,
392 : NULL);
393 74 : return;
394 0 : default:
395 0 : GNUNET_break (0);
396 0 : global_ret = EXIT_FAILURE;
397 0 : GNUNET_SCHEDULER_shutdown ();
398 0 : return;
399 : }
400 : }
401 :
402 :
403 : /**
404 : * Function called with the result from the execute step.
405 : * On success, we mark the respective wire transfer as finished,
406 : * and in general we afterwards continue to #run_transfers(),
407 : * except for irrecoverable errors.
408 : *
409 : * @param cls `struct WirePrepareData` we are working on
410 : * @param tr transfer response
411 : */
412 : static void
413 67 : wire_confirm_cb (void *cls,
414 : const struct TALER_BANK_TransferResponse *tr)
415 : {
416 67 : struct WirePrepareData *wpd = cls;
417 : enum GNUNET_DB_QueryStatus qs;
418 :
419 67 : wpd->eh = NULL;
420 67 : switch (tr->http_status)
421 : {
422 67 : case MHD_HTTP_OK:
423 67 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
424 : "Wire transfer %llu completed successfully\n",
425 : (unsigned long long) wpd->row_id);
426 67 : qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
427 67 : wpd->row_id);
428 : /* continued below */
429 67 : break;
430 0 : case MHD_HTTP_NOT_FOUND:
431 : case MHD_HTTP_CONFLICT:
432 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
433 : "Wire transaction %llu failed: %u/%d\n",
434 : (unsigned long long) wpd->row_id,
435 : tr->http_status,
436 : tr->ec);
437 0 : qs = db_plugin->wire_prepare_data_mark_failed (db_plugin->cls,
438 0 : wpd->row_id);
439 : /* continued below */
440 0 : break;
441 0 : case 0:
442 : case MHD_HTTP_TOO_MANY_REQUESTS:
443 : case MHD_HTTP_INTERNAL_SERVER_ERROR:
444 : case MHD_HTTP_BAD_GATEWAY:
445 : case MHD_HTTP_SERVICE_UNAVAILABLE:
446 : case MHD_HTTP_GATEWAY_TIMEOUT:
447 0 : wpd->retries++;
448 0 : if (wpd->retries < MAX_RETRIES)
449 : {
450 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
451 : "Wire transfer %llu failed (%u), trying again\n",
452 : (unsigned long long) wpd->row_id,
453 : tr->http_status);
454 0 : wpd->eh = TALER_BANK_transfer (ctx,
455 0 : wpd->wa->auth,
456 0 : &wpd[1],
457 : wpd->buf_size,
458 : &wire_confirm_cb,
459 : wpd);
460 0 : return;
461 : }
462 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
463 : "Wire transaction %llu failed: %u/%d\n",
464 : (unsigned long long) wpd->row_id,
465 : tr->http_status,
466 : tr->ec);
467 0 : cleanup_wpd ();
468 0 : db_plugin->rollback (db_plugin->cls);
469 0 : global_ret = EXIT_FAILURE;
470 0 : GNUNET_SCHEDULER_shutdown ();
471 0 : return;
472 0 : default:
473 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
474 : "Wire transfer %llu failed: %u/%d\n",
475 : (unsigned long long) wpd->row_id,
476 : tr->http_status,
477 : tr->ec);
478 0 : db_plugin->rollback (db_plugin->cls);
479 0 : cleanup_wpd ();
480 0 : global_ret = EXIT_FAILURE;
481 0 : GNUNET_SCHEDULER_shutdown ();
482 0 : return;
483 : }
484 67 : shard->batch_end = GNUNET_MAX (wpd->row_id,
485 : shard->batch_end);
486 67 : switch (qs)
487 : {
488 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
489 0 : db_plugin->rollback (db_plugin->cls);
490 0 : cleanup_wpd ();
491 0 : GNUNET_assert (NULL == task);
492 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
493 : "Serialization failure, trying again immediately!\n");
494 0 : task = GNUNET_SCHEDULER_add_now (&run_transfers,
495 : NULL);
496 0 : return;
497 0 : case GNUNET_DB_STATUS_HARD_ERROR:
498 0 : db_plugin->rollback (db_plugin->cls);
499 0 : cleanup_wpd ();
500 0 : global_ret = EXIT_FAILURE;
501 0 : GNUNET_SCHEDULER_shutdown ();
502 0 : return;
503 67 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
504 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
505 67 : GNUNET_CONTAINER_DLL_remove (wpd_head,
506 : wpd_tail,
507 : wpd);
508 67 : GNUNET_free (wpd);
509 67 : break;
510 : }
511 67 : if (NULL != wpd_head)
512 2 : return; /* wait for other queries to complete */
513 65 : batch_done ();
514 : }
515 :
516 :
517 : /**
518 : * Callback with data about a prepared transaction. Triggers the respective
519 : * wire transfer using the prepared transaction data.
520 : *
521 : * @param cls NULL
522 : * @param rowid row identifier used to mark prepared transaction as done
523 : * @param wire_method wire method the preparation was done for
524 : * @param buf transaction data that was persisted, NULL on error
525 : * @param buf_size number of bytes in @a buf, 0 on error
526 : */
527 : static void
528 76 : wire_prepare_cb (void *cls,
529 : uint64_t rowid,
530 : const char *wire_method,
531 : const char *buf,
532 : size_t buf_size)
533 : {
534 : struct WirePrepareData *wpd;
535 :
536 : (void) cls;
537 76 : if ( (NULL != task) ||
538 76 : (EXIT_SUCCESS != global_ret) )
539 0 : return; /* current transaction was aborted */
540 76 : if (rowid >= shard->shard_end)
541 : {
542 : /* skip */
543 9 : shard->batch_end = shard->shard_end - 1;
544 9 : if (NULL != wpd_head)
545 0 : return;
546 9 : batch_done ();
547 9 : return;
548 : }
549 67 : if ( (NULL == wire_method) ||
550 : (NULL == buf) )
551 : {
552 0 : GNUNET_break (0);
553 0 : db_plugin->rollback (db_plugin->cls);
554 0 : global_ret = EXIT_FAILURE;
555 0 : GNUNET_SCHEDULER_shutdown ();
556 0 : return;
557 : }
558 67 : wpd = GNUNET_malloc (sizeof (struct WirePrepareData)
559 : + buf_size);
560 67 : GNUNET_memcpy (&wpd[1],
561 : buf,
562 : buf_size);
563 67 : wpd->buf_size = buf_size;
564 67 : wpd->row_id = rowid;
565 67 : GNUNET_CONTAINER_DLL_insert (wpd_head,
566 : wpd_tail,
567 : wpd);
568 67 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
569 : "Starting wire transfer %llu\n",
570 : (unsigned long long) rowid);
571 67 : wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method);
572 67 : if (NULL == wpd->wa)
573 : {
574 : /* Should really never happen here, as when we get
575 : here the wire account should be in the cache. */
576 0 : GNUNET_break (0);
577 0 : cleanup_wpd ();
578 0 : db_plugin->rollback (db_plugin->cls);
579 0 : global_ret = EXIT_NO_RESTART;
580 0 : GNUNET_SCHEDULER_shutdown ();
581 0 : return;
582 : }
583 134 : wpd->eh = TALER_BANK_transfer (ctx,
584 67 : wpd->wa->auth,
585 : buf,
586 : buf_size,
587 : &wire_confirm_cb,
588 : wpd);
589 67 : if (NULL == wpd->eh)
590 : {
591 0 : GNUNET_break (0); /* Irrecoverable */
592 0 : cleanup_wpd ();
593 0 : db_plugin->rollback (db_plugin->cls);
594 0 : global_ret = EXIT_FAILURE;
595 0 : GNUNET_SCHEDULER_shutdown ();
596 0 : return;
597 : }
598 : }
599 :
600 :
601 : /**
602 : * Execute the wire transfers that we have committed to
603 : * do.
604 : *
605 : * @param cls NULL
606 : */
607 : static void
608 195 : run_transfers (void *cls)
609 : {
610 : enum GNUNET_DB_QueryStatus qs;
611 : int64_t limit;
612 :
613 : (void) cls;
614 195 : task = NULL;
615 195 : limit = shard->shard_end - shard->batch_start;
616 195 : if (0 >= limit)
617 : {
618 61 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
619 : "Shard [%llu,%llu) completed\n",
620 : (unsigned long long) shard->shard_start,
621 : (unsigned long long) shard->batch_end);
622 61 : qs = db_plugin->complete_shard (db_plugin->cls,
623 : "transfer",
624 61 : shard->shard_start,
625 61 : shard->batch_end + 1);
626 61 : switch (qs)
627 : {
628 0 : case GNUNET_DB_STATUS_HARD_ERROR:
629 0 : GNUNET_break (0);
630 0 : GNUNET_free (shard);
631 0 : GNUNET_SCHEDULER_shutdown ();
632 0 : return;
633 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
634 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
635 : "Got DB soft error for complete_shard. Rolling back.\n");
636 0 : GNUNET_free (shard);
637 0 : GNUNET_assert (NULL == task);
638 0 : task = GNUNET_SCHEDULER_add_now (&select_shard,
639 : NULL);
640 0 : return;
641 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
642 : /* already existed, ok, let's just continue */
643 0 : break;
644 61 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
645 : /* normal case */
646 61 : break;
647 : }
648 61 : shard_delay = GNUNET_TIME_absolute_get_duration (
649 61 : shard->shard_start_time);
650 61 : GNUNET_free (shard);
651 61 : GNUNET_assert (NULL == task);
652 61 : task = GNUNET_SCHEDULER_add_now (&select_shard,
653 : NULL);
654 61 : return;
655 : }
656 : /* cap number of parallel connections to a reasonable
657 : limit for concurrent requests to the bank */
658 134 : limit = GNUNET_MIN (limit,
659 : 256);
660 134 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
661 : "Checking for %lld pending wire transfers [%llu-...)\n",
662 : (long long) limit,
663 : (unsigned long long) shard->batch_start);
664 134 : if (GNUNET_OK !=
665 134 : db_plugin->start_read_committed (db_plugin->cls,
666 : "aggregator run transfer"))
667 : {
668 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
669 : "Failed to start database transaction!\n");
670 0 : global_ret = EXIT_FAILURE;
671 0 : GNUNET_SCHEDULER_shutdown ();
672 0 : return;
673 : }
674 134 : GNUNET_assert (NULL == task);
675 134 : qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
676 134 : shard->batch_start,
677 : limit,
678 : &wire_prepare_cb,
679 : NULL);
680 134 : switch (qs)
681 : {
682 0 : case GNUNET_DB_STATUS_HARD_ERROR:
683 0 : cleanup_wpd ();
684 0 : db_plugin->rollback (db_plugin->cls);
685 0 : GNUNET_break (0);
686 0 : global_ret = EXIT_FAILURE;
687 0 : GNUNET_SCHEDULER_shutdown ();
688 0 : return;
689 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
690 : /* try again */
691 0 : db_plugin->rollback (db_plugin->cls);
692 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
693 : "Serialization failure, trying again immediately!\n");
694 0 : cleanup_wpd ();
695 0 : GNUNET_assert (NULL == task);
696 0 : task = GNUNET_SCHEDULER_add_now (&run_transfers,
697 : NULL);
698 0 : return;
699 60 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
700 : /* no more prepared wire transfers, go sleep a bit! */
701 60 : db_plugin->rollback (db_plugin->cls);
702 60 : GNUNET_assert (NULL == wpd_head);
703 60 : GNUNET_assert (NULL == task);
704 60 : if (GNUNET_YES == test_mode)
705 : {
706 60 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
707 : "No more pending wire transfers, shutting down (because we are in test mode)\n");
708 60 : GNUNET_SCHEDULER_shutdown ();
709 : }
710 : else
711 : {
712 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
713 : "No more pending wire transfers, going idle\n");
714 0 : GNUNET_assert (NULL == task);
715 0 : task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
716 : &run_transfers_delayed,
717 : NULL);
718 : }
719 60 : return;
720 74 : default:
721 : /* continued in wire_prepare_cb() */
722 74 : return;
723 : }
724 : }
725 :
726 :
727 : /**
728 : * Select shard to process.
729 : *
730 : * @param cls NULL
731 : */
732 : static void
733 121 : select_shard (void *cls)
734 : {
735 : enum GNUNET_DB_QueryStatus qs;
736 : struct GNUNET_TIME_Relative delay;
737 : uint64_t start;
738 : uint64_t end;
739 :
740 : (void) cls;
741 121 : task = NULL;
742 121 : GNUNET_assert (NULL == wpd_head);
743 121 : if (GNUNET_SYSERR ==
744 121 : db_plugin->preflight (db_plugin->cls))
745 : {
746 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
747 : "Failed to obtain database connection!\n");
748 0 : global_ret = EXIT_FAILURE;
749 0 : GNUNET_SCHEDULER_shutdown ();
750 0 : return;
751 : }
752 121 : if (0 == max_workers)
753 121 : delay = GNUNET_TIME_UNIT_ZERO;
754 : else
755 0 : delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
756 : GNUNET_CRYPTO_QUALITY_WEAK,
757 0 : 4 * GNUNET_TIME_relative_max (
758 : transfer_idle_sleep_interval,
759 : GNUNET_TIME_relative_multiply (shard_delay,
760 0 : max_workers)).rel_value_us);
761 121 : qs = db_plugin->begin_shard (db_plugin->cls,
762 : "transfer",
763 : delay,
764 : shard_size,
765 : &start,
766 : &end);
767 121 : switch (qs)
768 : {
769 0 : case GNUNET_DB_STATUS_HARD_ERROR:
770 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
771 : "Failed to obtain starting point for monitoring from database!\n");
772 0 : global_ret = EXIT_FAILURE;
773 0 : GNUNET_SCHEDULER_shutdown ();
774 0 : return;
775 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
776 : /* try again */
777 : {
778 0 : serialization_delay = GNUNET_TIME_randomized_backoff (serialization_delay,
779 : GNUNET_TIME_UNIT_SECONDS);
780 0 : GNUNET_assert (NULL == task);
781 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
782 : "Serialization failure, trying again in %s!\n",
783 : GNUNET_TIME_relative2s (serialization_delay,
784 : true));
785 0 : task = GNUNET_SCHEDULER_add_delayed (serialization_delay,
786 : &select_shard,
787 : NULL);
788 : }
789 0 : return;
790 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
791 0 : GNUNET_break (0);
792 0 : GNUNET_assert (NULL == task);
793 0 : task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
794 : &select_shard,
795 : NULL);
796 0 : return;
797 121 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
798 : /* continued below */
799 121 : break;
800 : }
801 121 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
802 : "Starting with shard [%llu,%llu)\n",
803 : (unsigned long long) start,
804 : (unsigned long long) end);
805 121 : shard = GNUNET_new (struct Shard);
806 121 : shard->shard_start_time = GNUNET_TIME_absolute_get ();
807 121 : shard->shard_start = start;
808 121 : shard->shard_end = end;
809 121 : shard->batch_start = start;
810 121 : GNUNET_assert (NULL == task);
811 121 : task = GNUNET_SCHEDULER_add_now (&run_transfers,
812 : NULL);
813 : }
814 :
815 :
816 : /**
817 : * First task.
818 : *
819 : * @param cls closure, NULL
820 : * @param args remaining command-line arguments
821 : * @param cfgfile name of the configuration file used (for saving, can be NULL!)
822 : * @param c configuration
823 : */
824 : static void
825 60 : run (void *cls,
826 : char *const *args,
827 : const char *cfgfile,
828 : const struct GNUNET_CONFIGURATION_Handle *c)
829 : {
830 : (void) cls;
831 : (void) args;
832 : (void) cfgfile;
833 :
834 60 : cfg = c;
835 60 : if (GNUNET_OK != parse_transfer_config ())
836 : {
837 0 : cfg = NULL;
838 0 : global_ret = EXIT_NOTCONFIGURED;
839 0 : return;
840 : }
841 60 : ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
842 : &rc);
843 60 : rc = GNUNET_CURL_gnunet_rc_create (ctx);
844 60 : if (NULL == ctx)
845 : {
846 0 : GNUNET_break (0);
847 0 : return;
848 : }
849 60 : if (GNUNET_SYSERR ==
850 60 : db_plugin->preflight (db_plugin->cls))
851 : {
852 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
853 : "Failed to obtain database connection!\n");
854 0 : global_ret = EXIT_FAILURE;
855 0 : GNUNET_SCHEDULER_shutdown ();
856 0 : return;
857 : }
858 60 : GNUNET_assert (NULL == task);
859 60 : task = GNUNET_SCHEDULER_add_now (&select_shard,
860 : NULL);
861 60 : GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
862 : cls);
863 : }
864 :
865 :
866 : /**
867 : * The main function of the taler-exchange-transfer.
868 : *
869 : * @param argc number of arguments from the command line
870 : * @param argv command line arguments
871 : * @return 0 ok, 1 on error
872 : */
873 : int
874 60 : main (int argc,
875 : char *const *argv)
876 : {
877 60 : struct GNUNET_GETOPT_CommandLineOption options[] = {
878 60 : GNUNET_GETOPT_option_uint ('S',
879 : "size",
880 : "SIZE",
881 : "Size to process per shard (default: 1024)",
882 : &shard_size),
883 60 : GNUNET_GETOPT_option_timetravel ('T',
884 : "timetravel"),
885 60 : GNUNET_GETOPT_option_flag ('t',
886 : "test",
887 : "run in test mode and exit when idle",
888 : &test_mode),
889 60 : GNUNET_GETOPT_option_uint ('w',
890 : "workers",
891 : "COUNT",
892 : "Plan work load with up to COUNT worker processes (default: 16)",
893 : &max_workers),
894 60 : GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
895 : GNUNET_GETOPT_OPTION_END
896 : };
897 : enum GNUNET_GenericReturnValue ret;
898 :
899 60 : ret = GNUNET_PROGRAM_run (
900 : TALER_EXCHANGE_project_data (),
901 : argc, argv,
902 : "taler-exchange-transfer",
903 : gettext_noop (
904 : "background process that executes outgoing wire transfers"),
905 : options,
906 : &run, NULL);
907 60 : if (GNUNET_SYSERR == ret)
908 0 : return EXIT_INVALIDARGUMENT;
909 60 : if (GNUNET_NO == ret)
910 0 : return EXIT_SUCCESS;
911 60 : return global_ret;
912 : }
913 :
914 :
915 : /* end of taler-exchange-transfer.c */
|