Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2017-2024 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file auditor/taler-helper-auditor-transfer.c
18 : * @brief audits that deposits past their due date are
19 : * aggregated and have a matching wire transfer recorded
20 : * in the database.
21 : * @author Christian Grothoff
22 : */
23 : #include "platform.h"
24 : #include <gnunet/gnunet_util_lib.h>
25 : #include <gnunet/gnunet_curl_lib.h>
26 : #include "taler_auditordb_plugin.h"
27 : #include "taler_exchangedb_lib.h"
28 : #include "taler_json_lib.h"
29 : #include "taler_signatures.h"
30 : #include "report-lib.h"
31 : #include "taler_dbevents.h"
32 :
33 :
/**
 * Non-zero when running in test mode (set by the -t option).
 * In that mode run() does not register a database event listener,
 * so the helper exits when idle instead of waiting for more work.
 */
static int test_mode;

/**
 * Exit code handed back by main().
 */
static int global_ret;

/**
 * Progress points of this helper: serial IDs from which the next
 * scan of the batch deposits respectively of the aggregations
 * will resume.
 */
static TALER_ARL_DEF_PP (wire_batch_deposit_id);
static TALER_ARL_DEF_PP (wire_aggregation_id);

/**
 * Running total of the amounts that the exchange did not
 * aggregate/transfer in time.
 */
static TALER_ARL_DEF_AB (total_amount_lag);

/**
 * Running total of the amounts that the exchange aggregated/transferred
 * too early.
 */
static TALER_ARL_DEF_AB (total_early_aggregation);

/**
 * Set by the -i option: should we run checks that only work for
 * exchange-internal audits?
 */
static int internal_checks;

/**
 * Handle of the database event listener that wakes us up again;
 * NULL while no listener is registered.
 */
static struct GNUNET_DB_EventHandler *eh;

