Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2016--2023 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU Affero General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
12 :
13 : You should have received a copy of the GNU Affero General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file taler-exchange-wirewatch.c
18 : * @brief Process that watches for wire transfers to the exchange's bank account
19 : * @author Christian Grothoff
20 : */
21 : #include "taler/platform.h"
22 : #include <gnunet/gnunet_util_lib.h>
23 : #include <jansson.h>
24 : #include <pthread.h>
25 : #include <microhttpd.h>
26 : #include "taler/taler_exchangedb_lib.h"
27 : #include "taler/taler_exchangedb_plugin.h"
28 : #include "taler/taler_json_lib.h"
29 : #include "taler/taler_bank_service.h"
30 :
31 :
/**
 * How long to wait for an HTTP reply if there
 * are no transactions pending at the server?
 * Used as the default for #longpoll_timeout.
 */
#define LONGPOLL_TIMEOUT GNUNET_TIME_UNIT_MINUTES

/**
 * What is the maximum batch size we use for credit history
 * requests with the bank.  See `batch_size` below.
 */
#define MAXIMUM_BATCH_SIZE 1024

/**
 * Information about our account.  Set once by add_account_cb().
 */
static const struct TALER_EXCHANGEDB_AccountInfo *ai;

/**
 * Active request for history, NULL if none is pending.
 */
static struct TALER_BANK_CreditHistoryHandle *hh;

/**
 * Set to true if the request for history did actually
 * return transaction items.
 */
static bool hh_returned_data;

/**
 * Set to true if the request for history did not
 * succeed because the account was unknown.
 */
static bool hh_account_404;

/**
 * Set to true if the request for history did not
 * succeed because of some unexpected HTTP request error.
 */
static bool hh_error;

/**
 * When did we start the last @e hh request?
 */
static struct GNUNET_TIME_Absolute hh_start_time;

/**
 * Until when is processing this wire plugin delayed?
 */
static struct GNUNET_TIME_Absolute delayed_until;

/**
 * Encoded offset in the wire transfer list from where
 * to start the next query with the bank.
 */
static uint64_t batch_start;

/**
 * Latest row offset seen in this transaction, becomes
 * the new #batch_start upon commit.
 */
static uint64_t latest_row_off;

/**
 * Offset where our current shard begins (inclusive).
 */
static uint64_t shard_start;

/**
 * Offset where our current shard ends (exclusive).
 */
static uint64_t shard_end;

/**
 * When did we start with the shard?
 */
static struct GNUNET_TIME_Absolute shard_start_time;

/**
 * Until when did we lock the shard?  Computed as "now plus the
 * lock duration" right after the shard was obtained.
 */
static struct GNUNET_TIME_Absolute shard_end_time;

/**
 * How long did we take to finish the last shard
 * for this account?
 */
static struct GNUNET_TIME_Relative shard_delay;

/**
 * Timeout to use for long-polling the bank for new transactions.
 * Initialized to #LONGPOLL_TIMEOUT in main() and can be overridden
 * with the -f command-line option.
 */
static struct GNUNET_TIME_Relative longpoll_timeout;

/**
 * How long do we wait on 404.
 */
static struct GNUNET_TIME_Relative h404_backoff;

/**
 * How long do we wait on HTTP history request errors.
 */
static struct GNUNET_TIME_Relative hh_error_backoff;

/**
 * Name of our job in the shard table.
 */
static char *job_name;

/**
 * How many transactions do we retrieve per batch?
 */
static unsigned int batch_size;

/**
 * Batch size we had when we last hit a serialization error.
 * The distance between this value and the current @e batch_size
 * determines how fast we grow @e batch_size again on success.
 */
static unsigned int batch_thresh;

/**
 * Did work remain in the transaction queue? Set to true
 * if we did some work and thus there might be more.
 */
static bool progress;

/**
 * Did we start a transaction yet?
 * NOTE(review): in the code visible in this file the flag is only
 * ever reset to false, never set to true -- confirm this is intended.
 */
static bool started_transaction;

/**
 * Is this shard still open for processing.
 */
static bool shard_open;

/**
 * Handle to the context for interacting with the bank.
 */
static struct GNUNET_CURL_Context *ctx;

/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;

/**
 * The exchange's configuration (global)
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Our DB plugin.
 */
static struct TALER_EXCHANGEDB_Plugin *db_plugin;

/**
 * How long should we sleep when idle before trying to find more work?
 * Also used for how long we wait to grab a shard before trying it again.
 * The value should be set to a bit above the average time it takes to
 * process a shard.
 */
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;

/**
 * How long do we sleep on serialization conflicts?
 */
static struct GNUNET_TIME_Relative wirewatch_conflict_sleep_interval;

