Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2016--2023 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU Affero General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
12 :
13 : You should have received a copy of the GNU Affero General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file taler-exchange-wirewatch.c
18 : * @brief Process that watches for wire transfers to the exchange's bank account
19 : * @author Christian Grothoff
20 : */
21 : #include "taler/platform.h"
22 : #include <gnunet/gnunet_util_lib.h>
23 : #include <jansson.h>
24 : #include <pthread.h>
25 : #include <microhttpd.h>
26 : #include "taler/taler_exchangedb_lib.h"
27 : #include "taler/taler_exchangedb_plugin.h"
28 : #include "taler/taler_json_lib.h"
29 : #include "taler/taler_bank_service.h"
30 :
31 :
32 : /**
33 : * How long to wait for an HTTP reply if there
34 : * are no transactions pending at the server?
35 : */
36 : #define LONGPOLL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 :
38 : /**
39 : * What is the maximum batch size we use for credit history
40 : * requests with the bank. See `batch_size` below.
41 : */
42 : #define MAXIMUM_BATCH_SIZE 1024
43 :
44 : /**
45 : * Information about our account.
46 : */
47 : static const struct TALER_EXCHANGEDB_AccountInfo *ai;
48 :
49 : /**
50 : * Active request for history.
51 : */
52 : static struct TALER_BANK_CreditHistoryHandle *hh;
53 :
54 : /**
55 : * Set to true if the request for history did actually
56 : * return transaction items.
57 : */
58 : static bool hh_returned_data;
59 :
60 : /**
61 : * Set to true if the request for history did not
62 : * succeed because the account was unknown.
63 : */
64 : static bool hh_account_404;
65 :
66 : /**
67 : * When did we start the last @e hh request?
68 : */
69 : static struct GNUNET_TIME_Absolute hh_start_time;
70 :
71 : /**
72 : * Until when is processing this wire plugin delayed?
73 : */
74 : static struct GNUNET_TIME_Absolute delayed_until;
75 :
76 : /**
77 : * Encoded offset in the wire transfer list from where
78 : * to start the next query with the bank.
79 : */
80 : static uint64_t batch_start;
81 :
82 : /**
83 : * Latest row offset seen in this transaction, becomes
84 : * the new #batch_start upon commit.
85 : */
86 : static uint64_t latest_row_off;
87 :
88 : /**
89 : * Offset where our current shard begins (inclusive).
90 : */
91 : static uint64_t shard_start;
92 :
93 : /**
94 : * Offset where our current shard ends (exclusive).
95 : */
96 : static uint64_t shard_end;
97 :
98 : /**
99 : * When did we start with the shard?
100 : */
101 : static struct GNUNET_TIME_Absolute shard_start_time;
102 :
103 : /**
104 : * For how long did we lock the shard?
105 : */
106 : static struct GNUNET_TIME_Absolute shard_end_time;
107 :
108 : /**
109 : * How long did we take to finish the last shard
110 : * for this account?
111 : */
112 : static struct GNUNET_TIME_Relative shard_delay;
113 :
114 : /**
115 : * How long did we take to finish the last shard
116 : * for this account?
117 : */
118 : static struct GNUNET_TIME_Relative longpoll_timeout;
119 :
120 : /**
121 : * How long do we wait on 404.
122 : */
123 : static struct GNUNET_TIME_Relative h404_backoff;
124 :
125 : /**
126 : * Name of our job in the shard table.
127 : */
128 : static char *job_name;
129 :
130 : /**
131 : * How many transactions do we retrieve per batch?
132 : */
133 : static unsigned int batch_size;
134 :
135 : /**
136 : * How much do we increment @e batch_size on success?
137 : */
138 : static unsigned int batch_thresh;
139 :
140 : /**
141 : * Did work remain in the transaction queue? Set to true
142 : * if we did some work and thus there might be more.
143 : */
144 : static bool progress;
145 :
146 : /**
147 : * Did we start a transaction yet?
148 : */
149 : static bool started_transaction;
150 :
151 : /**
152 : * Is this shard still open for processing.
153 : */
154 : static bool shard_open;
155 :
156 : /**
157 : * Handle to the context for interacting with the bank.
158 : */
159 : static struct GNUNET_CURL_Context *ctx;
160 :
161 : /**
162 : * Scheduler context for running the @e ctx.
163 : */
164 : static struct GNUNET_CURL_RescheduleContext *rc;
165 :
166 : /**
167 : * The exchange's configuration (global)
168 : */
169 : static const struct GNUNET_CONFIGURATION_Handle *cfg;
170 :
171 : /**
172 : * Our DB plugin.
173 : */
174 : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
175 :
176 : /**
177 : * How long should we sleep when idle before trying to find more work?
178 : * Also used for how long we wait to grab a shard before trying it again.
179 : * The value should be set to a bit above the average time it takes to
180 : * process a shard.
181 : */
182 : static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
183 :
184 : /**
185 : * How long do we sleep on serialization conflicts?
186 : */
187 : static struct GNUNET_TIME_Relative wirewatch_conflict_sleep_interval;
188 :
189 : /**
190 : * Modulus to apply to group shards. The shard size must ultimately be a
191 : * multiple of the batch size. Thus, if this is not a multiple of the
192 : * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
193 : */
194 : static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
195 :
196 : /**
197 : * How many workers should we plan our scheduling with?
198 : */
199 : static unsigned int max_workers = 16;
200 :
201 : /**
202 : * -e command-line option: exit on errors talking to the bank?
203 : */
204 : static int exit_on_error;
205 :
206 : /**
207 : * Value to return from main(). 0 on success, non-zero on
208 : * on serious errors.
209 : */
210 : static int global_ret;
211 :
212 : /**
213 : * Are we run in testing mode and should only do one pass?
214 : */
215 : static int test_mode;
216 :
217 : /**
218 : * Should we ignore if the bank does not know our bank
219 : * account?
220 : */
221 : static int ignore_account_404;
222 :
223 : /**
224 : * Current task waiting for execution, if any.
225 : */
226 : static struct GNUNET_SCHEDULER_Task *task;
227 :
228 : /**
229 : * Name of the configuration section with the account we should watch.
230 : */
231 : static char *account_section;
232 :
233 : /**
234 : * We're being aborted with CTRL-C (or SIGTERM). Shut down.
235 : *
236 : * @param cls closure
237 : */
238 : static void
239 64 : shutdown_task (void *cls)
240 : {
241 : enum GNUNET_DB_QueryStatus qs;
242 : (void) cls;
243 :
244 64 : if (NULL != hh)
245 : {
246 2 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
247 : "History request cancelled on shutdown\n");
248 2 : TALER_BANK_credit_history_cancel (hh);
249 2 : hh = NULL;
250 : }
251 64 : if (started_transaction)
252 : {
253 0 : db_plugin->rollback (db_plugin->cls);
254 0 : started_transaction = false;
255 : }
256 64 : if (shard_open)
257 : {
258 64 : qs = db_plugin->abort_shard (db_plugin->cls,
259 : job_name,
260 : shard_start,
261 : shard_end);
262 64 : if (qs <= 0)
263 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
264 : "Failed to abort work shard on shutdown\n");
265 : }
266 64 : GNUNET_free (job_name);
267 64 : if (NULL != ctx)
268 : {
269 64 : GNUNET_CURL_fini (ctx);
270 64 : ctx = NULL;
271 : }
272 64 : if (NULL != rc)
273 : {
274 64 : GNUNET_CURL_gnunet_rc_destroy (rc);
275 64 : rc = NULL;
276 : }
277 64 : if (NULL != task)
278 : {
279 0 : GNUNET_SCHEDULER_cancel (task);
280 0 : task = NULL;
281 : }
282 64 : TALER_EXCHANGEDB_plugin_unload (db_plugin);
283 64 : db_plugin = NULL;
284 64 : TALER_EXCHANGEDB_unload_accounts ();
285 64 : cfg = NULL;
286 64 : }
287 :
288 :
289 : /**
290 : * Function called with information about a wire account. Adds the
291 : * account to our list (if it is enabled and we can load the plugin).
292 : *
293 : * @param cls closure, NULL
294 : * @param in_ai account information
295 : */
296 : static void
297 124 : add_account_cb (void *cls,
298 : const struct TALER_EXCHANGEDB_AccountInfo *in_ai)
299 : {
300 : (void) cls;
301 124 : if (! in_ai->credit_enabled)
302 0 : return; /* not enabled for us, skip */
303 124 : if ( (NULL != account_section) &&
304 122 : (0 != strcasecmp (in_ai->section_name,
305 : account_section)) )
306 60 : return; /* not enabled for us, skip */
307 64 : if (NULL != ai)
308 : {
309 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
310 : "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n",
311 : ai->section_name,
312 : in_ai->section_name);
313 0 : GNUNET_SCHEDULER_shutdown ();
314 0 : global_ret = EXIT_INVALIDARGUMENT;
315 0 : return;
316 : }
317 64 : ai = in_ai;
318 64 : GNUNET_asprintf (&job_name,
319 : "wirewatch-%s",
320 64 : ai->section_name);
321 64 : batch_size = MAXIMUM_BATCH_SIZE;
322 64 : if (0 != shard_size % batch_size)
323 62 : batch_size = shard_size;
324 : }
325 :
326 :
327 : /**
328 : * Parse configuration parameters for the exchange server into the
329 : * corresponding global variables.
330 : *
331 : * @return #GNUNET_OK on success
332 : */
333 : static enum GNUNET_GenericReturnValue
334 64 : exchange_serve_process_config (void)
335 : {
336 64 : if (GNUNET_OK !=
337 64 : GNUNET_CONFIGURATION_get_value_time (cfg,
338 : "exchange",
339 : "WIREWATCH_IDLE_SLEEP_INTERVAL",
340 : &wirewatch_idle_sleep_interval))
341 : {
342 0 : GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
343 : "exchange",
344 : "WIREWATCH_IDLE_SLEEP_INTERVAL");
345 0 : return GNUNET_SYSERR;
346 : }
347 64 : if (NULL ==
348 64 : (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg,
349 : false)))
350 : {
351 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
352 : "Failed to initialize DB subsystem\n");
353 0 : return GNUNET_SYSERR;
354 : }
355 64 : if (GNUNET_OK !=
356 64 : TALER_EXCHANGEDB_load_accounts (cfg,
357 : TALER_EXCHANGEDB_ALO_CREDIT
358 : | TALER_EXCHANGEDB_ALO_AUTHDATA))
359 : {
360 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
361 : "No wire accounts configured for credit!\n");
362 0 : return GNUNET_SYSERR;
363 : }
364 64 : TALER_EXCHANGEDB_find_accounts (&add_account_cb,
365 : NULL);
366 64 : if (NULL == ai)
367 : {
368 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
369 : "No accounts enabled for credit!\n");
370 0 : GNUNET_SCHEDULER_shutdown ();
371 0 : return GNUNET_SYSERR;
372 : }
373 64 : return GNUNET_OK;
374 : }
375 :
376 :
377 : /**
378 : * Lock a shard and then begin to query for incoming wire transfers.
379 : *
380 : * @param cls NULL
381 : */
382 : static void
383 : lock_shard (void *cls);
384 :
385 :
386 : /**
387 : * Continue with the credit history of the shard.
388 : *
389 : * @param cls NULL
390 : */
391 : static void
392 : continue_with_shard (void *cls);
393 :
394 :
395 : /**
396 : * We encountered a serialization error. Rollback the transaction and try
397 : * again.
398 : */
399 : static void
400 0 : handle_soft_error (void)
401 : {
402 0 : db_plugin->rollback (db_plugin->cls);
403 0 : started_transaction = false;
404 0 : if (1 < batch_size)
405 : {
406 0 : batch_thresh = batch_size;
407 0 : batch_size /= 2;
408 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
409 : "Reduced batch size to %llu due to serialization issue\n",
410 : (unsigned long long) batch_size);
411 : }
412 : /* Reset to beginning of transaction, and go again
413 : from there. */
414 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
415 : "Encountered soft error, resetting start point to batch start\n");
416 0 : latest_row_off = batch_start;
417 0 : GNUNET_assert (NULL == task);
418 0 : task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
419 : NULL);
420 0 : }
421 :
422 :
423 : /**
424 : * Schedule the #lock_shard() operation.
425 : */
426 : static void
427 169 : schedule_transfers (void)
428 : {
429 169 : if (shard_open)
430 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
431 : "Will retry my shard (%llu,%llu] of %s in %s\n",
432 : (unsigned long long) shard_start,
433 : (unsigned long long) shard_end,
434 : job_name,
435 : GNUNET_STRINGS_relative_time_to_string (
436 : GNUNET_TIME_absolute_get_remaining (delayed_until),
437 : true));
438 : else
439 169 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
440 : "Will try to lock next shard of %s in %s\n",
441 : job_name,
442 : GNUNET_STRINGS_relative_time_to_string (
443 : GNUNET_TIME_absolute_get_remaining (delayed_until),
444 : true));
445 169 : GNUNET_assert (NULL == task);
446 169 : task = GNUNET_SCHEDULER_add_at (delayed_until,
447 : &lock_shard,
448 : NULL);
449 169 : }
450 :
451 :
452 : /**
453 : * We are done with the work that is possible right now (and the transaction
454 : * was committed, if there was one to commit). Move on to the next shard.
455 : */
456 : static void
457 167 : transaction_completed (void)
458 : {
459 167 : if ( (batch_start + batch_size ==
460 61 : latest_row_off) &&
461 61 : (batch_size < MAXIMUM_BATCH_SIZE) )
462 : {
463 : /* The current batch size worked without serialization
464 : issues, and we are allowed to grow. Do so slowly. */
465 : int delta;
466 :
467 61 : delta = ((int) batch_thresh - (int) batch_size) / 4;
468 61 : if (delta < 0)
469 0 : delta = -delta;
470 61 : batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
471 : batch_size + delta + 1);
472 61 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
473 : "Increasing batch size to %llu\n",
474 : (unsigned long long) batch_size);
475 : }
476 :
477 167 : if ( (! progress) && test_mode)
478 : {
479 : /* Transaction list was drained and we are in
480 : test mode. So we are done. */
481 62 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
482 : "Transaction list drained and in test mode. Exiting\n");
483 62 : GNUNET_SCHEDULER_shutdown ();
484 62 : return;
485 : }
486 105 : if (! (hh_returned_data || hh_account_404) )
487 : {
488 : /* Enforce long-polling delay even if the server ignored it
489 : and returned earlier */
490 : struct GNUNET_TIME_Relative latency;
491 : struct GNUNET_TIME_Relative left;
492 :
493 0 : latency = GNUNET_TIME_absolute_get_duration (hh_start_time);
494 0 : left = GNUNET_TIME_relative_subtract (longpoll_timeout,
495 : latency);
496 0 : if (! (test_mode ||
497 0 : GNUNET_TIME_relative_is_zero (left)) )
498 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
499 : "Server did not respect long-polling, enforcing client-side by sleeping for %s\n",
500 : GNUNET_TIME_relative2s (left,
501 : true));
502 0 : delayed_until = GNUNET_TIME_relative_to_absolute (left);
503 : }
504 105 : if (hh_account_404)
505 : {
506 0 : h404_backoff = GNUNET_TIME_STD_BACKOFF (h404_backoff);
507 0 : delayed_until = GNUNET_TIME_relative_to_absolute (
508 : h404_backoff);
509 : }
510 : else
511 : {
512 105 : h404_backoff = GNUNET_TIME_UNIT_ZERO;
513 : }
514 105 : if (test_mode)
515 105 : delayed_until = GNUNET_TIME_UNIT_ZERO_ABS;
516 105 : GNUNET_assert (NULL == task);
517 105 : schedule_transfers ();
518 : }
519 :
520 :
521 : /**
522 : * We got incoming transaction details from the bank. Add them
523 : * to the database.
524 : *
525 : * @param details array of transaction details
526 : * @param details_length length of the @a details array
527 : */
528 : static void
529 109 : process_reply (const struct TALER_BANK_CreditDetails *details,
530 : unsigned int details_length)
531 : {
532 : enum GNUNET_DB_QueryStatus qs;
533 : bool shard_done;
534 109 : uint64_t lroff = latest_row_off;
535 :
536 109 : if (0 == details_length)
537 : {
538 : /* Server should have used 204, not 200! */
539 0 : GNUNET_break_op (0);
540 0 : transaction_completed ();
541 0 : return;
542 : }
543 109 : hh_returned_data = true;
544 : /* check serial IDs for range constraints */
545 183 : for (unsigned int i = 0; i<details_length; i++)
546 : {
547 109 : const struct TALER_BANK_CreditDetails *cd = &details[i];
548 :
549 109 : if (cd->serial_id < lroff)
550 : {
551 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
552 : "Serial ID %llu not monotonic (got %llu before). Failing!\n",
553 : (unsigned long long) cd->serial_id,
554 : (unsigned long long) lroff);
555 0 : db_plugin->rollback (db_plugin->cls);
556 0 : GNUNET_SCHEDULER_shutdown ();
557 0 : return;
558 : }
559 109 : if (cd->serial_id > shard_end)
560 : {
561 : /* we are *past* the current shard (likely because the serial_id of the
562 : shard_end happens to not exist in the DB). So commit and stop this
563 : iteration! */
564 35 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
565 : "Serial ID %llu past shard end at %llu, ending iteration early!\n",
566 : (unsigned long long) cd->serial_id,
567 : (unsigned long long) shard_end);
568 35 : details_length = i;
569 35 : progress = true;
570 35 : lroff = cd->serial_id - 1;
571 35 : break;
572 : }
573 74 : lroff = cd->serial_id;
574 : }
575 109 : if (0 != details_length)
576 74 : {
577 74 : enum GNUNET_DB_QueryStatus qss[details_length];
578 74 : struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
579 74 : unsigned int j = 0;
580 :
581 : /* make compiler happy */
582 74 : memset (qss,
583 : 0,
584 : sizeof (qss));
585 74 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
586 : "Importing %u transactions\n",
587 : details_length);
588 148 : for (unsigned int i = 0; i<details_length; i++)
589 : {
590 74 : const struct TALER_BANK_CreditDetails *cd = &details[i];
591 :
592 74 : switch (cd->type)
593 : {
594 58 : case TALER_BANK_CT_RESERVE:
595 : {
596 58 : struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[j++];
597 :
598 : /* add to batch, do later */
599 58 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
600 : "Importing reserve transfer over %s\n",
601 : TALER_amount2s (&cd->amount));
602 58 : res->reserve_pub = &cd->details.reserve.reserve_pub;
603 58 : res->balance = &cd->amount;
604 58 : res->execution_time = cd->execution_date;
605 58 : res->sender_account_details = cd->debit_account_uri;
606 58 : res->exchange_account_name = ai->section_name;
607 58 : res->wire_reference = cd->serial_id;
608 : }
609 58 : break;
610 16 : case TALER_BANK_CT_KYCAUTH:
611 : {
612 16 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
613 : "Importing KYC auth transfer over %s\n",
614 : TALER_amount2s (&cd->amount));
615 16 : qs = db_plugin->kycauth_in_insert (
616 16 : db_plugin->cls,
617 : &cd->details.kycauth.account_pub,
618 : &cd->amount,
619 : cd->execution_date,
620 : cd->debit_account_uri,
621 16 : ai->section_name,
622 16 : cd->serial_id);
623 : switch (qs)
624 : {
625 0 : case GNUNET_DB_STATUS_HARD_ERROR:
626 0 : GNUNET_break (0);
627 0 : GNUNET_SCHEDULER_shutdown ();
628 0 : return;
629 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
630 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
631 : "Got DB soft error for kycauth_in_insert (%u). Rolling back.\n",
632 : i);
633 0 : handle_soft_error ();
634 0 : return;
635 16 : default:
636 16 : break;
637 : }
638 16 : break;
639 : }
640 0 : case TALER_BANK_CT_WAD:
641 : {
642 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
643 : "Importing WAD transfer over %s\n",
644 : TALER_amount2s (&cd->amount));
645 0 : qs = db_plugin->wad_in_insert (
646 0 : db_plugin->cls,
647 : &cd->details.wad.wad_id,
648 0 : cd->details.wad.origin_exchange_url,
649 : &cd->amount,
650 : cd->execution_date,
651 : cd->debit_account_uri,
652 0 : ai->section_name,
653 0 : cd->serial_id);
654 : switch (qs)
655 : {
656 0 : case GNUNET_DB_STATUS_HARD_ERROR:
657 0 : GNUNET_break (0);
658 0 : GNUNET_SCHEDULER_shutdown ();
659 0 : return;
660 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
661 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
662 : "Got DB soft error for wad_in_insert (%u). Rolling back.\n",
663 : i);
664 0 : handle_soft_error ();
665 0 : return;
666 0 : default:
667 0 : break;
668 : }
669 :
670 : }
671 : }
672 : }
673 74 : if (j > 0)
674 : {
675 58 : qs = db_plugin->reserves_in_insert (db_plugin->cls,
676 : reserves,
677 : j,
678 : qss);
679 58 : switch (qs)
680 : {
681 0 : case GNUNET_DB_STATUS_HARD_ERROR:
682 0 : GNUNET_break (0);
683 0 : GNUNET_SCHEDULER_shutdown ();
684 0 : return;
685 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
686 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
687 : "Got DB soft error for reserves_in_insert (%u). Rolling back.\n",
688 : details_length);
689 0 : handle_soft_error ();
690 0 : return;
691 58 : default:
692 58 : break;
693 : }
694 : }
695 74 : j = 0;
696 148 : for (unsigned int i = 0; i<details_length; i++)
697 : {
698 74 : const struct TALER_BANK_CreditDetails *cd = &details[i];
699 :
700 74 : if (TALER_BANK_CT_RESERVE != cd->type)
701 16 : continue;
702 58 : switch (qss[j++])
703 : {
704 0 : case GNUNET_DB_STATUS_HARD_ERROR:
705 0 : GNUNET_break (0);
706 0 : GNUNET_SCHEDULER_shutdown ();
707 0 : return;
708 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
709 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
710 : "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
711 : i);
712 0 : handle_soft_error ();
713 0 : return;
714 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
715 : /* Either wirewatch was freshly started after the system was
716 : shutdown and we're going over an incomplete shard again
717 : after being restarted, or the shard lock period was too
718 : short (number of workers set incorrectly?) and a 2nd
719 : wirewatcher has been stealing our work while we are still
720 : at it. */
721 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
722 : "Attempted to import transaction %llu (%s) twice. "
723 : "This should happen rarely (if not, ask for support).\n",
724 : (unsigned long long) cd->serial_id,
725 : job_name);
726 0 : break;
727 58 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
728 58 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
729 : "Imported transaction %llu.\n",
730 : (unsigned long long) cd->serial_id);
731 : /* normal case */
732 58 : progress = true;
733 58 : break;
734 : }
735 : }
736 : }
737 :
738 109 : latest_row_off = lroff;
739 109 : shard_done = (shard_end <= latest_row_off);
740 109 : if (shard_done)
741 : {
742 : /* shard is complete, mark this as well */
743 105 : qs = db_plugin->complete_shard (db_plugin->cls,
744 : job_name,
745 : shard_start,
746 : shard_end);
747 105 : switch (qs)
748 : {
749 0 : case GNUNET_DB_STATUS_HARD_ERROR:
750 0 : GNUNET_break (0);
751 0 : GNUNET_SCHEDULER_shutdown ();
752 0 : return;
753 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
754 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
755 : "Got DB soft error for complete_shard. Rolling back.\n");
756 0 : handle_soft_error ();
757 0 : return;
758 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
759 0 : GNUNET_break (0);
760 : /* Not expected, but let's just continue */
761 0 : break;
762 105 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
763 : /* normal case */
764 105 : progress = true;
765 105 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
766 : "Completed shard %s (%llu,%llu] after %s\n",
767 : job_name,
768 : (unsigned long long) shard_start,
769 : (unsigned long long) shard_end,
770 : GNUNET_STRINGS_relative_time_to_string (
771 : GNUNET_TIME_absolute_get_duration (shard_start_time),
772 : true));
773 105 : break;
774 : }
775 105 : shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
776 105 : shard_open = false;
777 105 : transaction_completed ();
778 105 : return;
779 : }
780 4 : GNUNET_assert (NULL == task);
781 4 : task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
782 : NULL);
783 : }
784 :
785 :
786 : /**
787 : * Callbacks of this type are used to serve the result of asking
788 : * the bank for the transaction history.
789 : *
790 : * @param cls NULL
791 : * @param reply response we got from the bank
792 : */
793 : static void
794 171 : history_cb (void *cls,
795 : const struct TALER_BANK_CreditHistoryResponse *reply)
796 : {
797 : (void) cls;
798 171 : GNUNET_assert (NULL == task);
799 171 : hh = NULL;
800 171 : GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801 : "History request returned with HTTP status %u\n",
802 : reply->http_status);
803 171 : switch (reply->http_status)
804 : {
805 109 : case MHD_HTTP_OK:
806 109 : process_reply (reply->details.ok.details,
807 109 : reply->details.ok.details_length);
808 109 : return;
809 61 : case MHD_HTTP_NO_CONTENT:
810 61 : transaction_completed ();
811 61 : return;
812 1 : case MHD_HTTP_NOT_FOUND:
813 1 : hh_account_404 = true;
814 1 : if (ignore_account_404)
815 : {
816 0 : transaction_completed ();
817 0 : return;
818 : }
819 1 : break;
820 0 : default:
821 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
822 : "Error fetching history: %s (%u)\n",
823 : TALER_ErrorCode_get_hint (reply->ec),
824 : reply->http_status);
825 0 : break;
826 : }
827 1 : if (! exit_on_error)
828 : {
829 1 : transaction_completed ();
830 1 : return;
831 : }
832 0 : GNUNET_SCHEDULER_shutdown ();
833 : }
834 :
835 :
836 : static void
837 173 : continue_with_shard (void *cls)
838 : {
839 : unsigned int limit;
840 :
841 : (void) cls;
842 173 : task = NULL;
843 173 : GNUNET_assert (shard_end > latest_row_off);
844 173 : limit = GNUNET_MIN (batch_size,
845 : shard_end - latest_row_off);
846 173 : GNUNET_assert (NULL == hh);
847 173 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
848 : "Requesting credit history starting from %llu\n",
849 : (unsigned long long) latest_row_off);
850 173 : hh_start_time = GNUNET_TIME_absolute_get ();
851 173 : hh_returned_data = false;
852 173 : hh_account_404 = false;
853 173 : hh = TALER_BANK_credit_history (ctx,
854 173 : ai->auth,
855 : latest_row_off,
856 : limit,
857 : test_mode
858 173 : ? GNUNET_TIME_UNIT_ZERO
859 : : longpoll_timeout,
860 : &history_cb,
861 : NULL);
862 173 : if (NULL == hh)
863 : {
864 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
865 : "Failed to start request for account history!\n");
866 0 : global_ret = EXIT_FAILURE;
867 0 : GNUNET_SCHEDULER_shutdown ();
868 0 : return;
869 : }
870 : }
871 :
872 :
873 : /**
874 : * Reserve a shard for us to work on.
875 : *
876 : * @param cls NULL
877 : */
878 : static void
879 169 : lock_shard (void *cls)
880 : {
881 : enum GNUNET_DB_QueryStatus qs;
882 : struct GNUNET_TIME_Relative delay;
883 169 : uint64_t last_shard_start = shard_start;
884 169 : uint64_t last_shard_end = shard_end;
885 :
886 : (void) cls;
887 169 : task = NULL;
888 169 : if (GNUNET_SYSERR ==
889 169 : db_plugin->preflight (db_plugin->cls))
890 : {
891 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
892 : "Failed to obtain database connection!\n");
893 0 : global_ret = EXIT_FAILURE;
894 0 : GNUNET_SCHEDULER_shutdown ();
895 0 : return;
896 : }
897 169 : if ( (shard_open) &&
898 0 : (GNUNET_TIME_absolute_is_future (shard_end_time)) )
899 : {
900 0 : progress = false;
901 0 : batch_start = latest_row_off;
902 0 : task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
903 : NULL);
904 0 : return;
905 : }
906 169 : if (shard_open)
907 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
908 : "Shard not completed in time, will try to re-acquire\n");
909 : /* How long we lock a shard depends on the number of
910 : workers expected, and how long we usually took to
911 : process a shard. */
912 169 : if (0 == max_workers)
913 167 : delay = GNUNET_TIME_UNIT_ZERO;
914 : else
915 2 : delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
916 : GNUNET_CRYPTO_QUALITY_WEAK,
917 2 : 4 * GNUNET_TIME_relative_max (
918 : wirewatch_idle_sleep_interval,
919 : GNUNET_TIME_relative_multiply (shard_delay,
920 2 : max_workers)).rel_value_us);
921 169 : shard_start_time = GNUNET_TIME_absolute_get ();
922 169 : qs = db_plugin->begin_shard (db_plugin->cls,
923 : job_name,
924 : delay,
925 : shard_size,
926 : &shard_start,
927 : &shard_end);
928 169 : switch (qs)
929 : {
930 0 : case GNUNET_DB_STATUS_HARD_ERROR:
931 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
932 : "Failed to obtain starting point for monitoring from database!\n");
933 0 : global_ret = EXIT_FAILURE;
934 0 : GNUNET_SCHEDULER_shutdown ();
935 0 : return;
936 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
937 : /* try again */
938 : {
939 : struct GNUNET_TIME_Relative rdelay;
940 :
941 : wirewatch_conflict_sleep_interval
942 0 : = GNUNET_TIME_STD_BACKOFF (wirewatch_conflict_sleep_interval);
943 0 : rdelay = GNUNET_TIME_randomize (wirewatch_conflict_sleep_interval);
944 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
945 : "Serialization error tying to obtain shard %s, will try again in %s!\n",
946 : job_name,
947 : GNUNET_STRINGS_relative_time_to_string (rdelay,
948 : true));
949 : #if 1
950 0 : if (GNUNET_TIME_relative_cmp (rdelay,
951 : >,
952 : GNUNET_TIME_UNIT_SECONDS))
953 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
954 : "Delay would have been for %s\n",
955 : GNUNET_TIME_relative2s (rdelay,
956 : true));
957 0 : rdelay = GNUNET_TIME_relative_min (rdelay,
958 : GNUNET_TIME_UNIT_SECONDS);
959 : #endif
960 0 : delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
961 : }
962 0 : GNUNET_assert (NULL == task);
963 0 : schedule_transfers ();
964 0 : return;
965 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
966 0 : GNUNET_break (0);
967 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
968 : "No shard available, will try again for %s in %s!\n",
969 : job_name,
970 : GNUNET_STRINGS_relative_time_to_string (
971 : wirewatch_idle_sleep_interval,
972 : true));
973 0 : delayed_until = GNUNET_TIME_relative_to_absolute (
974 : wirewatch_idle_sleep_interval);
975 0 : shard_open = false;
976 0 : GNUNET_assert (NULL == task);
977 0 : schedule_transfers ();
978 0 : return;
979 169 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
980 : /* continued below */
981 169 : wirewatch_conflict_sleep_interval = GNUNET_TIME_UNIT_ZERO;
982 169 : break;
983 : }
984 169 : shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
985 169 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
986 : "Starting with shard %s at (%llu,%llu] locked for %s\n",
987 : job_name,
988 : (unsigned long long) shard_start,
989 : (unsigned long long) shard_end,
990 : GNUNET_STRINGS_relative_time_to_string (delay,
991 : true));
992 169 : progress = false;
993 169 : batch_start = shard_start;
994 169 : if ( (shard_open) &&
995 0 : (shard_start == last_shard_start) &&
996 0 : (shard_end == last_shard_end) )
997 : {
998 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
999 : "Continuing from %llu\n",
1000 : (unsigned long long) latest_row_off);
1001 0 : GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */
1002 : }
1003 : else
1004 : {
1005 169 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1006 : "Resetting shard start to original start point (%d)\n",
1007 : shard_open ? 1 : 0);
1008 169 : latest_row_off = batch_start;
1009 : }
1010 169 : shard_open = true;
1011 169 : task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
1012 : NULL);
1013 : }
1014 :
1015 :
1016 : /**
1017 : * First task.
1018 : *
1019 : * @param cls closure, NULL
1020 : * @param args remaining command-line arguments
1021 : * @param cfgfile name of the configuration file used (for saving, can be NULL!)
1022 : * @param c configuration
1023 : */
1024 : static void
1025 64 : run (void *cls,
1026 : char *const *args,
1027 : const char *cfgfile,
1028 : const struct GNUNET_CONFIGURATION_Handle *c)
1029 : {
1030 : (void) cls;
1031 : (void) args;
1032 : (void) cfgfile;
1033 :
1034 64 : cfg = c;
1035 64 : GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1036 : cls);
1037 64 : if (GNUNET_OK !=
1038 64 : exchange_serve_process_config ())
1039 : {
1040 0 : global_ret = EXIT_NOTCONFIGURED;
1041 0 : GNUNET_SCHEDULER_shutdown ();
1042 0 : return;
1043 : }
1044 64 : ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
1045 : &rc);
1046 64 : if (NULL == ctx)
1047 : {
1048 0 : GNUNET_break (0);
1049 0 : GNUNET_SCHEDULER_shutdown ();
1050 0 : global_ret = EXIT_NO_RESTART;
1051 0 : return;
1052 : }
1053 64 : rc = GNUNET_CURL_gnunet_rc_create (ctx);
1054 64 : schedule_transfers ();
1055 : }
1056 :
1057 :
1058 : /**
1059 : * The main function of taler-exchange-wirewatch
1060 : *
1061 : * @param argc number of arguments from the command line
1062 : * @param argv command line arguments
1063 : * @return 0 ok, non-zero on error
1064 : */
1065 : int
1066 64 : main (int argc,
1067 : char *const *argv)
1068 : {
1069 64 : struct GNUNET_GETOPT_CommandLineOption options[] = {
1070 64 : GNUNET_GETOPT_option_string ('a',
1071 : "account",
1072 : "SECTION_NAME",
1073 : "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)",
1074 : &account_section),
1075 64 : GNUNET_GETOPT_option_flag ('e',
1076 : "exit-on-error",
1077 : "terminate wirewatch if we failed to download information from the bank",
1078 : &exit_on_error),
1079 64 : GNUNET_GETOPT_option_relative_time ('f',
1080 : "longpoll-timeout",
1081 : "DELAY",
1082 : "what is the timeout when asking the bank about new transactions, specify with unit (e.g. --longpoll-timeout=30s)",
1083 : &longpoll_timeout),
1084 64 : GNUNET_GETOPT_option_flag ('I',
1085 : "ignore-not-found",
1086 : "continue, even if the bank account of the exchange was not found",
1087 : &ignore_account_404),
1088 64 : GNUNET_GETOPT_option_uint ('S',
1089 : "size",
1090 : "SIZE",
1091 : "Size to process per shard (default: 1024)",
1092 : &shard_size),
1093 64 : GNUNET_GETOPT_option_timetravel ('T',
1094 : "timetravel"),
1095 64 : GNUNET_GETOPT_option_flag ('t',
1096 : "test",
1097 : "run in test mode and exit when idle",
1098 : &test_mode),
1099 64 : GNUNET_GETOPT_option_uint ('w',
1100 : "workers",
1101 : "COUNT",
1102 : "Plan work load with up to COUNT worker processes (default: 16)",
1103 : &max_workers),
1104 64 : GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
1105 : GNUNET_GETOPT_OPTION_END
1106 : };
1107 : enum GNUNET_GenericReturnValue ret;
1108 :
1109 64 : longpoll_timeout = LONGPOLL_TIMEOUT;
1110 64 : ret = GNUNET_PROGRAM_run (
1111 : TALER_EXCHANGE_project_data (),
1112 : argc, argv,
1113 : "taler-exchange-wirewatch",
1114 : gettext_noop (
1115 : "background process that watches for incoming wire transfers from customers"),
1116 : options,
1117 : &run, NULL);
1118 64 : if (GNUNET_SYSERR == ret)
1119 0 : return EXIT_INVALIDARGUMENT;
1120 64 : if (GNUNET_NO == ret)
1121 0 : return EXIT_SUCCESS;
1122 64 : return global_ret;
1123 : }
1124 :
1125 :
1126 : /* end of taler-exchange-wirewatch.c */
|