Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2016-2025 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU Affero General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
12 :
13 : You should have received a copy of the GNU Affero General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 :
17 : /**
18 : * @file taler-exchange-aggregator.c
19 : * @brief Process that aggregates outgoing transactions and prepares their execution
20 : * @author Christian Grothoff
21 : */
22 : #include "taler/platform.h"
23 : #include <gnunet/gnunet_util_lib.h>
24 : #include <jansson.h>
25 : #include <pthread.h>
26 : #include "taler/taler_exchangedb_lib.h"
27 : #include "taler/taler_exchangedb_plugin.h"
28 : #include "taler/taler_json_lib.h"
29 : #include "taler/taler_kyclogic_lib.h"
30 : #include "taler/taler_bank_service.h"
31 : #include "taler/taler_dbevents.h"
32 :
33 : /**
34 : * How often do we retry after serialization failures?
35 : */
36 : #define MAX_RETRIES 5
37 :
38 : /**
39 : * Information about one aggregation process to be executed. There is
40 : * at most one of these around at any given point in time.
41 : * Note that this limits parallelism, and we might want
42 : * to revise this decision at a later point.
43 : */
44 : struct AggregationUnit
45 : {
46 : /**
47 : * Public key of the merchant.
48 : */
49 : struct TALER_MerchantPublicKeyP merchant_pub;
50 :
51 : /**
52 : * Transient amount already found aggregated,
53 : * set only if @e have_transient is true.
54 : */
55 : struct TALER_Amount trans;
56 :
57 : /**
58 : * Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
59 : */
60 : struct TALER_Amount total_amount;
61 :
62 : /**
63 : * Final amount to be transferred (after fee and rounding down).
64 : */
65 : struct TALER_Amount final_amount;
66 :
67 : /**
68 : * Wire fee we charge for @e wp at @e execution_time.
69 : */
70 : struct TALER_WireFeeSet fees;
71 :
72 : /**
73 : * Wire transfer identifier we use.
74 : */
75 : struct TALER_WireTransferIdentifierRawP wtid;
76 :
77 : /**
78 : * The current time (which triggered the aggregation and
79 : * defines the wire fee).
80 : */
81 : struct GNUNET_TIME_Timestamp execution_time;
82 :
83 : /**
84 : * Wire details of the merchant.
85 : */
86 : struct TALER_FullPayto payto_uri;
87 :
88 : /**
89 : * Selected wire target for the aggregation.
90 : */
91 : struct TALER_FullPaytoHashP h_full_payto;
92 :
93 : /**
94 : * Selected wire target for KYC checks.
95 : */
96 : struct TALER_NormalizedPaytoHashP h_normalized_payto;
97 :
98 : /**
99 : * Exchange wire account to be used for the preparation and
100 : * eventual execution of the aggregate wire transfer.
101 : */
102 : const struct TALER_EXCHANGEDB_AccountInfo *wa;
103 :
104 : /**
105 : * Shard this aggregation unit is part of.
106 : */
107 : struct Shard *shard;
108 :
109 : /**
110 : * Handle to async process to obtain the legitimization rules.
111 : */
112 : struct TALER_EXCHANGEDB_RuleUpdater *ru;
113 :
114 : /**
115 : * Row in KYC table for legitimization requirements
116 : * that are pending for this aggregation, or 0 if none.
117 : */
118 : uint64_t requirement_row;
119 :
120 : /**
121 : * How often did we retry the transaction?
122 : */
123 : unsigned int retries;
124 :
125 : /**
126 : * Should we run a follow-up transaction with a legitimization
127 : * check?
128 : */
129 : bool legi_check;
130 :
131 : /**
132 : * Do we have an entry in the transient table for
133 : * this aggregation?
134 : */
135 : bool have_transient;
136 :
137 : /**
138 : * Is the wrong merchant public key associated with
139 : * the KYC data?
140 : */
141 : bool bad_kyc_auth;
142 :
143 : };
144 :
145 :
146 : /**
147 : * Work shard we are processing.
148 : */
149 : struct Shard
150 : {
151 :
152 : /**
153 : * When did we start processing the shard?
154 : */
155 : struct GNUNET_TIME_Timestamp start_time;
156 :
157 : /**
158 : * Starting row of the shard.
159 : */
160 : uint32_t shard_start;
161 :
162 : /**
163 : * Inclusive end row of the shard.
164 : */
165 : uint32_t shard_end;
166 :
167 : /**
168 : * Number of starting points found in the shard.
169 : */
170 : uint64_t work_counter;
171 :
172 : };
173 :
174 :
175 : /**
176 : * What is the smallest unit we support for wire transfers?
177 : * We will need to round down to a multiple of this amount.
178 : */
179 : static struct TALER_Amount currency_round_unit;
180 :
181 : /**
182 : * What is the base URL of this exchange? Used in the
183 : * wire transfer subjects so that merchants and governments
184 : * can ask for the list of aggregated deposits.
185 : */
186 : static char *exchange_base_url;
187 :
188 : /**
189 : * Set to #GNUNET_YES if this exchange does not support KYC checks
190 : * and thus deposits are to be aggregated regardless of the
191 : * KYC status of the target account.
192 : */
193 : static int kyc_off;
194 :
195 : /**
196 : * The exchange's configuration.
197 : */
198 : static const struct GNUNET_CONFIGURATION_Handle *cfg;
199 :
200 : /**
201 : * Key used to encrypt KYC attribute data in our database.
202 : */
203 : static struct TALER_AttributeEncryptionKeyP attribute_key;
204 :
205 : /**
206 : * Our database plugin.
207 : */
208 : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
209 :
210 : /**
211 : * Next task to run, if any.
212 : */
213 : static struct GNUNET_SCHEDULER_Task *task;
214 :
215 : /**
216 : * How long should we sleep when idle before trying to find more work?
217 : */
218 : static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
219 :
220 : /**
221 : * How big are the shards we are processing? Is an inclusive offset, so every
222 : * shard ranges from [X,X+shard_size) exclusive. So a shard covers
223 : * shard_size slots. The maximum value for shard_size is INT32_MAX+1.
224 : */
225 : static uint32_t shard_size;
226 :
227 : /**
228 : * Value to return from main(). 0 on success, non-zero on errors.
229 : */
230 : static int global_ret;
231 :
232 : /**
233 : * #GNUNET_YES if we are in test mode and should exit when idle.
234 : */
235 : static int test_mode;
236 :
237 :
238 : /**
239 : * Main work function that queries the DB and aggregates transactions
240 : * into larger wire transfers.
241 : *
242 : * @param cls a `struct Shard *`
243 : */
244 : static void
245 : run_aggregation (void *cls);
246 :
247 :
248 : /**
249 : * Work on transactions unlocked by KYC.
250 : *
251 : * @param cls NULL
252 : */
253 : static void
254 : drain_kyc_alerts (void *cls);
255 :
256 :
257 : /**
258 : * Free data stored in @a au, including @a au itself.
259 : *
260 : * @param[in] au aggregation unit to clean up
261 : */
262 : static void
263 122 : cleanup_au (struct AggregationUnit *au)
264 : {
265 122 : GNUNET_assert (NULL != au);
266 122 : if (NULL != au->ru)
267 : {
268 0 : GNUNET_break (0);
269 0 : TALER_EXCHANGEDB_update_rules_cancel (au->ru);
270 0 : au->ru = NULL;
271 : }
272 122 : GNUNET_free (au->payto_uri.full_payto);
273 122 : GNUNET_free (au);
274 122 : }
275 :
276 :
277 : /**
278 : * Perform a database commit. If it fails, print a warning.
279 : *
280 : * @return status of commit
281 : */
282 : static enum GNUNET_DB_QueryStatus
283 70 : commit_or_warn (void)
284 : {
285 : enum GNUNET_DB_QueryStatus qs;
286 :
287 70 : qs = db_plugin->commit (db_plugin->cls);
288 70 : if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
289 70 : return qs;
290 0 : GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
291 : ? GNUNET_ERROR_TYPE_INFO
292 : : GNUNET_ERROR_TYPE_ERROR,
293 : "Failed to commit database transaction!\n");
294 0 : return qs;
295 : }
296 :
297 :
298 : /**
299 : * Release lock on shard @a s in the database.
300 : * On error, terminates this process.
301 : *
302 : * @param[in] s shard to free (and memory to release)
303 : */
304 : static void
305 121 : release_shard (struct Shard *s)
306 : {
307 : enum GNUNET_DB_QueryStatus qs;
308 :
309 121 : qs = db_plugin->release_revolving_shard (
310 121 : db_plugin->cls,
311 : "aggregator",
312 : s->shard_start,
313 : s->shard_end);
314 121 : GNUNET_free (s);
315 121 : switch (qs)
316 : {
317 0 : case GNUNET_DB_STATUS_HARD_ERROR:
318 : case GNUNET_DB_STATUS_SOFT_ERROR:
319 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
320 0 : GNUNET_break (0);
321 0 : global_ret = EXIT_FAILURE;
322 0 : GNUNET_SCHEDULER_shutdown ();
323 0 : return;
324 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
325 : /* Strange, but let's just continue */
326 0 : break;
327 121 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
328 : /* normal case */
329 121 : break;
330 : }
331 : }
332 :
333 :
334 : /**
335 : * Schedule the next major task, or exit depending on mode.
336 : */
337 : static void
338 122 : next_task (uint64_t counter)
339 : {
340 122 : if ( (GNUNET_YES == test_mode) &&
341 : (0 == counter) )
342 : {
343 : /* in test mode, shutdown after a shard is done with 0 work */
344 55 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
345 : "No work done and in test mode, shutting down\n");
346 55 : GNUNET_SCHEDULER_shutdown ();
347 55 : return;
348 : }
349 67 : GNUNET_assert (NULL == task);
350 : /* If we ended up doing zero work, sleep a bit */
351 67 : if (0 == counter)
352 : {
353 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
354 : "Going to sleep for %s before trying again\n",
355 : GNUNET_TIME_relative2s (aggregator_idle_sleep_interval,
356 : true));
357 0 : task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
358 : &drain_kyc_alerts,
359 : NULL);
360 : }
361 : else
362 : {
363 67 : task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
364 : NULL);
365 : }
366 : }
367 :
368 :
369 : /**
370 : * Rollback the current transaction (if any),
371 : * then free data stored in @a au, including @a au itself, and then
372 : * run the next aggregation task.
373 : *
374 : * @param[in] au aggregation unit to clean up
375 : */
376 : static void
377 122 : cleanup_and_next (struct AggregationUnit *au)
378 : {
379 122 : struct Shard *s = au->shard;
380 122 : uint64_t counter = (NULL == s) ? 0 : s->work_counter;
381 :
382 : /* just in case, often no transaction is running here anymore */
383 122 : db_plugin->rollback (db_plugin->cls);
384 122 : cleanup_au (au);
385 122 : if (NULL != s)
386 121 : release_shard (s);
387 122 : if (EXIT_SUCCESS == global_ret)
388 122 : next_task (counter);
389 122 : }
390 :
391 :
392 : /**
393 : * We're being aborted with CTRL-C (or SIGTERM). Shut down.
394 : *
395 : * @param cls closure
396 : */
397 : static void
398 55 : shutdown_task (void *cls)
399 : {
400 : (void) cls;
401 55 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
402 : "Running shutdown\n");
403 55 : if (NULL != task)
404 : {
405 0 : GNUNET_SCHEDULER_cancel (task);
406 0 : task = NULL;
407 : }
408 55 : TALER_KYCLOGIC_kyc_done ();
409 55 : TALER_EXCHANGEDB_plugin_unload (db_plugin);
410 55 : db_plugin = NULL;
411 55 : TALER_EXCHANGEDB_unload_accounts ();
412 55 : cfg = NULL;
413 55 : }
414 :
415 :
416 : /**
417 : * Parse the configuration for aggregator.
418 : *
419 : * @return #GNUNET_OK on success
420 : */
421 : static enum GNUNET_GenericReturnValue
422 55 : parse_aggregator_config (void)
423 : {
424 : enum GNUNET_GenericReturnValue enable_kyc;
425 :
426 : enable_kyc
427 55 : = GNUNET_CONFIGURATION_get_value_yesno (
428 : cfg,
429 : "exchange",
430 : "ENABLE_KYC");
431 55 : if (GNUNET_SYSERR == enable_kyc)
432 : {
433 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
434 : "Need YES or NO in section `exchange' under `ENABLE_KYC'\n");
435 0 : return GNUNET_SYSERR;
436 : }
437 55 : if (GNUNET_NO == enable_kyc)
438 : {
439 41 : kyc_off = true;
440 : }
441 : else
442 : {
443 : char *attr_enc_key_str;
444 :
445 14 : if (GNUNET_OK !=
446 14 : GNUNET_CONFIGURATION_get_value_string (cfg,
447 : "exchange",
448 : "ATTRIBUTE_ENCRYPTION_KEY",
449 : &attr_enc_key_str))
450 : {
451 0 : GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
452 : "exchange",
453 : "ATTRIBUTE_ENCRYPTION_KEY");
454 0 : return GNUNET_SYSERR;
455 : }
456 14 : GNUNET_CRYPTO_hash (attr_enc_key_str,
457 : strlen (attr_enc_key_str),
458 : &attribute_key.hash);
459 14 : GNUNET_free (attr_enc_key_str);
460 : }
461 55 : if (GNUNET_OK !=
462 55 : GNUNET_CONFIGURATION_get_value_string (cfg,
463 : "exchange",
464 : "BASE_URL",
465 : &exchange_base_url))
466 : {
467 0 : GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
468 : "exchange",
469 : "BASE_URL");
470 0 : return GNUNET_SYSERR;
471 : }
472 55 : if (GNUNET_OK !=
473 55 : GNUNET_CONFIGURATION_get_value_time (cfg,
474 : "exchange",
475 : "AGGREGATOR_IDLE_SLEEP_INTERVAL",
476 : &aggregator_idle_sleep_interval))
477 : {
478 0 : GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
479 : "exchange",
480 : "AGGREGATOR_IDLE_SLEEP_INTERVAL");
481 0 : return GNUNET_SYSERR;
482 : }
483 55 : if ( (GNUNET_OK !=
484 55 : TALER_config_get_amount (cfg,
485 : "exchange",
486 : "CURRENCY_ROUND_UNIT",
487 55 : ¤cy_round_unit)) ||
488 55 : (TALER_amount_is_zero (¤cy_round_unit)) )
489 : {
490 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
491 : "Need non-zero amount in section `exchange' under `CURRENCY_ROUND_UNIT'\n");
492 0 : return GNUNET_SYSERR;
493 : }
494 :
495 55 : if (NULL ==
496 55 : (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg,
497 : false)))
498 : {
499 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
500 : "Failed to initialize DB subsystem\n");
501 0 : return GNUNET_SYSERR;
502 : }
503 55 : if (GNUNET_OK !=
504 55 : TALER_EXCHANGEDB_load_accounts (cfg,
505 : TALER_EXCHANGEDB_ALO_DEBIT))
506 : {
507 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
508 : "No wire accounts configured for debit!\n");
509 0 : TALER_EXCHANGEDB_plugin_unload (db_plugin);
510 0 : db_plugin = NULL;
511 0 : return GNUNET_SYSERR;
512 : }
513 55 : return GNUNET_OK;
514 : }
515 :
516 :
517 : /**
518 : * Callback to return all applicable amounts for the KYC
519 : * decision to @ a cb.
520 : *
521 : * @param cls a `struct AggregationUnit *`
522 : * @param limit time limit for the iteration
523 : * @param cb function to call with the amounts
524 : * @param cb_cls closure for @a cb
525 : * @return transaction status
526 : */
527 : static enum GNUNET_DB_QueryStatus
528 2 : return_relevant_amounts (void *cls,
529 : struct GNUNET_TIME_Absolute limit,
530 : TALER_EXCHANGEDB_KycAmountCallback cb,
531 : void *cb_cls)
532 : {
533 2 : const struct AggregationUnit *au = cls;
534 : enum GNUNET_DB_QueryStatus qs;
535 :
536 2 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
537 : "Returning amount %s in KYC check\n",
538 : TALER_amount2s (&au->total_amount));
539 2 : if (GNUNET_OK !=
540 2 : cb (cb_cls,
541 : &au->total_amount,
542 : GNUNET_TIME_absolute_get ()))
543 0 : return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
544 2 : qs = db_plugin->select_aggregation_amounts_for_kyc_check (
545 2 : db_plugin->cls,
546 : &au->h_normalized_payto,
547 : limit,
548 : cb,
549 : cb_cls);
550 2 : if (GNUNET_DB_STATUS_HARD_ERROR == qs)
551 : {
552 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
553 : "Failed to select aggregation amounts for KYC limit check!\n");
554 : }
555 2 : return qs;
556 : }
557 :
558 :
559 : /**
560 : * The aggregation process failed hard, shut down the program.
561 : *
562 : * @param[in] au aggregation that failed hard
563 : */
564 : static void
565 0 : fail_aggregation (struct AggregationUnit *au)
566 : {
567 0 : struct Shard *s = au->shard;
568 :
569 0 : cleanup_au (au);
570 0 : global_ret = EXIT_FAILURE;
571 0 : GNUNET_SCHEDULER_shutdown ();
572 0 : db_plugin->rollback (db_plugin->cls);
573 0 : release_shard (s);
574 0 : }
575 :
576 :
577 : /**
578 : * Run the next task with the given shard @a s.
579 : *
580 : * @param s shard to run, NULL to run more drain jobs
581 : */
582 : static void
583 0 : run_task_with_shard (struct Shard *s)
584 : {
585 0 : GNUNET_assert (NULL == task);
586 0 : if (NULL == s)
587 0 : task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
588 : NULL);
589 : else
590 0 : task = GNUNET_SCHEDULER_add_now (&run_aggregation,
591 : s);
592 0 : }
593 :
594 :
595 : /**
596 : * The aggregation process failed with a serialization
597 : * issue. Rollback the transaction and try again.
598 : *
599 : * @param[in] au aggregation that needs to be rolled back
600 : */
601 : static void
602 0 : rollback_aggregation (struct AggregationUnit *au)
603 : {
604 0 : struct Shard *s = au->shard;
605 :
606 0 : cleanup_au (au);
607 0 : db_plugin->rollback (db_plugin->cls);
608 0 : run_task_with_shard (s);
609 0 : }
610 :
611 :
612 : /**
613 : * Function called with legitimization rule set. Check
614 : * how that affects the aggregation process.
615 : *
616 : * @param[in] cls a `struct AggregationUnit *`
617 : * @param[in] rur new legitimization rule set to evaluate
618 : */
619 : static void
620 : evaluate_rules (
621 : void *cls,
622 : struct TALER_EXCHANGEDB_RuleUpdaterResult *rur);
623 :
624 :
625 : /**
626 : * The aggregation process succeeded and should be finally committed.
627 : *
628 : * @param[in] au aggregation that needs to be committed
629 : */
630 : static void
631 69 : commit_aggregation (struct AggregationUnit *au)
632 : {
633 69 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
634 : "Committing aggregation result over %s to %s\n",
635 : TALER_amount2s (&au->final_amount),
636 : au->payto_uri.full_payto);
637 : /* Now we can finally commit the overall transaction, as we are
638 : again consistent if all of this passes. */
639 69 : switch (commit_or_warn ())
640 : {
641 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
642 : /* try again */
643 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
644 : "Serialization issue on commit; trying again later!\n");
645 0 : cleanup_and_next (au);
646 0 : return;
647 0 : case GNUNET_DB_STATUS_HARD_ERROR:
648 0 : GNUNET_break (0);
649 0 : global_ret = EXIT_FAILURE;
650 0 : GNUNET_SCHEDULER_shutdown ();
651 0 : cleanup_and_next (au);
652 0 : return;
653 69 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
654 69 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
655 : "Commit complete, going again\n");
656 69 : if (au->legi_check)
657 : {
658 2 : au->legi_check = false;
659 4 : au->ru = TALER_EXCHANGEDB_update_rules (
660 : db_plugin,
661 : &attribute_key,
662 2 : &au->h_normalized_payto,
663 : false, /* aggregation doesn't apply to wallets */
664 : &evaluate_rules,
665 : au);
666 2 : if (NULL != au->ru)
667 2 : return;
668 : }
669 67 : cleanup_and_next (au);
670 67 : return;
671 0 : default:
672 0 : GNUNET_break (0);
673 0 : global_ret = EXIT_FAILURE;
674 0 : GNUNET_SCHEDULER_shutdown ();
675 0 : cleanup_and_next (au);
676 0 : return;
677 : }
678 : }
679 :
680 :
681 : /**
682 : * Trigger the wire transfer for the @a au
683 : * and delete the record of the aggregation.
684 : *
685 : * @param[in] au information about the aggregation
686 : */
687 : static void
688 61 : trigger_wire_transfer (struct AggregationUnit *au)
689 : {
690 : enum GNUNET_DB_QueryStatus qs;
691 :
692 61 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
693 : "Preparing wire transfer of %s to %s\n",
694 : TALER_amount2s (&au->final_amount),
695 : TALER_B2S (&au->merchant_pub));
696 : {
697 : void *buf;
698 : size_t buf_size;
699 :
700 61 : TALER_BANK_prepare_transfer (au->payto_uri,
701 61 : &au->final_amount,
702 : exchange_base_url,
703 61 : &au->wtid,
704 : &buf,
705 : &buf_size);
706 61 : GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707 : "Storing %u bytes of wire prepare data\n",
708 : (unsigned int) buf_size);
709 : /* Commit our intention to execute the wire transfer! */
710 61 : qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
711 61 : au->wa->method,
712 : buf,
713 : buf_size);
714 61 : GNUNET_log (qs >= 0
715 : ? GNUNET_ERROR_TYPE_DEBUG
716 : : GNUNET_ERROR_TYPE_WARNING,
717 : "wire_prepare_data_insert returned %d\n",
718 : (int) qs);
719 61 : GNUNET_free (buf);
720 : }
721 : /* Commit the WTID data to 'wire_out' */
722 61 : if (qs >= 0)
723 : {
724 61 : qs = db_plugin->store_wire_transfer_out (
725 61 : db_plugin->cls,
726 : au->execution_time,
727 61 : &au->wtid,
728 61 : &au->h_full_payto,
729 61 : au->wa->section_name,
730 61 : &au->final_amount);
731 61 : GNUNET_log (qs >= 0
732 : ? GNUNET_ERROR_TYPE_DEBUG
733 : : GNUNET_ERROR_TYPE_WARNING,
734 : "store_wire_transfer_out returned %d\n",
735 : (int) qs);
736 : }
737 61 : if ( (qs >= 0) &&
738 61 : au->have_transient)
739 3 : qs = db_plugin->delete_aggregation_transient (
740 3 : db_plugin->cls,
741 3 : &au->h_full_payto,
742 3 : &au->wtid);
743 :
744 61 : switch (qs)
745 : {
746 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
747 0 : GNUNET_log (
748 : GNUNET_ERROR_TYPE_INFO,
749 : "Serialization issue during aggregation; trying again later!\n");
750 0 : rollback_aggregation (au);
751 0 : return;
752 0 : case GNUNET_DB_STATUS_HARD_ERROR:
753 0 : GNUNET_break (0);
754 0 : fail_aggregation (au);
755 0 : return;
756 61 : default:
757 61 : break;
758 : }
759 : {
760 61 : struct TALER_CoinDepositEventP rep = {
761 61 : .header.size = htons (sizeof (rep)),
762 61 : .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
763 : .merchant_pub = au->merchant_pub
764 : };
765 :
766 61 : db_plugin->event_notify (db_plugin->cls,
767 : &rep.header,
768 : NULL,
769 : 0);
770 : }
771 61 : commit_aggregation (au);
772 : }
773 :
774 :
775 : static void
776 2 : evaluate_rules (
777 : void *cls,
778 : struct TALER_EXCHANGEDB_RuleUpdaterResult *rur)
779 : {
780 2 : struct AggregationUnit *au = cls;
781 2 : struct TALER_KYCLOGIC_LegitimizationRuleSet *lrs = rur->lrs;
782 : enum GNUNET_DB_QueryStatus qs;
783 : const struct TALER_KYCLOGIC_KycRule *requirement;
784 :
785 2 : au->ru = NULL;
786 2 : if (TALER_EC_NONE != rur->ec)
787 : {
788 0 : if (NULL != lrs)
789 : {
790 : /* strange, but whatever */
791 0 : TALER_KYCLOGIC_rules_free (lrs);
792 : }
793 : /* Rollback just in case, should have already been done
794 : before by the TALER_EXCHANGEDB_update_rules() logic. */
795 0 : db_plugin->rollback (db_plugin->cls);
796 0 : if ( (TALER_EC_GENERIC_DB_SOFT_FAILURE == rur->ec) &&
797 0 : (au->retries++ < MAX_RETRIES) )
798 : {
799 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
800 : "Serialization failure, trying again!\n");
801 0 : au->ru = TALER_EXCHANGEDB_update_rules (
802 : db_plugin,
803 : &attribute_key,
804 0 : &au->h_normalized_payto,
805 : false, /* aggregation does not apply to wallets */
806 : &evaluate_rules,
807 : au);
808 0 : if (NULL != au->ru)
809 1 : return;
810 : }
811 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
812 : "KYC rule evaluation failed hard: %s (%d, %s)\n",
813 : TALER_ErrorCode_get_hint (rur->ec),
814 : (int) rur->ec,
815 : rur->hint);
816 0 : cleanup_and_next (au);
817 0 : return;
818 : }
819 :
820 : /* Note that here we are in an open transaction that fetched
821 : (or updated) the current set of legitimization rules. So
822 : we must properly commit at the end! */
823 : {
824 : struct TALER_Amount next_threshold;
825 :
826 2 : qs = TALER_KYCLOGIC_kyc_test_required (
827 : TALER_KYCLOGIC_KYC_TRIGGER_AGGREGATE,
828 : lrs,
829 : &return_relevant_amounts,
830 : (void *) au,
831 : &requirement,
832 : &next_threshold);
833 : }
834 2 : if (qs < 0)
835 : {
836 0 : TALER_KYCLOGIC_rules_free (lrs);
837 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
838 0 : cleanup_and_next (au);
839 0 : return;
840 : }
841 2 : if (NULL == requirement)
842 : {
843 1 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
844 : "KYC check clear, proceeding with wire transfer\n");
845 1 : TALER_KYCLOGIC_rules_free (lrs);
846 1 : trigger_wire_transfer (au);
847 1 : return;
848 : }
849 1 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
850 : "KYC requirement for %s is %s\n",
851 : TALER_amount2s (&au->total_amount),
852 : TALER_KYCLOGIC_rule2s (requirement));
853 : {
854 : json_t *jrule;
855 :
856 1 : jrule = TALER_KYCLOGIC_rule_to_measures (requirement);
857 1 : qs = db_plugin->trigger_kyc_rule_for_account (
858 1 : db_plugin->cls,
859 : au->payto_uri,
860 1 : &au->h_normalized_payto,
861 : NULL,
862 1 : &au->merchant_pub,
863 : jrule,
864 : TALER_KYCLOGIC_rule2priority (requirement),
865 : &au->requirement_row,
866 : &au->bad_kyc_auth);
867 1 : json_decref (jrule);
868 : }
869 1 : if (qs < 0)
870 : {
871 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
872 : "Failed to persist KYC requirement `%s' in DB!\n",
873 : TALER_KYCLOGIC_rule2s (requirement));
874 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
875 0 : if (GNUNET_DB_STATUS_HARD_ERROR == qs)
876 0 : global_ret = EXIT_FAILURE;
877 0 : cleanup_and_next (au);
878 0 : return;
879 : }
880 1 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
881 : "Legitimization process %llu started\n",
882 : (unsigned long long) au->requirement_row);
883 1 : TALER_KYCLOGIC_rules_free (lrs);
884 :
885 1 : qs = db_plugin->update_aggregation_transient (db_plugin->cls,
886 1 : &au->h_full_payto,
887 1 : &au->wtid,
888 : au->requirement_row,
889 1 : &au->total_amount);
890 :
891 :
892 1 : if (qs < 0)
893 : {
894 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
895 : "Failed to persist updated transient in in DB!\n");
896 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
897 0 : if (GNUNET_DB_STATUS_HARD_ERROR == qs)
898 0 : global_ret = EXIT_FAILURE;
899 0 : cleanup_and_next (au);
900 0 : return;
901 : }
902 :
903 : {
904 1 : struct TALER_CoinDepositEventP rep = {
905 1 : .header.size = htons (sizeof (rep)),
906 1 : .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
907 : .merchant_pub = au->merchant_pub
908 : };
909 :
910 1 : db_plugin->event_notify (db_plugin->cls,
911 : &rep.header,
912 : NULL,
913 : 0);
914 : }
915 :
916 : /* First commit, turns the rollback in cleanup into a NOP! */
917 1 : commit_or_warn ();
918 1 : cleanup_and_next (au);
919 : }
920 :
921 :
922 : /**
923 : * The aggregation process could not be concluded and its progress state
924 : * should be remembered in a transient aggregation.
925 : *
926 : * @param[in] au aggregation that needs to be committed
927 : * into a transient aggregation
928 : */
929 : static void
930 8 : commit_to_transient (struct AggregationUnit *au)
931 : {
932 : enum GNUNET_DB_QueryStatus qs;
933 :
934 8 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
935 : "Not ready for wire transfer (%s)\n",
936 : TALER_amount2s (&au->final_amount));
937 8 : if (au->have_transient)
938 3 : qs = db_plugin->update_aggregation_transient (db_plugin->cls,
939 3 : &au->h_full_payto,
940 3 : &au->wtid,
941 : au->requirement_row,
942 3 : &au->total_amount);
943 : else
944 5 : qs = db_plugin->create_aggregation_transient (db_plugin->cls,
945 5 : &au->h_full_payto,
946 5 : au->wa->section_name,
947 5 : &au->merchant_pub,
948 5 : &au->wtid,
949 : au->requirement_row,
950 5 : &au->total_amount);
951 8 : if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
952 : {
953 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
954 : "Serialization issue, trying again later!\n");
955 0 : rollback_aggregation (au);
956 0 : return;
957 : }
958 8 : if (GNUNET_DB_STATUS_HARD_ERROR == qs)
959 : {
960 0 : GNUNET_break (0);
961 0 : fail_aggregation (au);
962 0 : return;
963 : }
964 8 : au->have_transient = true;
965 : /* commit */
966 8 : commit_aggregation (au);
967 : }
968 :
969 :
970 : /**
971 : * Test if legitimization rules are satisfied for a transfer to @a h_payto.
972 : *
973 : * @param[in] au aggregation unit to check for
974 : */
975 : static void
976 62 : check_legitimization_satisfied (struct AggregationUnit *au)
977 : {
978 62 : if (kyc_off)
979 : {
980 60 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
981 : "KYC checks are off, legitimization satisfied\n");
982 60 : trigger_wire_transfer (au);
983 60 : return;
984 : }
985 : /* get legi rules *after* committing, as the legi check
986 : should run in a separate transaction! */
987 2 : au->legi_check = true;
988 2 : commit_to_transient (au);
989 : }
990 :
991 :
992 : /**
993 : * Perform the main aggregation work for @a au. Expects to be in
994 : * a working transaction, which the caller must also ultimately commit
995 : * (or rollback) depending on our return value.
996 : *
997 : * @param[in,out] au aggregation unit to work on
998 : */
999 : static void
1000 68 : do_aggregate (struct AggregationUnit *au)
1001 : {
1002 : enum GNUNET_DB_QueryStatus qs;
1003 :
1004 68 : au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (
1005 : au->payto_uri);
1006 68 : if (NULL == au->wa)
1007 : {
1008 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1009 : "No exchange account configured for `%s', please fix your setup to continue!\n",
1010 : au->payto_uri.full_payto);
1011 0 : global_ret = EXIT_FAILURE;
1012 0 : fail_aggregation (au);
1013 0 : return;
1014 : }
1015 :
1016 : {
1017 : struct GNUNET_TIME_Timestamp start_date;
1018 : struct GNUNET_TIME_Timestamp end_date;
1019 : struct TALER_MasterSignatureP master_sig;
1020 : uint64_t rowid;
1021 :
1022 68 : qs = db_plugin->get_wire_fee (db_plugin->cls,
1023 68 : au->wa->method,
1024 : au->execution_time,
1025 : &rowid,
1026 : &start_date,
1027 : &end_date,
1028 : &au->fees,
1029 : &master_sig);
1030 68 : if (0 >= qs)
1031 : {
1032 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1033 : "Could not get wire fees for %s at %s. Aborting run.\n",
1034 : au->wa->method,
1035 : GNUNET_TIME_timestamp2s (au->execution_time));
1036 0 : fail_aggregation (au);
1037 0 : return;
1038 : }
1039 : }
1040 :
1041 : /* Now try to find other deposits to aggregate */
1042 68 : GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1043 : "Found ready deposit for %s, aggregating by target %s\n",
1044 : TALER_B2S (&au->merchant_pub),
1045 : au->payto_uri.full_payto);
1046 68 : qs = db_plugin->select_aggregation_transient (db_plugin->cls,
1047 68 : &au->h_full_payto,
1048 68 : &au->merchant_pub,
1049 68 : au->wa->section_name,
1050 : &au->wtid,
1051 : &au->trans);
1052 68 : switch (qs)
1053 : {
1054 0 : case GNUNET_DB_STATUS_HARD_ERROR:
1055 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1056 : "Failed to lookup transient aggregates!\n");
1057 0 : fail_aggregation (au);
1058 0 : return;
1059 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
1060 : /* serializiability issue, try again */
1061 0 : GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1062 : "Serialization issue, trying again later!\n");
1063 0 : rollback_aggregation (au);
1064 0 : return;
1065 63 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
1066 63 : GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
1067 63 : &au->wtid,
1068 : sizeof (au->wtid));
1069 63 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1070 : "No transient aggregation found, starting %s\n",
1071 : TALER_B2S (&au->wtid));
1072 63 : au->have_transient = false;
1073 63 : break;
1074 5 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
1075 5 : au->have_transient = true;
1076 5 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1077 : "Transient aggregation found, resuming %s\n",
1078 : TALER_B2S (&au->wtid));
1079 5 : break;
1080 : }
1081 68 : qs = db_plugin->aggregate (db_plugin->cls,
1082 68 : &au->h_full_payto,
1083 68 : &au->merchant_pub,
1084 68 : &au->wtid,
1085 : &au->total_amount);
1086 68 : if (GNUNET_DB_STATUS_HARD_ERROR == qs)
1087 : {
1088 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1089 : "Failed to execute aggregation!\n");
1090 0 : fail_aggregation (au);
1091 0 : return;
1092 : }
1093 68 : if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
1094 : {
1095 : /* serializiability issue, try again */
1096 0 : GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097 : "Serialization issue, trying again later!\n");
1098 0 : rollback_aggregation (au);
1099 0 : return;
1100 : }
1101 68 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1102 : "Aggregation total is %s.\n",
1103 : TALER_amount2s (&au->total_amount));
1104 : /* Subtract wire transfer fee and round to the unit supported by the
1105 : wire transfer method; Check if after rounding down, we still have
1106 : an amount to transfer, and if not mark as 'tiny'. */
1107 68 : if (au->have_transient)
1108 5 : GNUNET_assert (0 <=
1109 : TALER_amount_add (&au->total_amount,
1110 : &au->total_amount,
1111 : &au->trans));
1112 :
1113 :
1114 68 : GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115 : "Rounding aggregate of %s\n",
1116 : TALER_amount2s (&au->total_amount));
1117 68 : if ( (0 >=
1118 68 : TALER_amount_subtract (&au->final_amount,
1119 68 : &au->total_amount,
1120 130 : &au->fees.wire)) ||
1121 : (GNUNET_SYSERR ==
1122 62 : TALER_amount_round_down (&au->final_amount,
1123 62 : &currency_round_unit)) ||
1124 62 : (TALER_amount_is_zero (&au->final_amount)) )
1125 : {
1126 6 : commit_to_transient (au);
1127 6 : return;
1128 : }
1129 62 : check_legitimization_satisfied (au);
1130 : }
1131 :
1132 :
1133 : static void
1134 121 : run_aggregation (void *cls)
1135 : {
1136 121 : struct Shard *s = cls;
1137 : struct AggregationUnit *au;
1138 : enum GNUNET_DB_QueryStatus qs;
1139 :
1140 121 : task = NULL;
1141 121 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1142 : "Checking for ready deposits to aggregate\n");
1143 : /* make sure we have current fees */
1144 121 : au = GNUNET_new (struct AggregationUnit);
1145 121 : au->execution_time = GNUNET_TIME_timestamp_get ();
1146 121 : au->shard = s;
1147 121 : if (GNUNET_OK !=
1148 121 : db_plugin->start_deferred_wire_out (db_plugin->cls))
1149 : {
1150 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1151 : "Failed to start database transaction!\n");
1152 0 : global_ret = EXIT_FAILURE;
1153 0 : GNUNET_SCHEDULER_shutdown ();
1154 0 : release_shard (s);
1155 0 : return;
1156 : }
1157 121 : qs = db_plugin->get_ready_deposit (
1158 121 : db_plugin->cls,
1159 121 : s->shard_start,
1160 121 : s->shard_end,
1161 : &au->merchant_pub,
1162 : &au->payto_uri);
1163 121 : switch (qs)
1164 : {
1165 0 : case GNUNET_DB_STATUS_HARD_ERROR:
1166 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1167 : "Failed to begin deposit iteration!\n");
1168 0 : global_ret = EXIT_FAILURE;
1169 0 : GNUNET_SCHEDULER_shutdown ();
1170 0 : cleanup_and_next (au);
1171 0 : return;
1172 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
1173 0 : cleanup_au (au);
1174 0 : db_plugin->rollback (db_plugin->cls);
1175 0 : run_task_with_shard (s);
1176 0 : return;
1177 54 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
1178 : {
1179 : struct GNUNET_TIME_Relative duration
1180 54 : = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time);
1181 :
1182 54 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1183 : "Completed shard [%u,%u] after %s with %llu deposits\n",
1184 : (unsigned int) s->shard_start,
1185 : (unsigned int) s->shard_end,
1186 : GNUNET_TIME_relative2s (duration,
1187 : true),
1188 : (unsigned long long) s->work_counter);
1189 54 : cleanup_and_next (au);
1190 54 : return;
1191 : }
1192 67 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
1193 67 : s->work_counter++;
1194 67 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1195 : "Found ready deposit!\n");
1196 : /* continued below */
1197 67 : break;
1198 : }
1199 :
1200 67 : TALER_full_payto_hash (au->payto_uri,
1201 : &au->h_full_payto);
1202 67 : TALER_full_payto_normalize_and_hash (au->payto_uri,
1203 : &au->h_normalized_payto);
1204 67 : GNUNET_break (! TALER_payto_is_wallet (au->payto_uri.full_payto));
1205 67 : do_aggregate (au);
1206 : }
1207 :
1208 :
1209 : /**
1210 : * Select a shard to work on.
1211 : *
1212 : * @param cls NULL
1213 : */
1214 : static void
1215 121 : run_shard (void *cls)
1216 : {
1217 : struct Shard *s;
1218 : enum GNUNET_DB_QueryStatus qs;
1219 :
1220 : (void) cls;
1221 121 : task = NULL;
1222 121 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1223 : "Running aggregation shard\n");
1224 121 : if (GNUNET_SYSERR ==
1225 121 : db_plugin->preflight (db_plugin->cls))
1226 : {
1227 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1228 : "Failed to obtain database connection!\n");
1229 0 : global_ret = EXIT_FAILURE;
1230 0 : GNUNET_SCHEDULER_shutdown ();
1231 0 : return;
1232 : }
1233 121 : s = GNUNET_new (struct Shard);
1234 121 : s->start_time = GNUNET_TIME_timestamp_get ();
1235 121 : qs = db_plugin->begin_revolving_shard (db_plugin->cls,
1236 : "aggregator",
1237 : shard_size,
1238 : 1U + INT32_MAX,
1239 : &s->shard_start,
1240 : &s->shard_end);
1241 121 : if (0 >= qs)
1242 : {
1243 0 : if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
1244 : {
1245 : static struct GNUNET_TIME_Relative delay;
1246 :
1247 0 : GNUNET_free (s);
1248 0 : delay = GNUNET_TIME_randomized_backoff (delay,
1249 : GNUNET_TIME_UNIT_SECONDS);
1250 0 : GNUNET_assert (NULL == task);
1251 0 : task = GNUNET_SCHEDULER_add_delayed (delay,
1252 : &run_shard,
1253 : NULL);
1254 0 : return;
1255 : }
1256 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1257 : "Failed to begin shard (%d)!\n",
1258 : qs);
1259 0 : GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs);
1260 0 : global_ret = EXIT_FAILURE;
1261 0 : GNUNET_SCHEDULER_shutdown ();
1262 0 : return;
1263 : }
1264 121 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1265 : "Starting shard [%u:%u]!\n",
1266 : (unsigned int) s->shard_start,
1267 : (unsigned int) s->shard_end);
1268 121 : GNUNET_assert (NULL == task);
1269 121 : task = GNUNET_SCHEDULER_add_now (&run_aggregation,
1270 : s);
1271 : }
1272 :
1273 :
1274 : static void
1275 122 : drain_kyc_alerts (void *cls)
1276 : {
1277 : enum GNUNET_DB_QueryStatus qs;
1278 : struct AggregationUnit *au;
1279 :
1280 : (void) cls;
1281 122 : task = NULL;
1282 122 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1283 : "Draining KYC alerts\n");
1284 122 : au = GNUNET_new (struct AggregationUnit);
1285 122 : au->execution_time = GNUNET_TIME_timestamp_get ();
1286 122 : if (GNUNET_SYSERR ==
1287 122 : db_plugin->preflight (db_plugin->cls))
1288 : {
1289 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1290 : "Failed to obtain database connection!\n");
1291 0 : global_ret = EXIT_FAILURE;
1292 0 : GNUNET_SCHEDULER_shutdown ();
1293 0 : return;
1294 : }
1295 122 : if (GNUNET_OK !=
1296 122 : db_plugin->start (db_plugin->cls,
1297 : "handle kyc alerts"))
1298 : {
1299 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1300 : "Failed to start database transaction!\n");
1301 0 : global_ret = EXIT_FAILURE;
1302 0 : GNUNET_SCHEDULER_shutdown ();
1303 0 : return;
1304 : }
1305 : while (1)
1306 : {
1307 122 : qs = db_plugin->drain_kyc_alert (db_plugin->cls,
1308 : 1,
1309 : &au->h_normalized_payto);
1310 122 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1311 : "Found %d KYC alerts\n",
1312 : (int) qs);
1313 122 : switch (qs)
1314 : {
1315 0 : case GNUNET_DB_STATUS_HARD_ERROR:
1316 0 : GNUNET_break (0);
1317 0 : db_plugin->rollback (db_plugin->cls);
1318 0 : GNUNET_free (au);
1319 0 : GNUNET_assert (NULL == task);
1320 0 : global_ret = EXIT_FAILURE;
1321 0 : GNUNET_SCHEDULER_shutdown ();
1322 0 : return;
1323 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
1324 0 : db_plugin->rollback (db_plugin->cls);
1325 0 : GNUNET_assert (NULL == task);
1326 0 : GNUNET_free (au);
1327 0 : task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
1328 : NULL);
1329 0 : return;
1330 121 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
1331 121 : GNUNET_free (au);
1332 121 : db_plugin->rollback (db_plugin->cls);
1333 121 : GNUNET_assert (NULL == task);
1334 121 : task = GNUNET_SCHEDULER_add_now (&run_shard,
1335 : NULL);
1336 121 : return;
1337 1 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
1338 : /* handled below */
1339 1 : break;
1340 : }
1341 1 : qs = db_plugin->find_aggregation_transient (
1342 1 : db_plugin->cls,
1343 1 : &au->h_normalized_payto,
1344 : &au->payto_uri,
1345 : &au->wtid,
1346 : &au->merchant_pub,
1347 : &au->trans);
1348 1 : switch (qs)
1349 : {
1350 0 : case GNUNET_DB_STATUS_HARD_ERROR:
1351 0 : GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1352 : "Failed to lookup transient aggregates!\n");
1353 0 : db_plugin->rollback (db_plugin->cls);
1354 0 : GNUNET_assert (NULL == task);
1355 0 : task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
1356 : NULL);
1357 0 : return;
1358 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
1359 : /* serializiability issue, try again */
1360 0 : GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1361 : "Serialization issue, trying again later!\n");
1362 0 : db_plugin->rollback (db_plugin->cls);
1363 0 : GNUNET_assert (NULL == task);
1364 0 : task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
1365 : NULL);
1366 0 : return;
1367 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
1368 0 : continue; /* while (1) */
1369 1 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
1370 1 : TALER_full_payto_hash (au->payto_uri,
1371 : &au->h_full_payto);
1372 1 : au->have_transient = true;
1373 1 : do_aggregate (au);
1374 1 : return;
1375 : }
1376 0 : GNUNET_assert (0);
1377 : } /* while(1) */
1378 : }
1379 :
1380 :
1381 : /**
1382 : * First task.
1383 : *
1384 : * @param cls closure, NULL
1385 : * @param args remaining command-line arguments
1386 : * @param cfgfile name of the configuration file used (for saving, can be NULL!)
1387 : * @param c configuration
1388 : */
1389 : static void
1390 55 : run (void *cls,
1391 : char *const *args,
1392 : const char *cfgfile,
1393 : const struct GNUNET_CONFIGURATION_Handle *c)
1394 : {
1395 : unsigned long long ass;
1396 : (void) cls;
1397 : (void) args;
1398 : (void) cfgfile;
1399 :
1400 55 : cfg = c;
1401 55 : if (GNUNET_OK !=
1402 55 : parse_aggregator_config ())
1403 : {
1404 0 : cfg = NULL;
1405 0 : global_ret = EXIT_NOTCONFIGURED;
1406 0 : return;
1407 : }
1408 55 : if (GNUNET_OK !=
1409 55 : GNUNET_CONFIGURATION_get_value_number (cfg,
1410 : "exchange",
1411 : "AGGREGATOR_SHARD_SIZE",
1412 : &ass))
1413 : {
1414 0 : cfg = NULL;
1415 0 : global_ret = EXIT_NOTCONFIGURED;
1416 0 : return;
1417 : }
1418 55 : if ( (0 == ass) ||
1419 55 : (ass > INT32_MAX) )
1420 55 : shard_size = 1U + INT32_MAX;
1421 : else
1422 0 : shard_size = (uint32_t) ass;
1423 55 : if (GNUNET_OK !=
1424 55 : TALER_KYCLOGIC_kyc_init (cfg,
1425 : cfgfile))
1426 : {
1427 0 : cfg = NULL;
1428 0 : global_ret = EXIT_NOTCONFIGURED;
1429 0 : return;
1430 : }
1431 55 : GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1432 : NULL);
1433 55 : GNUNET_assert (NULL == task);
1434 55 : task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
1435 : NULL);
1436 : }
1437 :
1438 :
1439 : /**
1440 : * The main function of the taler-exchange-aggregator.
1441 : *
1442 : * @param argc number of arguments from the command line
1443 : * @param argv command line arguments
1444 : * @return 0 ok, non-zero on error, see #global_ret
1445 : */
1446 : int
1447 55 : main (int argc,
1448 : char *const *argv)
1449 : {
1450 55 : struct GNUNET_GETOPT_CommandLineOption options[] = {
1451 55 : GNUNET_GETOPT_option_timetravel ('T',
1452 : "timetravel"),
1453 55 : GNUNET_GETOPT_option_flag ('t',
1454 : "test",
1455 : "run in test mode and exit when idle",
1456 : &test_mode),
1457 55 : GNUNET_GETOPT_option_flag ('y',
1458 : "kyc-off",
1459 : "perform wire transfers without KYC checks",
1460 : &kyc_off),
1461 : GNUNET_GETOPT_OPTION_END
1462 : };
1463 : enum GNUNET_GenericReturnValue ret;
1464 :
1465 55 : ret = GNUNET_PROGRAM_run (
1466 : TALER_EXCHANGE_project_data (),
1467 : argc, argv,
1468 : "taler-exchange-aggregator",
1469 : gettext_noop (
1470 : "background process that aggregates and executes wire transfers"),
1471 : options,
1472 : &run, NULL);
1473 55 : if (GNUNET_SYSERR == ret)
1474 0 : return EXIT_INVALIDARGUMENT;
1475 55 : if (GNUNET_NO == ret)
1476 0 : return EXIT_SUCCESS;
1477 55 : return global_ret;
1478 : }
1479 :
1480 :
1481 : /* end of taler-exchange-aggregator.c */
|