/**
 * Modulus to apply to group shards.  The shard size must ultimately be a
 * multiple of the batch size.  Thus, if this is not a multiple of the
 * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
 */
static unsigned int shard_size = MAXIMUM_BATCH_SIZE;

/**
 * How many workers should we plan our scheduling with?
 */
static unsigned int max_workers = 16;

/**
 * -e command-line option: exit on errors talking to the bank?
 */
static int exit_on_error;

/**
 * Value to return from main().  0 on success, non-zero
 * on serious errors.
 */
static int global_ret;

/**
 * Are we run in testing mode and should only do one pass?
 */
static int test_mode;

/**
 * Should we ignore if the bank does not know our bank
 * account?
 */
static int ignore_account_404;

/**
 * Current task waiting for execution, if any.
 */
static struct GNUNET_SCHEDULER_Task *task;

/**
 * Name of the configuration section with the account we should watch.
 */
static char *account_section;
243 :
244 : /**
245 : * We're being aborted with CTRL-C (or SIGTERM). Shut down.
246 : *
247 : * @param cls closure
248 : */
249 : static void
250 62 : shutdown_task (void *cls)
251 : {
252 : enum GNUNET_DB_QueryStatus qs;
253 : (void) cls;
254 :
255 62 : if (NULL != hh)
256 : {
257 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
258 : "History request cancelled on shutdown\n");
259 0 : TALER_BANK_credit_history_cancel (hh);
260 0 : hh = NULL;
261 : }
262 62 : if (started_transaction)
263 : {
264 0 : db_plugin->rollback (db_plugin->cls);
265 0 : started_transaction = false;
266 : }
267 62 : if (shard_open)
268 : {
269 62 : qs = db_plugin->abort_shard (db_plugin->cls,
270 : job_name,
271 : shard_start,
272 : shard_end);
273 62 : if (qs <= 0)
274 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
275 : "Failed to abort work shard on shutdown\n");
276 : }
277 62 : GNUNET_free (job_name);
278 62 : if (NULL != ctx)
279 : {
280 62 : GNUNET_CURL_fini (ctx);
281 62 : ctx = NULL;
282 : }
283 62 : if (NULL != rc)
284 : {
285 62 : GNUNET_CURL_gnunet_rc_destroy (rc);
286 62 : rc = NULL;
287 : }
288 62 : if (NULL != task)
289 : {
290 0 : GNUNET_SCHEDULER_cancel (task);
291 0 : task = NULL;
292 : }
293 62 : TALER_EXCHANGEDB_plugin_unload (db_plugin);
294 62 : db_plugin = NULL;
295 62 : TALER_EXCHANGEDB_unload_accounts ();
296 62 : cfg = NULL;
297 62 : }
298 :
299 :
300 : /**
301 : * Function called with information about a wire account. Adds the
302 : * account to our list (if it is enabled and we can load the plugin).
303 : *
304 : * @param cls closure, NULL
305 : * @param in_ai account information
306 : */
307 : static void
308 172 : add_account_cb (void *cls,
309 : const struct TALER_EXCHANGEDB_AccountInfo *in_ai)
310 : {
311 : (void) cls;
312 172 : if (! in_ai->credit_enabled)
313 0 : return; /* not enabled for us, skip */
314 172 : if ( (NULL != account_section) &&
315 170 : (0 != strcasecmp (in_ai->section_name,
316 : account_section)) )
317 110 : return; /* not enabled for us, skip */
318 62 : if (NULL != ai)
319 : {
320 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
321 : "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n",
322 : ai->section_name,
323 : in_ai->section_name);
324 0 : GNUNET_SCHEDULER_shutdown ();
325 0 : global_ret = EXIT_INVALIDARGUMENT;
326 0 : return;
327 : }
328 62 : ai = in_ai;
329 62 : GNUNET_asprintf (&job_name,
330 : "wirewatch-%s",
331 62 : ai->section_name);
332 62 : batch_size = MAXIMUM_BATCH_SIZE;
333 62 : if (0 != shard_size % batch_size)
334 62 : batch_size = shard_size;
335 : }
336 :
337 :
338 : /**
339 : * Parse configuration parameters for the exchange server into the
340 : * corresponding global variables.
341 : *
342 : * @return #GNUNET_OK on success
343 : */
344 : static enum GNUNET_GenericReturnValue
345 62 : exchange_serve_process_config (void)
346 : {
347 62 : if (GNUNET_OK !=
348 62 : GNUNET_CONFIGURATION_get_value_time (cfg,
349 : "exchange",
350 : "WIREWATCH_IDLE_SLEEP_INTERVAL",
351 : &wirewatch_idle_sleep_interval))
352 : {
353 0 : GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
354 : "exchange",
355 : "WIREWATCH_IDLE_SLEEP_INTERVAL");
356 0 : return GNUNET_SYSERR;
357 : }
358 62 : if (NULL ==
359 62 : (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg,
360 : false)))
361 : {
362 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
363 : "Failed to initialize DB subsystem\n");
364 0 : return GNUNET_SYSERR;
365 : }
366 62 : if (GNUNET_OK !=
367 62 : TALER_EXCHANGEDB_load_accounts (cfg,
368 : TALER_EXCHANGEDB_ALO_CREDIT
369 : | TALER_EXCHANGEDB_ALO_AUTHDATA))
370 : {
371 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
372 : "No wire accounts configured for credit!\n");
373 0 : return GNUNET_SYSERR;
374 : }
375 62 : TALER_EXCHANGEDB_find_accounts (&add_account_cb,
376 : NULL);
377 62 : if (NULL == ai)
378 : {
379 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
380 : "No accounts enabled for credit!\n");
381 0 : GNUNET_SCHEDULER_shutdown ();
382 0 : return GNUNET_SYSERR;
383 : }
384 62 : return GNUNET_OK;
385 : }
386 :
387 :
/**
 * Lock a shard and then begin to query for incoming wire transfers.
 * Declared here because schedule_transfers() refers to it before
 * its definition.
 *
 * @param cls NULL
 */