/**
 * The auditor's configuration, as passed to run().
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;
75 :
76 :
77 : /**
78 : * Task run on shutdown.
79 : *
80 : * @param cls NULL
81 : */
82 : static void
83 68 : do_shutdown (void *cls)
84 : {
85 : (void) cls;
86 68 : if (NULL != eh)
87 : {
88 0 : TALER_ARL_adb->event_listen_cancel (eh);
89 0 : eh = NULL;
90 : }
91 68 : TALER_ARL_done ();
92 68 : TALER_EXCHANGEDB_unload_accounts ();
93 68 : TALER_ARL_cfg = NULL;
94 68 : }
95 :
96 :
97 : /**
98 : * Closure for import_wire_missing_cb().
99 : */
100 : struct ImportMissingWireContext
101 : {
102 : /**
103 : * Set to maximum row ID encountered.
104 : */
105 : uint64_t max_batch_deposit_uuid;
106 :
107 : /**
108 : * Set to database errors in callback.
109 : */
110 : enum GNUNET_DB_QueryStatus err;
111 : };
112 :
113 :
114 : /**
115 : * Function called on deposits that need to be checked for their
116 : * wire transfer.
117 : *
118 : * @param cls closure, points to a `struct ImportMissingWireContext`
119 : * @param batch_deposit_serial_id serial of the entry in the batch deposits table
120 : * @param total_amount value of the missing deposits, including fee
121 : * @param wire_target_h_payto where should the funds be wired
122 : * @param deadline what was the earliest requested wire transfer deadline
123 : */
124 : static void
125 99 : import_wire_missing_cb (
126 : void *cls,
127 : uint64_t batch_deposit_serial_id,
128 : const struct TALER_Amount *total_amount,
129 : const struct TALER_FullPaytoHashP *wire_target_h_payto,
130 : struct GNUNET_TIME_Timestamp deadline)
131 : {
132 99 : struct ImportMissingWireContext *wc = cls;
133 : enum GNUNET_DB_QueryStatus qs;
134 :
135 99 : if (wc->err < 0)
136 0 : return; /* already failed */
137 99 : GNUNET_assert (batch_deposit_serial_id >= wc->max_batch_deposit_uuid);
138 99 : wc->max_batch_deposit_uuid = batch_deposit_serial_id + 1;
139 99 : qs = TALER_ARL_adb->delete_early_aggregation (
140 99 : TALER_ARL_adb->cls,
141 : batch_deposit_serial_id);
142 99 : switch (qs)
143 : {
144 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
145 0 : GNUNET_break (0);
146 0 : wc->err = qs;
147 0 : return;
148 0 : case GNUNET_DB_STATUS_HARD_ERROR:
149 0 : GNUNET_break (0);
150 0 : wc->err = qs;
151 0 : return;
152 99 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
153 99 : qs = TALER_ARL_adb->insert_pending_deposit (
154 99 : TALER_ARL_adb->cls,
155 : batch_deposit_serial_id,
156 : wire_target_h_payto,
157 : total_amount,
158 : deadline);
159 99 : if (0 > qs)
160 : {
161 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
162 0 : wc->err = qs;
163 : }
164 99 : TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_amount_lag),
165 : &TALER_ARL_USE_AB (total_amount_lag),
166 : total_amount);
167 99 : break;
168 0 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
169 0 : TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_early_aggregation),
170 : &TALER_ARL_USE_AB (total_early_aggregation),
171 : total_amount);
172 0 : break;
173 0 : default:
174 0 : GNUNET_assert (0);
175 : }
176 : }
177 :
178 :
179 : /**
180 : * Checks for wire transfers that should have happened.
181 : *
182 : * @return transaction status
183 : */
184 : static enum GNUNET_DB_QueryStatus
185 68 : check_for_required_transfers (void)
186 : {
187 : enum GNUNET_DB_QueryStatus qs;
188 68 : struct ImportMissingWireContext wc = {
189 : .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id),
190 : .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
191 : };
192 :
193 68 : qs = TALER_ARL_edb->select_batch_deposits_missing_wire (
194 68 : TALER_ARL_edb->cls,
195 : TALER_ARL_USE_PP (wire_batch_deposit_id),
196 : &import_wire_missing_cb,
197 : &wc);
198 68 : if (0 > qs)
199 : {
200 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
201 0 : return qs;
202 : }
203 68 : if (0 > wc.err)
204 : {
205 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wc.err);
206 0 : return wc.err;
207 : }
208 68 : TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid;
209 68 : return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
210 : }
211 :
212 :
213 : /**
214 : * Closure for #clear_finished_transfer_cb().
215 : */
216 : struct AggregationContext
217 : {
218 : /**
219 : * Set to maximum row ID encountered.
220 : */
221 : uint64_t max_aggregation_serial;
222 :
223 : /**
224 : * Set to database errors in callback.
225 : */
226 : enum GNUNET_DB_QueryStatus err;
227 : };
228 :
229 :
230 : /**
231 : * Function called on aggregations that were done for
232 : * a (batch) deposit.
233 : *
234 : * @param cls closure
235 : * @param amount affected amount
236 : * @param tracking_serial_id where in the table are we
237 : * @param batch_deposit_serial_id which batch deposit was aggregated
238 : */
239 : static void
240 42 : clear_finished_transfer_cb (
241 : void *cls,
242 : const struct TALER_Amount *amount,
243 : uint64_t tracking_serial_id,
244 : uint64_t batch_deposit_serial_id)
245 : {
246 42 : struct AggregationContext *ac = cls;
247 : enum GNUNET_DB_QueryStatus qs;
248 :
249 42 : if (0 > ac->err)
250 0 : return; /* already failed */
251 42 : GNUNET_assert (ac->max_aggregation_serial <= tracking_serial_id);
252 42 : ac->max_aggregation_serial = tracking_serial_id + 1;
253 42 : qs = TALER_ARL_adb->delete_pending_deposit (
254 42 : TALER_ARL_adb->cls,
255 : batch_deposit_serial_id);
256 42 : switch (qs)
257 : {
258 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
259 0 : GNUNET_break (0);
260 0 : ac->err = qs;
261 0 : return;
262 0 : case GNUNET_DB_STATUS_HARD_ERROR:
263 0 : GNUNET_break (0);
264 0 : ac->err = qs;
265 0 : return;
266 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
267 0 : qs = TALER_ARL_adb->insert_early_aggregation (
268 0 : TALER_ARL_adb->cls,
269 : batch_deposit_serial_id,
270 : tracking_serial_id,
271 : amount);
272 0 : if (0 > qs)
273 : {
274 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
275 0 : ac->err = qs;
276 0 : return;
277 : }
278 0 : TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_early_aggregation),
279 : &TALER_ARL_USE_AB (total_early_aggregation),
280 : amount);
281 0 : break;
282 42 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
283 42 : TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_amount_lag),
284 : &TALER_ARL_USE_AB (total_amount_lag),
285 : amount);
286 42 : break;
287 0 : default:
288 0 : GNUNET_assert (0);
289 : }
290 : }
291 :
292 :
293 : /**
294 : * Checks that all wire transfers that should have happened
295 : * (based on deposits) have indeed happened.
296 : *
297 : * @return transaction status
298 : */
299 : static enum GNUNET_DB_QueryStatus
300 68 : check_for_completed_transfers (void)
301 : {
302 68 : struct AggregationContext ac = {
303 : .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id),
304 : .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
305 : };
306 : enum GNUNET_DB_QueryStatus qs;
307 :
308 68 : qs = TALER_ARL_edb->select_aggregations_above_serial (
309 68 : TALER_ARL_edb->cls,
310 : TALER_ARL_USE_PP (wire_aggregation_id),
311 : &clear_finished_transfer_cb,
312 : &ac);
313 68 : if (0 > qs)
314 : {
315 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
316 0 : return qs;
317 : }
318 68 : if (0 > ac.err)
319 : {
320 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == ac.err);
321 0 : return ac.err;
322 : }
323 68 : TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial;
324 68 : return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
325 : }
326 :
327 :
328 : /**
329 : * Start the database transactions and begin the audit.
330 : *
331 : * @return transaction status
332 : */
333 : static enum GNUNET_DB_QueryStatus
334 68 : begin_transaction (void)
335 : {
336 : enum GNUNET_DB_QueryStatus qs;
337 :
338 68 : if (GNUNET_SYSERR ==
339 68 : TALER_ARL_edb->preflight (TALER_ARL_edb->cls))
340 : {
341 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
342 : "Failed to initialize exchange database connection.\n");
343 0 : return GNUNET_DB_STATUS_HARD_ERROR;
344 : }
345 68 : if (GNUNET_SYSERR ==
346 68 : TALER_ARL_adb->preflight (TALER_ARL_adb->cls))
347 : {
348 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
349 : "Failed to initialize auditor database session.\n");
350 0 : return GNUNET_DB_STATUS_HARD_ERROR;
351 : }
352 68 : if (GNUNET_OK !=
353 68 : TALER_ARL_adb->start (TALER_ARL_adb->cls))
354 : {
355 0 : GNUNET_break (0);
356 0 : return GNUNET_DB_STATUS_HARD_ERROR;
357 : }
358 68 : if (GNUNET_OK !=
359 68 : TALER_ARL_edb->start_read_only (TALER_ARL_edb->cls,
360 : "transfer auditor"))
361 : {
362 0 : GNUNET_break (0);
363 0 : TALER_ARL_adb->rollback (TALER_ARL_adb->cls);
364 0 : return GNUNET_DB_STATUS_HARD_ERROR;
365 : }
366 68 : qs = TALER_ARL_adb->get_auditor_progress (
367 68 : TALER_ARL_adb->cls,
368 : TALER_ARL_GET_PP (wire_batch_deposit_id),
369 : TALER_ARL_GET_PP (wire_aggregation_id),
370 : NULL);
371 68 : if (0 > qs)
372 0 : goto handle_db_error;
373 :
374 68 : qs = TALER_ARL_adb->get_balance (
375 68 : TALER_ARL_adb->cls,
376 : TALER_ARL_GET_AB (total_amount_lag),
377 : TALER_ARL_GET_AB (total_early_aggregation),
378 : NULL);
379 68 : if (0 > qs)
380 0 : goto handle_db_error;
381 68 : if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
382 : {
383 0 : GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
384 : "First analysis of with transfer auditor, starting audit from scratch\n");
385 : }
386 : else
387 : {
388 68 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
389 : "Resuming transfer audit at %llu / %llu\n",
390 : (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id),
391 : (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id));
392 : }
393 :
394 68 : qs = check_for_required_transfers ();
395 68 : if (0 > qs)
396 0 : goto handle_db_error;
397 68 : qs = check_for_completed_transfers ();
398 68 : if (0 > qs)
399 0 : goto handle_db_error;
400 :
401 68 : qs = TALER_ARL_adb->update_auditor_progress (
402 68 : TALER_ARL_adb->cls,
403 : TALER_ARL_SET_PP (wire_batch_deposit_id),
404 : TALER_ARL_SET_PP (wire_aggregation_id),
405 : NULL);
406 68 : if (0 > qs)
407 0 : goto handle_db_error;
408 68 : qs = TALER_ARL_adb->insert_auditor_progress (
409 68 : TALER_ARL_adb->cls,
410 : TALER_ARL_SET_PP (wire_batch_deposit_id),
411 : TALER_ARL_SET_PP (wire_aggregation_id),
412 : NULL);
413 68 : if (0 > qs)
414 0 : goto handle_db_error;
415 68 : qs = TALER_ARL_adb->update_balance (
416 68 : TALER_ARL_adb->cls,
417 : TALER_ARL_SET_AB (total_amount_lag),
418 : TALER_ARL_SET_AB (total_early_aggregation),
419 : NULL);
420 68 : if (0 > qs)
421 0 : goto handle_db_error;
422 68 : qs = TALER_ARL_adb->insert_balance (
423 68 : TALER_ARL_adb->cls,
424 : TALER_ARL_SET_AB (total_amount_lag),
425 : TALER_ARL_SET_AB (total_early_aggregation),
426 : NULL);
427 68 : if (0 > qs)
428 0 : goto handle_db_error;
429 68 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
430 : "Concluded audit step at %llu/%llu\n",
431 : (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id),
432 : (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id));
433 68 : TALER_ARL_edb->rollback (TALER_ARL_edb->cls);
434 68 : qs = TALER_ARL_adb->commit (TALER_ARL_adb->cls);
435 68 : if (0 > qs)
436 0 : goto handle_db_error;
437 68 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
438 : "Transaction concluded!\n");
439 68 : return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
440 0 : handle_db_error:
441 0 : TALER_ARL_adb->rollback (TALER_ARL_adb->cls);
442 0 : TALER_ARL_edb->rollback (TALER_ARL_edb->cls);
443 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
444 0 : return qs;
445 : }
446 :
447 :
448 : /**
449 : * Start auditor process.
450 : */
451 : static void
452 68 : start (void)
453 : {
454 : enum GNUNET_DB_QueryStatus qs;
455 :
456 68 : for (unsigned int max_retries = 3; max_retries>0; max_retries--)
457 : {
458 68 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
459 : "Trying again (%u attempts left)\n",
460 : max_retries);
461 68 : qs = begin_transaction ();
462 68 : if (GNUNET_DB_STATUS_SOFT_ERROR != qs)
463 68 : break;
464 : }
465 68 : if (0 > qs)
466 : {
467 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
468 : "Audit failed\n");
469 0 : GNUNET_break (0);
470 0 : global_ret = EXIT_FAILURE;
471 0 : GNUNET_SCHEDULER_shutdown ();
472 0 : return;
473 : }
474 : }
475 :
476 :
477 : /**
478 : * Function called on events received from Postgres.
479 : *
480 : * @param cls closure, NULL
481 : * @param extra additional event data provided
482 : * @param extra_size number of bytes in @a extra
483 : */
484 : static void
485 0 : db_notify (void *cls,
486 : const void *extra,
487 : size_t extra_size)
488 : {
489 : (void) cls;
490 : (void) extra;
491 : (void) extra_size;
492 :
493 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
494 : "Received notification to wake transfer helper\n");
495 0 : start ();
496 0 : }
497 :
498 :
499 : /**
500 : * Main function that will be run.
501 : *
502 : * @param cls closure
503 : * @param args remaining command-line arguments
504 : * @param cfgfile name of the configuration file used (for saving, can be NULL!)
505 : * @param c configuration
506 : */
507 : static void
508 68 : run (void *cls,
509 : char *const *args,
510 : const char *cfgfile,
511 : const struct GNUNET_CONFIGURATION_Handle *c)
512 : {
513 : (void) cls;
514 : (void) args;
515 : (void) cfgfile;
516 68 : cfg = c;
517 68 : if (GNUNET_OK !=
518 68 : TALER_ARL_init (c))
519 : {
520 0 : global_ret = EXIT_FAILURE;
521 0 : return;
522 : }
523 68 : GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
524 : NULL);
525 68 : if (GNUNET_OK !=
526 68 : TALER_EXCHANGEDB_load_accounts (TALER_ARL_cfg,
527 : TALER_EXCHANGEDB_ALO_DEBIT
528 : | TALER_EXCHANGEDB_ALO_CREDIT
529 : | TALER_EXCHANGEDB_ALO_AUTHDATA))
530 : {
531 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
532 : "No bank accounts configured\n");
533 0 : global_ret = EXIT_NOTCONFIGURED;
534 0 : GNUNET_SCHEDULER_shutdown ();
535 0 : return;
536 : }
537 68 : if (0 == test_mode)
538 : {
539 : // FIXME-Optimization: use different event type in the future!
540 0 : struct GNUNET_DB_EventHeaderP es = {
541 0 : .size = htons (sizeof (es)),
542 0 : .type = htons (TALER_DBEVENT_EXCHANGE_AUDITOR_WAKE_HELPER_WIRE)
543 : };
544 :
545 0 : eh = TALER_ARL_adb->event_listen (TALER_ARL_adb->cls,
546 : &es,
547 0 : GNUNET_TIME_UNIT_FOREVER_REL,
548 : &db_notify,
549 : NULL);
550 0 : GNUNET_assert (NULL != eh);
551 : }
552 68 : start ();
553 : }
554 :
555 :
556 : /**
557 : * The main function of the wire auditing tool. Checks that
558 : * the exchange's records of wire transfers match that of
559 : * the wire gateway.
560 : *
561 : * @param argc number of arguments from the command line
562 : * @param argv command line arguments
563 : * @return 0 ok, 1 on error
564 : */
565 : int
566 68 : main (int argc,
567 : char *const *argv)
568 : {
569 68 : const struct GNUNET_GETOPT_CommandLineOption options[] = {
570 68 : GNUNET_GETOPT_option_flag ('i',
571 : "internal",
572 : "perform checks only applicable for exchange-internal audits",
573 : &internal_checks),
574 68 : GNUNET_GETOPT_option_flag ('t',
575 : "test",
576 : "run in test mode and exit when idle",
577 : &test_mode),
578 68 : GNUNET_GETOPT_option_timetravel ('T',
579 : "timetravel"),
580 : GNUNET_GETOPT_OPTION_END
581 : };
582 : enum GNUNET_GenericReturnValue ret;
583 :
584 68 : ret = GNUNET_PROGRAM_run (
585 : TALER_AUDITOR_project_data (),
586 : argc,
587 : argv,
588 : "taler-helper-auditor-transfer",
589 : gettext_noop (
590 : "Audit exchange database for consistency of aggregations/transfers with respect to deposit deadlines"),
591 : options,
592 : &run,
593 : NULL);
594 68 : if (GNUNET_SYSERR == ret)
595 0 : return EXIT_INVALIDARGUMENT;
596 68 : if (GNUNET_NO == ret)
597 0 : return EXIT_SUCCESS;
598 68 : return global_ret;
599 : }
600 :
601 :
602 : /* end of taler-helper-auditor-transfer.c */
|