static void
lock_shard (void *cls);


/**
 * Continue with the credit history of the shard: issue the next
 * credit history request to the bank, starting at #latest_row_off.
 * Declared here because handle_soft_error() and process_reply()
 * refer to it before its definition.
 *
 * @param cls NULL
 */
static void
continue_with_shard (void *cls);
404 :
405 :
406 : /**
407 : * We encountered a serialization error. Rollback the transaction and try
408 : * again.
409 : */
410 : static void
411 0 : handle_soft_error (void)
412 : {
413 0 : db_plugin->rollback (db_plugin->cls);
414 0 : started_transaction = false;
415 0 : if (1 < batch_size)
416 : {
417 0 : batch_thresh = batch_size;
418 0 : batch_size /= 2;
419 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
420 : "Reduced batch size to %llu due to serialization issue\n",
421 : (unsigned long long) batch_size);
422 : }
423 : /* Reset to beginning of transaction, and go again
424 : from there. */
425 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
426 : "Encountered soft error, resetting start point to batch start\n");
427 0 : latest_row_off = batch_start;
428 0 : GNUNET_assert (NULL == task);
429 0 : task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
430 : NULL);
431 0 : }
432 :
433 :
434 : /**
435 : * Schedule the #lock_shard() operation.
436 : */
437 : static void
438 167 : schedule_transfers (void)
439 : {
440 167 : if (shard_open)
441 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
442 : "Will retry my shard (%llu,%llu] of %s in %s\n",
443 : (unsigned long long) shard_start,
444 : (unsigned long long) shard_end,
445 : job_name,
446 : GNUNET_STRINGS_relative_time_to_string (
447 : GNUNET_TIME_absolute_get_remaining (delayed_until),
448 : true));
449 : else
450 167 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
451 : "Will try to lock next shard of %s in %s\n",
452 : job_name,
453 : GNUNET_STRINGS_relative_time_to_string (
454 : GNUNET_TIME_absolute_get_remaining (delayed_until),
455 : true));
456 167 : GNUNET_assert (NULL == task);
457 167 : task = GNUNET_SCHEDULER_add_at (delayed_until,
458 : &lock_shard,
459 : NULL);
460 167 : }
461 :
462 :
463 : /**
464 : * We are done with the work that is possible right now (and the transaction
465 : * was committed, if there was one to commit). Move on to the next shard.
466 : */
467 : static void
468 167 : transaction_completed (void)
469 : {
470 167 : if ( (batch_start + batch_size ==
471 61 : latest_row_off) &&
472 61 : (batch_size < MAXIMUM_BATCH_SIZE) )
473 : {
474 : /* The current batch size worked without serialization
475 : issues, and we are allowed to grow. Do so slowly. */
476 : int delta;
477 :
478 61 : delta = ((int) batch_thresh - (int) batch_size) / 4;
479 61 : if (delta < 0)
480 0 : delta = -delta;
481 61 : batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
482 : batch_size + delta + 1);
483 61 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
484 : "Increasing batch size to %llu\n",
485 : (unsigned long long) batch_size);
486 : }
487 :
488 167 : if ( (! progress) && test_mode)
489 : {
490 : /* Transaction list was drained and we are in
491 : test mode. So we are done. */
492 62 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
493 : "Transaction list drained and in test mode. Exiting\n");
494 62 : GNUNET_SCHEDULER_shutdown ();
495 62 : return;
496 : }
497 105 : if (! (hh_returned_data || hh_account_404 || hh_error) )
498 : {
499 : /* Enforce long-polling delay even if the server ignored it
500 : and returned earlier */
501 : struct GNUNET_TIME_Relative latency;
502 : struct GNUNET_TIME_Relative left;
503 :
504 0 : latency = GNUNET_TIME_absolute_get_duration (hh_start_time);
505 0 : left = GNUNET_TIME_relative_subtract (longpoll_timeout,
506 : latency);
507 0 : if (! (test_mode ||
508 0 : GNUNET_TIME_relative_is_zero (left)) )
509 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
510 : "Server did not respect long-polling, enforcing client-side by sleeping for %s\n",
511 : GNUNET_TIME_relative2s (left,
512 : true));
513 0 : delayed_until = GNUNET_TIME_relative_to_absolute (left);
514 : }
515 105 : if (hh_account_404)
516 : {
517 0 : h404_backoff = GNUNET_TIME_STD_BACKOFF (h404_backoff);
518 0 : delayed_until = GNUNET_TIME_relative_to_absolute (
519 : h404_backoff);
520 : }
521 : else
522 : {
523 105 : h404_backoff = GNUNET_TIME_UNIT_ZERO;
524 : }
525 105 : if (hh_error)
526 : {
527 0 : hh_error_backoff = GNUNET_TIME_STD_BACKOFF (hh_error_backoff);
528 0 : delayed_until = GNUNET_TIME_relative_to_absolute (
529 : hh_error_backoff);
530 : }
531 : else
532 : {
533 105 : hh_error_backoff = GNUNET_TIME_UNIT_ZERO;
534 : }
535 105 : if (test_mode)
536 105 : delayed_until = GNUNET_TIME_UNIT_ZERO_ABS;
537 105 : GNUNET_assert (NULL == task);
538 105 : schedule_transfers ();
539 : }
540 :
541 :
542 : /**
543 : * We got incoming transaction details from the bank. Add them
544 : * to the database.
545 : *
546 : * @param details array of transaction details
547 : * @param details_length length of the @a details array
548 : */
549 : static void
550 105 : process_reply (const struct TALER_BANK_CreditDetails *details,
551 : unsigned int details_length)
552 : {
553 : enum GNUNET_DB_QueryStatus qs;
554 : bool shard_done;
555 105 : uint64_t lroff = latest_row_off;
556 :
557 105 : if (0 == details_length)
558 : {
559 : /* Server should have used 204, not 200! */
560 0 : GNUNET_break_op (0);
561 0 : transaction_completed ();
562 0 : return;
563 : }
564 105 : hh_returned_data = true;
565 : /* check serial IDs for range constraints */
566 175 : for (unsigned int i = 0; i<details_length; i++)
567 : {
568 105 : const struct TALER_BANK_CreditDetails *cd = &details[i];
569 :
570 105 : if (cd->serial_id < lroff)
571 : {
572 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
573 : "Serial ID %llu not monotonic (got %llu before). Failing!\n",
574 : (unsigned long long) cd->serial_id,
575 : (unsigned long long) lroff);
576 0 : db_plugin->rollback (db_plugin->cls);
577 0 : GNUNET_SCHEDULER_shutdown ();
578 0 : return;
579 : }
580 105 : if (cd->serial_id > shard_end)
581 : {
582 : /* we are *past* the current shard (likely because the serial_id of the
583 : shard_end happens to not exist in the DB). So commit and stop this
584 : iteration! */
585 35 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
586 : "Serial ID %llu past shard end at %llu, ending iteration early!\n",
587 : (unsigned long long) cd->serial_id,
588 : (unsigned long long) shard_end);
589 35 : details_length = i;
590 35 : progress = true;
591 35 : lroff = cd->serial_id - 1;
592 35 : break;
593 : }
594 70 : lroff = cd->serial_id;
595 : }
596 105 : if (0 != details_length)
597 70 : {
598 70 : enum GNUNET_DB_QueryStatus qss[details_length];
599 70 : struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
600 70 : unsigned int j = 0;
601 :
602 : /* make compiler happy */
603 70 : memset (qss,
604 : 0,
605 : sizeof (qss));
606 70 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
607 : "Importing %u transactions\n",
608 : details_length);
609 140 : for (unsigned int i = 0; i<details_length; i++)
610 : {
611 70 : const struct TALER_BANK_CreditDetails *cd = &details[i];
612 :
613 70 : switch (cd->type)
614 : {
615 54 : case TALER_BANK_CT_RESERVE:
616 : {
617 54 : struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[j++];
618 :
619 : /* add to batch, do later */
620 54 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
621 : "Importing reserve transfer over %s\n",
622 : TALER_amount2s (&cd->amount));
623 54 : res->reserve_pub = &cd->details.reserve.reserve_pub;
624 54 : res->balance = &cd->amount;
625 54 : res->execution_time = cd->execution_date;
626 54 : res->sender_account_details = cd->debit_account_uri;
627 54 : res->exchange_account_name = ai->section_name;
628 54 : res->wire_reference = cd->serial_id;
629 : }
630 54 : break;
631 16 : case TALER_BANK_CT_KYCAUTH:
632 : {
633 16 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
634 : "Importing KYC auth transfer over %s\n",
635 : TALER_amount2s (&cd->amount));
636 16 : qs = db_plugin->kycauth_in_insert (
637 16 : db_plugin->cls,
638 : &cd->details.kycauth.account_pub,
639 : &cd->amount,
640 : cd->execution_date,
641 : cd->debit_account_uri,
642 16 : ai->section_name,
643 16 : cd->serial_id);
644 16 : switch (qs)
645 : {
646 0 : case GNUNET_DB_STATUS_HARD_ERROR:
647 0 : GNUNET_break (0);
648 0 : GNUNET_SCHEDULER_shutdown ();
649 0 : return;
650 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
651 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
652 : "Got DB soft error for kycauth_in_insert (%u). Rolling back.\n",
653 : i);
654 0 : handle_soft_error ();
655 0 : return;
656 16 : default:
657 16 : break;
658 : }
659 16 : break;
660 : }
661 0 : case TALER_BANK_CT_WAD:
662 : {
663 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
664 : "Importing WAD transfer over %s\n",
665 : TALER_amount2s (&cd->amount));
666 0 : qs = db_plugin->wad_in_insert (
667 0 : db_plugin->cls,
668 : &cd->details.wad.wad_id,
669 0 : cd->details.wad.origin_exchange_url,
670 : &cd->amount,
671 : cd->execution_date,
672 : cd->debit_account_uri,
673 0 : ai->section_name,
674 0 : cd->serial_id);
675 0 : switch (qs)
676 : {
677 0 : case GNUNET_DB_STATUS_HARD_ERROR:
678 0 : GNUNET_break (0);
679 0 : GNUNET_SCHEDULER_shutdown ();
680 0 : return;
681 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
682 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
683 : "Got DB soft error for wad_in_insert (%u). Rolling back.\n",
684 : i);
685 0 : handle_soft_error ();
686 0 : return;
687 0 : default:
688 0 : break;
689 : }
690 :
691 : }
692 : }
693 : }
694 70 : if (j > 0)
695 : {
696 54 : qs = db_plugin->reserves_in_insert (db_plugin->cls,
697 : reserves,
698 : j,
699 : qss);
700 54 : switch (qs)
701 : {
702 0 : case GNUNET_DB_STATUS_HARD_ERROR:
703 0 : GNUNET_break (0);
704 0 : GNUNET_SCHEDULER_shutdown ();
705 0 : return;
706 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
707 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
708 : "Got DB soft error for reserves_in_insert (%u). Rolling back.\n",
709 : details_length);
710 0 : handle_soft_error ();
711 0 : return;
712 54 : default:
713 54 : break;
714 : }
715 : }
716 70 : j = 0;
717 140 : for (unsigned int i = 0; i<details_length; i++)
718 : {
719 70 : const struct TALER_BANK_CreditDetails *cd = &details[i];
720 :
721 70 : if (TALER_BANK_CT_RESERVE != cd->type)
722 16 : continue;
723 54 : switch (qss[j++])
724 : {
725 0 : case GNUNET_DB_STATUS_HARD_ERROR:
726 0 : GNUNET_break (0);
727 0 : GNUNET_SCHEDULER_shutdown ();
728 0 : return;
729 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
730 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
731 : "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
732 : i);
733 0 : handle_soft_error ();
734 0 : return;
735 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
736 : /* Either wirewatch was freshly started after the system was
737 : shutdown and we're going over an incomplete shard again
738 : after being restarted, or the shard lock period was too
739 : short (number of workers set incorrectly?) and a 2nd
740 : wirewatcher has been stealing our work while we are still
741 : at it. */
742 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
743 : "Attempted to import transaction %llu (%s) twice. "
744 : "This should happen rarely (if not, ask for support).\n",
745 : (unsigned long long) cd->serial_id,
746 : job_name);
747 0 : break;
748 54 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
749 54 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
750 : "Imported transaction %llu.\n",
751 : (unsigned long long) cd->serial_id);
752 : /* normal case */
753 54 : progress = true;
754 54 : break;
755 : }
756 : }
757 : }
758 :
759 105 : latest_row_off = lroff;
760 105 : shard_done = (shard_end <= latest_row_off);
761 105 : if (shard_done)
762 : {
763 : /* shard is complete, mark this as well */
764 105 : qs = db_plugin->complete_shard (db_plugin->cls,
765 : job_name,
766 : shard_start,
767 : shard_end);
768 105 : switch (qs)
769 : {
770 0 : case GNUNET_DB_STATUS_HARD_ERROR:
771 0 : GNUNET_break (0);
772 0 : GNUNET_SCHEDULER_shutdown ();
773 0 : return;
774 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
775 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
776 : "Got DB soft error for complete_shard. Rolling back.\n");
777 0 : handle_soft_error ();
778 0 : return;
779 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
780 0 : GNUNET_break (0);
781 : /* Not expected, but let's just continue */
782 0 : break;
783 105 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
784 : /* normal case */
785 105 : progress = true;
786 105 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
787 : "Completed shard %s (%llu,%llu] after %s\n",
788 : job_name,
789 : (unsigned long long) shard_start,
790 : (unsigned long long) shard_end,
791 : GNUNET_STRINGS_relative_time_to_string (
792 : GNUNET_TIME_absolute_get_duration (shard_start_time),
793 : true));
794 105 : break;
795 : }
796 105 : shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
797 105 : shard_open = false;
798 105 : transaction_completed ();
799 105 : return;
800 : }
801 0 : GNUNET_assert (NULL == task);
802 0 : task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
803 : NULL);
804 : }
805 :
806 :
807 : /**
808 : * Callbacks of this type are used to serve the result of asking
809 : * the bank for the transaction history.
810 : *
811 : * @param cls NULL
812 : * @param reply response we got from the bank
813 : */
814 : static void
815 167 : history_cb (void *cls,
816 : const struct TALER_BANK_CreditHistoryResponse *reply)
817 : {
818 : (void) cls;
819 167 : GNUNET_assert (NULL == task);
820 167 : hh = NULL;
821 167 : GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
822 : "History request returned with HTTP status %u\n",
823 : reply->http_status);
824 167 : switch (reply->http_status)
825 : {
826 105 : case MHD_HTTP_OK:
827 105 : process_reply (reply->details.ok.details,
828 105 : reply->details.ok.details_length);
829 105 : return;
830 61 : case MHD_HTTP_NO_CONTENT:
831 61 : transaction_completed ();
832 61 : return;
833 1 : case MHD_HTTP_NOT_FOUND:
834 1 : hh_account_404 = true;
835 1 : if (ignore_account_404)
836 : {
837 0 : transaction_completed ();
838 0 : return;
839 : }
840 1 : break;
841 0 : default:
842 0 : hh_error = true;
843 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
844 : "Error fetching history: %s (%u)\n",
845 : TALER_ErrorCode_get_hint (reply->ec),
846 : reply->http_status);
847 0 : break;
848 : }
849 1 : if (! exit_on_error)
850 : {
851 1 : transaction_completed ();
852 1 : return;
853 : }
854 0 : GNUNET_SCHEDULER_shutdown ();
855 : }
856 :
857 :
858 : static void
859 167 : continue_with_shard (void *cls)
860 : {
861 : unsigned int limit;
862 :
863 : (void) cls;
864 167 : task = NULL;
865 167 : GNUNET_assert (shard_end > latest_row_off);
866 167 : limit = GNUNET_MIN (batch_size,
867 : shard_end - latest_row_off);
868 167 : GNUNET_assert (NULL == hh);
869 167 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
870 : "Requesting credit history starting from %llu\n",
871 : (unsigned long long) latest_row_off);
872 167 : hh_start_time = GNUNET_TIME_absolute_get ();
873 167 : hh_returned_data = false;
874 167 : hh_account_404 = false;
875 167 : hh_error = false;
876 167 : hh = TALER_BANK_credit_history (ctx,
877 167 : ai->auth,
878 : latest_row_off,
879 : limit,
880 : test_mode
881 167 : ? GNUNET_TIME_UNIT_ZERO
882 : : longpoll_timeout,
883 : &history_cb,
884 : NULL);
885 167 : if (NULL == hh)
886 : {
887 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
888 : "Failed to start request for account history!\n");
889 0 : global_ret = EXIT_FAILURE;
890 0 : GNUNET_SCHEDULER_shutdown ();
891 0 : return;
892 : }
893 : }
894 :
895 :
896 : /**
897 : * Reserve a shard for us to work on.
898 : *
899 : * @param cls NULL
900 : */
901 : static void
902 167 : lock_shard (void *cls)
903 : {
904 : enum GNUNET_DB_QueryStatus qs;
905 : struct GNUNET_TIME_Relative delay;
906 167 : uint64_t last_shard_start = shard_start;
907 167 : uint64_t last_shard_end = shard_end;
908 :
909 : (void) cls;
910 167 : task = NULL;
911 167 : if (GNUNET_SYSERR ==
912 167 : db_plugin->preflight (db_plugin->cls))
913 : {
914 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
915 : "Failed to obtain database connection!\n");
916 0 : global_ret = EXIT_FAILURE;
917 0 : GNUNET_SCHEDULER_shutdown ();
918 0 : return;
919 : }
920 167 : if ( (shard_open) &&
921 0 : (GNUNET_TIME_absolute_is_future (shard_end_time)) )
922 : {
923 0 : progress = false;
924 0 : batch_start = latest_row_off;
925 0 : task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
926 : NULL);
927 0 : return;
928 : }
929 167 : if (shard_open)
930 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
931 : "Shard not completed in time, will try to re-acquire\n");
932 : /* How long we lock a shard depends on the number of
933 : workers expected, and how long we usually took to
934 : process a shard. */
935 167 : if (0 == max_workers)
936 167 : delay = GNUNET_TIME_UNIT_ZERO;
937 : else
938 0 : delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
939 : GNUNET_CRYPTO_QUALITY_WEAK,
940 0 : 4 * GNUNET_TIME_relative_max (
941 : wirewatch_idle_sleep_interval,
942 : GNUNET_TIME_relative_multiply (shard_delay,
943 0 : max_workers)).rel_value_us);
944 167 : shard_start_time = GNUNET_TIME_absolute_get ();
945 167 : qs = db_plugin->begin_shard (db_plugin->cls,
946 : job_name,
947 : delay,
948 : shard_size,
949 : &shard_start,
950 : &shard_end);
951 167 : switch (qs)
952 : {
953 0 : case GNUNET_DB_STATUS_HARD_ERROR:
954 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
955 : "Failed to obtain starting point for monitoring from database!\n");
956 0 : global_ret = EXIT_FAILURE;
957 0 : GNUNET_SCHEDULER_shutdown ();
958 0 : return;
959 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
960 : /* try again */
961 : {
962 : struct GNUNET_TIME_Relative rdelay;
963 :
964 : wirewatch_conflict_sleep_interval
965 0 : = GNUNET_TIME_STD_BACKOFF (wirewatch_conflict_sleep_interval);
966 0 : rdelay = GNUNET_TIME_randomize (wirewatch_conflict_sleep_interval);
967 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
968 : "Serialization error tying to obtain shard %s, will try again in %s!\n",
969 : job_name,
970 : GNUNET_STRINGS_relative_time_to_string (rdelay,
971 : true));
972 : #if 1
973 0 : if (GNUNET_TIME_relative_cmp (rdelay,
974 : >,
975 : GNUNET_TIME_UNIT_SECONDS))
976 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
977 : "Delay would have been for %s\n",
978 : GNUNET_TIME_relative2s (rdelay,
979 : true));
980 0 : rdelay = GNUNET_TIME_relative_min (rdelay,
981 : GNUNET_TIME_UNIT_SECONDS);
982 : #endif
983 0 : delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
984 : }
985 0 : GNUNET_assert (NULL == task);
986 0 : schedule_transfers ();
987 0 : return;
988 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
989 0 : GNUNET_break (0);
990 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
991 : "No shard available, will try again for %s in %s!\n",
992 : job_name,
993 : GNUNET_STRINGS_relative_time_to_string (
994 : wirewatch_idle_sleep_interval,
995 : true));
996 0 : delayed_until = GNUNET_TIME_relative_to_absolute (
997 : wirewatch_idle_sleep_interval);
998 0 : shard_open = false;
999 0 : GNUNET_assert (NULL == task);
1000 0 : schedule_transfers ();
1001 0 : return;
1002 167 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
1003 : /* continued below */
1004 167 : wirewatch_conflict_sleep_interval = GNUNET_TIME_UNIT_ZERO;
1005 167 : break;
1006 : }
1007 167 : shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
1008 167 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1009 : "Starting with shard %s at (%llu,%llu] locked for %s\n",
1010 : job_name,
1011 : (unsigned long long) shard_start,
1012 : (unsigned long long) shard_end,
1013 : GNUNET_STRINGS_relative_time_to_string (delay,
1014 : true));
1015 167 : progress = false;
1016 167 : batch_start = shard_start;
1017 167 : if ( (shard_open) &&
1018 0 : (shard_start == last_shard_start) &&
1019 0 : (shard_end == last_shard_end) )
1020 : {
1021 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1022 : "Continuing from %llu\n",
1023 : (unsigned long long) latest_row_off);
1024 0 : GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */
1025 : }
1026 : else
1027 : {
1028 167 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1029 : "Resetting shard start to original start point (%d)\n",
1030 : shard_open ? 1 : 0);
1031 167 : latest_row_off = batch_start;
1032 : }
1033 167 : shard_open = true;
1034 167 : task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
1035 : NULL);
1036 : }
1037 :
1038 :
1039 : /**
1040 : * First task.
1041 : *
1042 : * @param cls closure, NULL
1043 : * @param args remaining command-line arguments
1044 : * @param cfgfile name of the configuration file used (for saving, can be NULL!)
1045 : * @param c configuration
1046 : */
1047 : static void
1048 62 : run (void *cls,
1049 : char *const *args,
1050 : const char *cfgfile,
1051 : const struct GNUNET_CONFIGURATION_Handle *c)
1052 : {
1053 : (void) cls;
1054 : (void) args;
1055 : (void) cfgfile;
1056 :
1057 62 : cfg = c;
1058 62 : GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1059 : cls);
1060 62 : if (GNUNET_OK !=
1061 62 : exchange_serve_process_config ())
1062 : {
1063 0 : global_ret = EXIT_NOTCONFIGURED;
1064 0 : GNUNET_SCHEDULER_shutdown ();
1065 0 : return;
1066 : }
1067 62 : ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
1068 : &rc);
1069 62 : if (NULL == ctx)
1070 : {
1071 0 : GNUNET_break (0);
1072 0 : GNUNET_SCHEDULER_shutdown ();
1073 0 : global_ret = EXIT_NO_RESTART;
1074 0 : return;
1075 : }
1076 62 : rc = GNUNET_CURL_gnunet_rc_create (ctx);
1077 62 : schedule_transfers ();
1078 : }
1079 :
1080 :
1081 : /**
1082 : * The main function of taler-exchange-wirewatch
1083 : *
1084 : * @param argc number of arguments from the command line
1085 : * @param argv command line arguments
1086 : * @return 0 ok, non-zero on error
1087 : */
1088 : int
1089 62 : main (int argc,
1090 : char *const *argv)
1091 : {
1092 62 : struct GNUNET_GETOPT_CommandLineOption options[] = {
1093 62 : GNUNET_GETOPT_option_string ('a',
1094 : "account",
1095 : "SECTION_NAME",
1096 : "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)",
1097 : &account_section),
1098 62 : GNUNET_GETOPT_option_flag ('e',
1099 : "exit-on-error",
1100 : "terminate wirewatch if we failed to download information from the bank",
1101 : &exit_on_error),
1102 62 : GNUNET_GETOPT_option_relative_time ('f',
1103 : "longpoll-timeout",
1104 : "DELAY",
1105 : "what is the timeout when asking the bank about new transactions, specify with unit (e.g. --longpoll-timeout=30s)",
1106 : &longpoll_timeout),
1107 62 : GNUNET_GETOPT_option_flag ('I',
1108 : "ignore-not-found",
1109 : "continue, even if the bank account of the exchange was not found",
1110 : &ignore_account_404),
1111 62 : GNUNET_GETOPT_option_uint ('S',
1112 : "size",
1113 : "SIZE",
1114 : "Size to process per shard (default: 1024)",
1115 : &shard_size),
1116 62 : GNUNET_GETOPT_option_timetravel ('T',
1117 : "timetravel"),
1118 62 : GNUNET_GETOPT_option_flag ('t',
1119 : "test",
1120 : "run in test mode and exit when idle",
1121 : &test_mode),
1122 62 : GNUNET_GETOPT_option_uint ('w',
1123 : "workers",
1124 : "COUNT",
1125 : "Plan work load with up to COUNT worker processes (default: 16)",
1126 : &max_workers),
1127 62 : GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
1128 : GNUNET_GETOPT_OPTION_END
1129 : };
1130 : enum GNUNET_GenericReturnValue ret;
1131 :
1132 62 : longpoll_timeout = LONGPOLL_TIMEOUT;
1133 62 : ret = GNUNET_PROGRAM_run (
1134 : TALER_EXCHANGE_project_data (),
1135 : argc, argv,
1136 : "taler-exchange-wirewatch",
1137 : gettext_noop (
1138 : "background process that watches for incoming wire transfers from customers"),
1139 : options,
1140 : &run, NULL);
1141 62 : if (GNUNET_SYSERR == ret)
1142 0 : return EXIT_INVALIDARGUMENT;
1143 62 : if (GNUNET_NO == ret)
1144 0 : return EXIT_SUCCESS;
1145 62 : return global_ret;
1146 : }
1147 :
1148 :
1149 : /* end of taler-exchange-wirewatch.c */
|