LCOV - code coverage report
Current view: top level - exchange - taler-exchange-aggregator.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 311 525 59.2 %
Date: 2025-07-09 07:38:29 Functions: 19 22 86.4 %

          Line data    Source code
       1             : /*
       2             :   This file is part of TALER
       3             :   Copyright (C) 2016-2025 Taler Systems SA
       4             : 
       5             :   TALER is free software; you can redistribute it and/or modify it under the
       6             :   terms of the GNU Affero General Public License as published by the Free Software
       7             :   Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
      12             : 
      13             :   You should have received a copy of the GNU Affero General Public License along with
      14             :   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             : */
      16             : 
      17             : /**
      18             :  * @file taler-exchange-aggregator.c
      19             :  * @brief Process that aggregates outgoing transactions and prepares their execution
      20             :  * @author Christian Grothoff
      21             :  */
      22             : #include "taler/platform.h"
      23             : #include <gnunet/gnunet_util_lib.h>
      24             : #include <jansson.h>
      25             : #include <pthread.h>
      26             : #include "taler/taler_exchangedb_lib.h"
      27             : #include "taler/taler_exchangedb_plugin.h"
      28             : #include "taler/taler_json_lib.h"
      29             : #include "taler/taler_kyclogic_lib.h"
      30             : #include "taler/taler_bank_service.h"
      31             : #include "taler/taler_dbevents.h"
      32             : 
      33             : /**
      34             :  * How often do we retry after serialization failures?
      35             :  */
      36             : #define MAX_RETRIES 5
      37             : 
      38             : /**
      39             :  * Information about one aggregation process to be executed.  There is
      40             :  * at most one of these around at any given point in time.
      41             :  * Note that this limits parallelism, and we might want
      42             :  * to revise this decision at a later point.
      43             :  */
      44             : struct AggregationUnit
      45             : {
      46             :   /**
      47             :    * Public key of the merchant.
      48             :    */
      49             :   struct TALER_MerchantPublicKeyP merchant_pub;
      50             : 
      51             :   /**
      52             :    * Transient amount already found aggregated,
      53             :    * set only if @e have_transient is true.
      54             :    */
      55             :   struct TALER_Amount trans;
      56             : 
      57             :   /**
      58             :    * Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
      59             :    */
      60             :   struct TALER_Amount total_amount;
      61             : 
      62             :   /**
      63             :    * Final amount to be transferred (after fee and rounding down).
      64             :    */
      65             :   struct TALER_Amount final_amount;
      66             : 
      67             :   /**
      68             :    * Wire fee we charge for @e wp at @e execution_time.
      69             :    */
      70             :   struct TALER_WireFeeSet fees;
      71             : 
      72             :   /**
      73             :    * Wire transfer identifier we use.
      74             :    */
      75             :   struct TALER_WireTransferIdentifierRawP wtid;
      76             : 
      77             :   /**
      78             :    * The current time (which triggered the aggregation and
      79             :    * defines the wire fee).
      80             :    */
      81             :   struct GNUNET_TIME_Timestamp execution_time;
      82             : 
      83             :   /**
      84             :    * Wire details of the merchant.
      85             :    */
      86             :   struct TALER_FullPayto payto_uri;
      87             : 
      88             :   /**
      89             :    * Selected wire target for the aggregation.
      90             :    */
      91             :   struct TALER_FullPaytoHashP h_full_payto;
      92             : 
      93             :   /**
      94             :    * Selected wire target for KYC checks.
      95             :    */
      96             :   struct TALER_NormalizedPaytoHashP h_normalized_payto;
      97             : 
      98             :   /**
      99             :    * Exchange wire account to be used for the preparation and
     100             :    * eventual execution of the aggregate wire transfer.
     101             :    */
     102             :   const struct TALER_EXCHANGEDB_AccountInfo *wa;
     103             : 
     104             :   /**
     105             :    * Shard this aggregation unit is part of.
     106             :    */
     107             :   struct Shard *shard;
     108             : 
     109             :   /**
     110             :    * Handle to async process to obtain the legitimization rules.
     111             :    */
     112             :   struct TALER_EXCHANGEDB_RuleUpdater *ru;
     113             : 
     114             :   /**
     115             :    * Row in KYC table for legitimization requirements
     116             :    * that are pending for this aggregation, or 0 if none.
     117             :    */
     118             :   uint64_t requirement_row;
     119             : 
     120             :   /**
     121             :    * How often did we retry the transaction?
     122             :    */
     123             :   unsigned int retries;
     124             : 
     125             :   /**
     126             :    * Should we run a follow-up transaction with a legitimization
     127             :    * check?
     128             :    */
     129             :   bool legi_check;
     130             : 
     131             :   /**
     132             :    * Do we have an entry in the transient table for
     133             :    * this aggregation?
     134             :    */
     135             :   bool have_transient;
     136             : 
     137             :   /**
     138             :    * Is the wrong merchant public key associated with
     139             :    * the KYC data?
     140             :    */
     141             :   bool bad_kyc_auth;
     142             : 
     143             : };
     144             : 
     145             : 
     146             : /**
     147             :  * Work shard we are processing.
     148             :  */
     149             : struct Shard
     150             : {
     151             : 
     152             :   /**
     153             :    * When did we start processing the shard?
     154             :    */
     155             :   struct GNUNET_TIME_Timestamp start_time;
     156             : 
     157             :   /**
     158             :    * Starting row of the shard.
     159             :    */
     160             :   uint32_t shard_start;
     161             : 
     162             :   /**
     163             :    * Inclusive end row of the shard.
     164             :    */
     165             :   uint32_t shard_end;
     166             : 
     167             :   /**
     168             :    * Number of starting points found in the shard.
     169             :    */
     170             :   uint64_t work_counter;
     171             : 
     172             : };
     173             : 
     174             : 
     175             : /**
     176             :  * What is the smallest unit we support for wire transfers?
     177             :  * We will need to round down to a multiple of this amount.
     178             :  */
     179             : static struct TALER_Amount currency_round_unit;
     180             : 
     181             : /**
     182             :  * What is the base URL of this exchange?  Used in the
     183             :  * wire transfer subjects so that merchants and governments
     184             :  * can ask for the list of aggregated deposits.
     185             :  */
     186             : static char *exchange_base_url;
     187             : 
     188             : /**
     189             :  * Set to #GNUNET_YES if this exchange does not support KYC checks
     190             :  * and thus deposits are to be aggregated regardless of the
     191             :  * KYC status of the target account.
     192             :  */
     193             : static int kyc_off;
     194             : 
     195             : /**
     196             :  * The exchange's configuration.
     197             :  */
     198             : static const struct GNUNET_CONFIGURATION_Handle *cfg;
     199             : 
     200             : /**
     201             :  * Key used to encrypt KYC attribute data in our database.
     202             :  */
     203             : static struct TALER_AttributeEncryptionKeyP attribute_key;
     204             : 
     205             : /**
     206             :  * Our database plugin.
     207             :  */
     208             : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
     209             : 
     210             : /**
     211             :  * Next task to run, if any.
     212             :  */
     213             : static struct GNUNET_SCHEDULER_Task *task;
     214             : 
     215             : /**
     216             :  * How long should we sleep when idle before trying to find more work?
     217             :  */
     218             : static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
     219             : 
     220             : /**
     221             :  * How big are the shards we are processing? Is an inclusive offset, so every
     222             :  * shard ranges from [X,X+shard_size) exclusive.  So a shard covers
     223             :  * shard_size slots.  The maximum value for shard_size is INT32_MAX+1.
     224             :  */
     225             : static uint32_t shard_size;
     226             : 
     227             : /**
     228             :  * Value to return from main(). 0 on success, non-zero on errors.
     229             :  */
     230             : static int global_ret;
     231             : 
     232             : /**
     233             :  * #GNUNET_YES if we are in test mode and should exit when idle.
     234             :  */
     235             : static int test_mode;
     236             : 
     237             : 
     238             : /**
     239             :  * Main work function that queries the DB and aggregates transactions
     240             :  * into larger wire transfers.
     241             :  *
     242             :  * @param cls a `struct Shard *`
     243             :  */
     244             : static void
     245             : run_aggregation (void *cls);
     246             : 
     247             : 
     248             : /**
     249             :  * Work on transactions unlocked by KYC.
     250             :  *
     251             :  * @param cls NULL
     252             :  */
     253             : static void
     254             : drain_kyc_alerts (void *cls);
     255             : 
     256             : 
     257             : /**
     258             :  * Free data stored in @a au, including @a au itself.
     259             :  *
     260             :  * @param[in] au aggregation unit to clean up
     261             :  */
     262             : static void
     263         122 : cleanup_au (struct AggregationUnit *au)
     264             : {
     265         122 :   GNUNET_assert (NULL != au);
     266         122 :   if (NULL != au->ru)
     267             :   {
     268           0 :     GNUNET_break (0);
     269           0 :     TALER_EXCHANGEDB_update_rules_cancel (au->ru);
     270           0 :     au->ru = NULL;
     271             :   }
     272         122 :   GNUNET_free (au->payto_uri.full_payto);
     273         122 :   GNUNET_free (au);
     274         122 : }
     275             : 
     276             : 
     277             : /**
     278             :  * Perform a database commit. If it fails, print a warning.
     279             :  *
     280             :  * @return status of commit
     281             :  */
     282             : static enum GNUNET_DB_QueryStatus
     283          70 : commit_or_warn (void)
     284             : {
     285             :   enum GNUNET_DB_QueryStatus qs;
     286             : 
     287          70 :   qs = db_plugin->commit (db_plugin->cls);
     288          70 :   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
     289          70 :     return qs;
     290           0 :   GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
     291             :               ? GNUNET_ERROR_TYPE_INFO
     292             :               : GNUNET_ERROR_TYPE_ERROR,
     293             :               "Failed to commit database transaction!\n");
     294           0 :   return qs;
     295             : }
     296             : 
     297             : 
     298             : /**
     299             :  * Release lock on shard @a s in the database.
     300             :  * On error, terminates this process.
     301             :  *
     302             :  * @param[in] s shard to free (and memory to release)
     303             :  */
     304             : static void
     305         121 : release_shard (struct Shard *s)
     306             : {
     307             :   enum GNUNET_DB_QueryStatus qs;
     308             : 
     309         121 :   qs = db_plugin->release_revolving_shard (
     310         121 :     db_plugin->cls,
     311             :     "aggregator",
     312             :     s->shard_start,
     313             :     s->shard_end);
     314         121 :   GNUNET_free (s);
     315         121 :   switch (qs)
     316             :   {
     317           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     318             :   case GNUNET_DB_STATUS_SOFT_ERROR:
     319           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
     320           0 :     GNUNET_break (0);
     321           0 :     global_ret = EXIT_FAILURE;
     322           0 :     GNUNET_SCHEDULER_shutdown ();
     323           0 :     return;
     324           0 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     325             :     /* Strange, but let's just continue */
     326           0 :     break;
     327         121 :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     328             :     /* normal case */
     329         121 :     break;
     330             :   }
     331             : }
     332             : 
     333             : 
     334             : /**
     335             :  * Schedule the next major task, or exit depending on mode.
     336             :  */
     337             : static void
     338         122 : next_task (uint64_t counter)
     339             : {
     340         122 :   if ( (GNUNET_YES == test_mode) &&
     341             :        (0 == counter) )
     342             :   {
     343             :     /* in test mode, shutdown after a shard is done with 0 work */
     344          55 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     345             :                 "No work done and in test mode, shutting down\n");
     346          55 :     GNUNET_SCHEDULER_shutdown ();
     347          55 :     return;
     348             :   }
     349          67 :   GNUNET_assert (NULL == task);
     350             :   /* If we ended up doing zero work, sleep a bit */
     351          67 :   if (0 == counter)
     352             :   {
     353           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     354             :                 "Going to sleep for %s before trying again\n",
     355             :                 GNUNET_TIME_relative2s (aggregator_idle_sleep_interval,
     356             :                                         true));
     357           0 :     task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
     358             :                                          &drain_kyc_alerts,
     359             :                                          NULL);
     360             :   }
     361             :   else
     362             :   {
     363          67 :     task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
     364             :                                      NULL);
     365             :   }
     366             : }
     367             : 
     368             : 
     369             : /**
     370             :  * Rollback the current transaction (if any),
     371             :  * then free data stored in @a au, including @a au itself, and then
     372             :  * run the next aggregation task.
     373             :  *
     374             :  * @param[in] au aggregation unit to clean up
     375             :  */
     376             : static void
     377         122 : cleanup_and_next (struct AggregationUnit *au)
     378             : {
     379         122 :   struct Shard *s = au->shard;
     380         122 :   uint64_t counter = (NULL == s) ? 0 : s->work_counter;
     381             : 
     382             :   /* just in case, often no transaction is running here anymore */
     383         122 :   db_plugin->rollback (db_plugin->cls);
     384         122 :   cleanup_au (au);
     385         122 :   if (NULL != s)
     386         121 :     release_shard (s);
     387         122 :   if (EXIT_SUCCESS == global_ret)
     388         122 :     next_task (counter);
     389         122 : }
     390             : 
     391             : 
     392             : /**
     393             :  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
     394             :  *
     395             :  * @param cls closure
     396             :  */
     397             : static void
     398          55 : shutdown_task (void *cls)
     399             : {
     400             :   (void) cls;
     401          55 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     402             :               "Running shutdown\n");
     403          55 :   if (NULL != task)
     404             :   {
     405           0 :     GNUNET_SCHEDULER_cancel (task);
     406           0 :     task = NULL;
     407             :   }
     408          55 :   TALER_KYCLOGIC_kyc_done ();
     409          55 :   TALER_EXCHANGEDB_plugin_unload (db_plugin);
     410          55 :   db_plugin = NULL;
     411          55 :   TALER_EXCHANGEDB_unload_accounts ();
     412          55 :   cfg = NULL;
     413          55 : }
     414             : 
     415             : 
     416             : /**
     417             :  * Parse the configuration for aggregator.
     418             :  *
     419             :  * @return #GNUNET_OK on success
     420             :  */
     421             : static enum GNUNET_GenericReturnValue
     422          55 : parse_aggregator_config (void)
     423             : {
     424             :   enum GNUNET_GenericReturnValue enable_kyc;
     425             : 
     426             :   enable_kyc
     427          55 :     = GNUNET_CONFIGURATION_get_value_yesno (
     428             :         cfg,
     429             :         "exchange",
     430             :         "ENABLE_KYC");
     431          55 :   if (GNUNET_SYSERR == enable_kyc)
     432             :   {
     433           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     434             :                 "Need YES or NO in section `exchange' under `ENABLE_KYC'\n");
     435           0 :     return GNUNET_SYSERR;
     436             :   }
     437          55 :   if (GNUNET_NO == enable_kyc)
     438             :   {
     439          41 :     kyc_off = true;
     440             :   }
     441             :   else
     442             :   {
     443             :     char *attr_enc_key_str;
     444             : 
     445          14 :     if (GNUNET_OK !=
     446          14 :         GNUNET_CONFIGURATION_get_value_string (cfg,
     447             :                                                "exchange",
     448             :                                                "ATTRIBUTE_ENCRYPTION_KEY",
     449             :                                                &attr_enc_key_str))
     450             :     {
     451           0 :       GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     452             :                                  "exchange",
     453             :                                  "ATTRIBUTE_ENCRYPTION_KEY");
     454           0 :       return GNUNET_SYSERR;
     455             :     }
     456          14 :     GNUNET_CRYPTO_hash (attr_enc_key_str,
     457             :                         strlen (attr_enc_key_str),
     458             :                         &attribute_key.hash);
     459          14 :     GNUNET_free (attr_enc_key_str);
     460             :   }
     461          55 :   if (GNUNET_OK !=
     462          55 :       GNUNET_CONFIGURATION_get_value_string (cfg,
     463             :                                              "exchange",
     464             :                                              "BASE_URL",
     465             :                                              &exchange_base_url))
     466             :   {
     467           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     468             :                                "exchange",
     469             :                                "BASE_URL");
     470           0 :     return GNUNET_SYSERR;
     471             :   }
     472          55 :   if (GNUNET_OK !=
     473          55 :       GNUNET_CONFIGURATION_get_value_time (cfg,
     474             :                                            "exchange",
     475             :                                            "AGGREGATOR_IDLE_SLEEP_INTERVAL",
     476             :                                            &aggregator_idle_sleep_interval))
     477             :   {
     478           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     479             :                                "exchange",
     480             :                                "AGGREGATOR_IDLE_SLEEP_INTERVAL");
     481           0 :     return GNUNET_SYSERR;
     482             :   }
     483          55 :   if ( (GNUNET_OK !=
     484          55 :         TALER_config_get_amount (cfg,
     485             :                                  "exchange",
     486             :                                  "CURRENCY_ROUND_UNIT",
     487          55 :                                  &currency_round_unit)) ||
     488          55 :        (TALER_amount_is_zero (&currency_round_unit)) )
     489             :   {
     490           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     491             :                 "Need non-zero amount in section `exchange' under `CURRENCY_ROUND_UNIT'\n");
     492           0 :     return GNUNET_SYSERR;
     493             :   }
     494             : 
     495          55 :   if (NULL ==
     496          55 :       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg,
     497             :                                                  false)))
     498             :   {
     499           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     500             :                 "Failed to initialize DB subsystem\n");
     501           0 :     return GNUNET_SYSERR;
     502             :   }
     503          55 :   if (GNUNET_OK !=
     504          55 :       TALER_EXCHANGEDB_load_accounts (cfg,
     505             :                                       TALER_EXCHANGEDB_ALO_DEBIT))
     506             :   {
     507           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     508             :                 "No wire accounts configured for debit!\n");
     509           0 :     TALER_EXCHANGEDB_plugin_unload (db_plugin);
     510           0 :     db_plugin = NULL;
     511           0 :     return GNUNET_SYSERR;
     512             :   }
     513          55 :   return GNUNET_OK;
     514             : }
     515             : 
     516             : 
     517             : /**
     518             :  * Callback to return all applicable amounts for the KYC
     519             :  * decision to @ a cb.
     520             :  *
     521             :  * @param cls a `struct AggregationUnit *`
     522             :  * @param limit time limit for the iteration
     523             :  * @param cb function to call with the amounts
     524             :  * @param cb_cls closure for @a cb
     525             :  * @return transaction status
     526             :  */
     527             : static enum GNUNET_DB_QueryStatus
     528           2 : return_relevant_amounts (void *cls,
     529             :                          struct GNUNET_TIME_Absolute limit,
     530             :                          TALER_EXCHANGEDB_KycAmountCallback cb,
     531             :                          void *cb_cls)
     532             : {
     533           2 :   const struct AggregationUnit *au = cls;
     534             :   enum GNUNET_DB_QueryStatus qs;
     535             : 
     536           2 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     537             :               "Returning amount %s in KYC check\n",
     538             :               TALER_amount2s (&au->total_amount));
     539           2 :   if (GNUNET_OK !=
     540           2 :       cb (cb_cls,
     541             :           &au->total_amount,
     542             :           GNUNET_TIME_absolute_get ()))
     543           0 :     return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
     544           2 :   qs = db_plugin->select_aggregation_amounts_for_kyc_check (
     545           2 :     db_plugin->cls,
     546             :     &au->h_normalized_payto,
     547             :     limit,
     548             :     cb,
     549             :     cb_cls);
     550           2 :   if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     551             :   {
     552           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     553             :                 "Failed to select aggregation amounts for KYC limit check!\n");
     554             :   }
     555           2 :   return qs;
     556             : }
     557             : 
     558             : 
     559             : /**
     560             :  * The aggregation process failed hard, shut down the program.
     561             :  *
     562             :  * @param[in] au aggregation that failed hard
     563             :  */
     564             : static void
     565           0 : fail_aggregation (struct AggregationUnit *au)
     566             : {
     567           0 :   struct Shard *s = au->shard;
     568             : 
     569           0 :   cleanup_au (au);
     570           0 :   global_ret = EXIT_FAILURE;
     571           0 :   GNUNET_SCHEDULER_shutdown ();
     572           0 :   db_plugin->rollback (db_plugin->cls);
     573           0 :   release_shard (s);
     574           0 : }
     575             : 
     576             : 
     577             : /**
     578             :  * Run the next task with the given shard @a s.
     579             :  *
     580             :  * @param s shard to run, NULL to run more drain jobs
     581             :  */
     582             : static void
     583           0 : run_task_with_shard (struct Shard *s)
     584             : {
     585           0 :   GNUNET_assert (NULL == task);
     586           0 :   if (NULL == s)
     587           0 :     task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
     588             :                                      NULL);
     589             :   else
     590           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     591             :                                      s);
     592           0 : }
     593             : 
     594             : 
     595             : /**
     596             :  * The aggregation process failed with a serialization
     597             :  * issue.  Rollback the transaction and try again.
     598             :  *
     599             :  * @param[in] au aggregation that needs to be rolled back
     600             :  */
     601             : static void
     602           0 : rollback_aggregation (struct AggregationUnit *au)
     603             : {
     604           0 :   struct Shard *s = au->shard;
     605             : 
     606           0 :   cleanup_au (au);
     607           0 :   db_plugin->rollback (db_plugin->cls);
     608           0 :   run_task_with_shard (s);
     609           0 : }
     610             : 
     611             : 
     612             : /**
     613             :  * Function called with legitimization rule set. Check
     614             :  * how that affects the aggregation process.
     615             :  *
     616             :  * @param[in] cls a `struct AggregationUnit *`
     617             :  * @param[in] rur new legitimization rule set to evaluate
     618             :  */
     619             : static void
     620             : evaluate_rules (
     621             :   void *cls,
     622             :   struct TALER_EXCHANGEDB_RuleUpdaterResult *rur);
     623             : 
     624             : 
     625             : /**
     626             :  * The aggregation process succeeded and should be finally committed.
     627             :  *
     628             :  * @param[in] au aggregation that needs to be committed
     629             :  */
     630             : static void
     631          69 : commit_aggregation (struct AggregationUnit *au)
     632             : {
     633          69 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     634             :               "Committing aggregation result over %s to %s\n",
     635             :               TALER_amount2s (&au->final_amount),
     636             :               au->payto_uri.full_payto);
     637             :   /* Now we can finally commit the overall transaction, as we are
     638             :      again consistent if all of this passes. */
     639          69 :   switch (commit_or_warn ())
     640             :   {
     641           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     642             :     /* try again */
     643           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     644             :                 "Serialization issue on commit; trying again later!\n");
     645           0 :     cleanup_and_next (au);
     646           0 :     return;
     647           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     648           0 :     GNUNET_break (0);
     649           0 :     global_ret = EXIT_FAILURE;
     650           0 :     GNUNET_SCHEDULER_shutdown ();
     651           0 :     cleanup_and_next (au);
     652           0 :     return;
     653          69 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     654          69 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     655             :                 "Commit complete, going again\n");
     656          69 :     if (au->legi_check)
     657             :     {
     658           2 :       au->legi_check = false;
     659           4 :       au->ru = TALER_EXCHANGEDB_update_rules (
     660             :         db_plugin,
     661             :         &attribute_key,
     662           2 :         &au->h_normalized_payto,
     663             :         false, /* aggregation doesn't apply to wallets */
     664             :         &evaluate_rules,
     665             :         au);
     666           2 :       if (NULL != au->ru)
     667           2 :         return;
     668             :     }
     669          67 :     cleanup_and_next (au);
     670          67 :     return;
     671           0 :   default:
     672           0 :     GNUNET_break (0);
     673           0 :     global_ret = EXIT_FAILURE;
     674           0 :     GNUNET_SCHEDULER_shutdown ();
     675           0 :     cleanup_and_next (au);
     676           0 :     return;
     677             :   }
     678             : }
     679             : 
     680             : 
     681             : /**
     682             :  * Trigger the wire transfer for the @a au
     683             :  * and delete the record of the aggregation.
     684             :  *
     685             :  * @param[in] au information about the aggregation
     686             :  */
     687             : static void
     688          61 : trigger_wire_transfer (struct AggregationUnit *au)
     689             : {
     690             :   enum GNUNET_DB_QueryStatus qs;
     691             : 
     692          61 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     693             :               "Preparing wire transfer of %s to %s\n",
     694             :               TALER_amount2s (&au->final_amount),
     695             :               TALER_B2S (&au->merchant_pub));
     696             :   {
     697             :     void *buf;
     698             :     size_t buf_size;
     699             : 
     700          61 :     TALER_BANK_prepare_transfer (au->payto_uri,
     701          61 :                                  &au->final_amount,
     702             :                                  exchange_base_url,
     703          61 :                                  &au->wtid,
     704             :                                  &buf,
     705             :                                  &buf_size);
     706          61 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     707             :                 "Storing %u bytes of wire prepare data\n",
     708             :                 (unsigned int) buf_size);
     709             :     /* Commit our intention to execute the wire transfer! */
     710          61 :     qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
     711          61 :                                               au->wa->method,
     712             :                                               buf,
     713             :                                               buf_size);
     714          61 :     GNUNET_log (qs >= 0
     715             :                 ? GNUNET_ERROR_TYPE_DEBUG
     716             :                 : GNUNET_ERROR_TYPE_WARNING,
     717             :                 "wire_prepare_data_insert returned %d\n",
     718             :                 (int) qs);
     719          61 :     GNUNET_free (buf);
     720             :   }
     721             :   /* Commit the WTID data to 'wire_out'  */
     722          61 :   if (qs >= 0)
     723             :   {
     724          61 :     qs = db_plugin->store_wire_transfer_out (
     725          61 :       db_plugin->cls,
     726             :       au->execution_time,
     727          61 :       &au->wtid,
     728          61 :       &au->h_full_payto,
     729          61 :       au->wa->section_name,
     730          61 :       &au->final_amount);
     731          61 :     GNUNET_log (qs >= 0
     732             :                 ? GNUNET_ERROR_TYPE_DEBUG
     733             :                 : GNUNET_ERROR_TYPE_WARNING,
     734             :                 "store_wire_transfer_out returned %d\n",
     735             :                 (int) qs);
     736             :   }
     737          61 :   if ( (qs >= 0) &&
     738          61 :        au->have_transient)
     739           3 :     qs = db_plugin->delete_aggregation_transient (
     740           3 :       db_plugin->cls,
     741           3 :       &au->h_full_payto,
     742           3 :       &au->wtid);
     743             : 
     744          61 :   switch (qs)
     745             :   {
     746           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     747           0 :     GNUNET_log (
     748             :       GNUNET_ERROR_TYPE_INFO,
     749             :       "Serialization issue during aggregation; trying again later!\n");
     750           0 :     rollback_aggregation (au);
     751           0 :     return;
     752           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     753           0 :     GNUNET_break (0);
     754           0 :     fail_aggregation (au);
     755           0 :     return;
     756          61 :   default:
     757          61 :     break;
     758             :   }
     759             :   {
     760          61 :     struct TALER_CoinDepositEventP rep = {
     761          61 :       .header.size = htons (sizeof (rep)),
     762          61 :       .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
     763             :       .merchant_pub = au->merchant_pub
     764             :     };
     765             : 
     766          61 :     db_plugin->event_notify (db_plugin->cls,
     767             :                              &rep.header,
     768             :                              NULL,
     769             :                              0);
     770             :   }
     771          61 :   commit_aggregation (au);
     772             : }
     773             : 
     774             : 
     775             : static void
     776           2 : evaluate_rules (
     777             :   void *cls,
     778             :   struct TALER_EXCHANGEDB_RuleUpdaterResult *rur)
     779             : {
     780           2 :   struct AggregationUnit *au = cls;
     781           2 :   struct TALER_KYCLOGIC_LegitimizationRuleSet *lrs = rur->lrs;
     782             :   enum GNUNET_DB_QueryStatus qs;
     783             :   const struct TALER_KYCLOGIC_KycRule *requirement;
     784             : 
     785           2 :   au->ru = NULL;
     786           2 :   if (TALER_EC_NONE != rur->ec)
     787             :   {
     788           0 :     if (NULL != lrs)
     789             :     {
     790             :       /* strange, but whatever */
     791           0 :       TALER_KYCLOGIC_rules_free (lrs);
     792             :     }
     793             :     /* Rollback just in case, should have already been done
     794             :        before by the TALER_EXCHANGEDB_update_rules() logic. */
     795           0 :     db_plugin->rollback (db_plugin->cls);
     796           0 :     if ( (TALER_EC_GENERIC_DB_SOFT_FAILURE == rur->ec) &&
     797           0 :          (au->retries++ < MAX_RETRIES) )
     798             :     {
     799           0 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     800             :                   "Serialization failure, trying again!\n");
     801           0 :       au->ru = TALER_EXCHANGEDB_update_rules (
     802             :         db_plugin,
     803             :         &attribute_key,
     804           0 :         &au->h_normalized_payto,
     805             :         false, /* aggregation does not apply to wallets */
     806             :         &evaluate_rules,
     807             :         au);
     808           0 :       if (NULL != au->ru)
     809           1 :         return;
     810             :     }
     811           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     812             :                 "KYC rule evaluation failed hard: %s (%d, %s)\n",
     813             :                 TALER_ErrorCode_get_hint (rur->ec),
     814             :                 (int) rur->ec,
     815             :                 rur->hint);
     816           0 :     cleanup_and_next (au);
     817           0 :     return;
     818             :   }
     819             : 
     820             :   /* Note that here we are in an open transaction that fetched
     821             :      (or updated) the current set of legitimization rules. So
     822             :      we must properly commit at the end! */
     823             :   {
     824             :     struct TALER_Amount next_threshold;
     825             : 
     826           2 :     qs = TALER_KYCLOGIC_kyc_test_required (
     827             :       TALER_KYCLOGIC_KYC_TRIGGER_AGGREGATE,
     828             :       lrs,
     829             :       &return_relevant_amounts,
     830             :       (void *) au,
     831             :       &requirement,
     832             :       &next_threshold);
     833             :   }
     834           2 :   if (qs < 0)
     835             :   {
     836           0 :     TALER_KYCLOGIC_rules_free (lrs);
     837           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     838           0 :     cleanup_and_next (au);
     839           0 :     return;
     840             :   }
     841           2 :   if (NULL == requirement)
     842             :   {
     843           1 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     844             :                 "KYC check clear, proceeding with wire transfer\n");
     845           1 :     TALER_KYCLOGIC_rules_free (lrs);
     846           1 :     trigger_wire_transfer (au);
     847           1 :     return;
     848             :   }
     849           1 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     850             :               "KYC requirement for %s is %s\n",
     851             :               TALER_amount2s (&au->total_amount),
     852             :               TALER_KYCLOGIC_rule2s (requirement));
     853             :   {
     854             :     json_t *jrule;
     855             : 
     856           1 :     jrule = TALER_KYCLOGIC_rule_to_measures (requirement);
     857           1 :     qs = db_plugin->trigger_kyc_rule_for_account (
     858           1 :       db_plugin->cls,
     859             :       au->payto_uri,
     860           1 :       &au->h_normalized_payto,
     861             :       NULL,
     862           1 :       &au->merchant_pub,
     863             :       jrule,
     864             :       TALER_KYCLOGIC_rule2priority (requirement),
     865             :       &au->requirement_row,
     866             :       &au->bad_kyc_auth);
     867           1 :     json_decref (jrule);
     868             :   }
     869           1 :   if (qs < 0)
     870             :   {
     871           0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     872             :                 "Failed to persist KYC requirement `%s' in DB!\n",
     873             :                 TALER_KYCLOGIC_rule2s (requirement));
     874           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     875           0 :     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     876           0 :       global_ret = EXIT_FAILURE;
     877           0 :     cleanup_and_next (au);
     878           0 :     return;
     879             :   }
     880           1 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     881             :               "Legitimization process %llu started\n",
     882             :               (unsigned long long) au->requirement_row);
     883           1 :   TALER_KYCLOGIC_rules_free (lrs);
     884             : 
     885           1 :   qs = db_plugin->update_aggregation_transient (db_plugin->cls,
     886           1 :                                                 &au->h_full_payto,
     887           1 :                                                 &au->wtid,
     888             :                                                 au->requirement_row,
     889           1 :                                                 &au->total_amount);
     890             : 
     891             : 
     892           1 :   if (qs < 0)
     893             :   {
     894           0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     895             :                 "Failed to persist updated transient in in DB!\n");
     896           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     897           0 :     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     898           0 :       global_ret = EXIT_FAILURE;
     899           0 :     cleanup_and_next (au);
     900           0 :     return;
     901             :   }
     902             : 
     903             :   {
     904           1 :     struct TALER_CoinDepositEventP rep = {
     905           1 :       .header.size = htons (sizeof (rep)),
     906           1 :       .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
     907             :       .merchant_pub = au->merchant_pub
     908             :     };
     909             : 
     910           1 :     db_plugin->event_notify (db_plugin->cls,
     911             :                              &rep.header,
     912             :                              NULL,
     913             :                              0);
     914             :   }
     915             : 
     916             :   /* First commit, turns the rollback in cleanup into a NOP! */
     917           1 :   commit_or_warn ();
     918           1 :   cleanup_and_next (au);
     919             : }
     920             : 
     921             : 
     922             : /**
     923             :  * The aggregation process could not be concluded and its progress state
     924             :  * should be remembered in a transient aggregation.
     925             :  *
     926             :  * @param[in] au aggregation that needs to be committed
     927             :  *     into a transient aggregation
     928             :  */
     929             : static void
     930           8 : commit_to_transient (struct AggregationUnit *au)
     931             : {
     932             :   enum GNUNET_DB_QueryStatus qs;
     933             : 
     934           8 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     935             :               "Not ready for wire transfer (%s)\n",
     936             :               TALER_amount2s (&au->final_amount));
     937           8 :   if (au->have_transient)
     938           3 :     qs = db_plugin->update_aggregation_transient (db_plugin->cls,
     939           3 :                                                   &au->h_full_payto,
     940           3 :                                                   &au->wtid,
     941             :                                                   au->requirement_row,
     942           3 :                                                   &au->total_amount);
     943             :   else
     944           5 :     qs = db_plugin->create_aggregation_transient (db_plugin->cls,
     945           5 :                                                   &au->h_full_payto,
     946           5 :                                                   au->wa->section_name,
     947           5 :                                                   &au->merchant_pub,
     948           5 :                                                   &au->wtid,
     949             :                                                   au->requirement_row,
     950           5 :                                                   &au->total_amount);
     951           8 :   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     952             :   {
     953           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     954             :                 "Serialization issue, trying again later!\n");
     955           0 :     rollback_aggregation (au);
     956           0 :     return;
     957             :   }
     958           8 :   if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     959             :   {
     960           0 :     GNUNET_break (0);
     961           0 :     fail_aggregation (au);
     962           0 :     return;
     963             :   }
     964           8 :   au->have_transient = true;
     965             :   /* commit */
     966           8 :   commit_aggregation (au);
     967             : }
     968             : 
     969             : 
     970             : /**
     971             :  * Test if legitimization rules are satisfied for a transfer to @a h_payto.
     972             :  *
     973             :  * @param[in] au aggregation unit to check for
     974             :  */
     975             : static void
     976          62 : check_legitimization_satisfied (struct AggregationUnit *au)
     977             : {
     978          62 :   if (kyc_off)
     979             :   {
     980          60 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     981             :                 "KYC checks are off, legitimization satisfied\n");
     982          60 :     trigger_wire_transfer (au);
     983          60 :     return;
     984             :   }
     985             :   /* get legi rules *after* committing, as the legi check
     986             :      should run in a separate transaction! */
     987           2 :   au->legi_check = true;
     988           2 :   commit_to_transient (au);
     989             : }
     990             : 
     991             : 
     992             : /**
     993             :  * Perform the main aggregation work for @a au.  Expects to be in
     994             :  * a working transaction, which the caller must also ultimately commit
     995             :  * (or rollback) depending on our return value.
     996             :  *
     997             :  * @param[in,out] au aggregation unit to work on
     998             :  */
     999             : static void
    1000          68 : do_aggregate (struct AggregationUnit *au)
    1001             : {
    1002             :   enum GNUNET_DB_QueryStatus qs;
    1003             : 
    1004          68 :   au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (
    1005             :     au->payto_uri);
    1006          68 :   if (NULL == au->wa)
    1007             :   {
    1008           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1009             :                 "No exchange account configured for `%s', please fix your setup to continue!\n",
    1010             :                 au->payto_uri.full_payto);
    1011           0 :     global_ret = EXIT_FAILURE;
    1012           0 :     fail_aggregation (au);
    1013           0 :     return;
    1014             :   }
    1015             : 
    1016             :   {
    1017             :     struct GNUNET_TIME_Timestamp start_date;
    1018             :     struct GNUNET_TIME_Timestamp end_date;
    1019             :     struct TALER_MasterSignatureP master_sig;
    1020             :     uint64_t rowid;
    1021             : 
    1022          68 :     qs = db_plugin->get_wire_fee (db_plugin->cls,
    1023          68 :                                   au->wa->method,
    1024             :                                   au->execution_time,
    1025             :                                   &rowid,
    1026             :                                   &start_date,
    1027             :                                   &end_date,
    1028             :                                   &au->fees,
    1029             :                                   &master_sig);
    1030          68 :     if (0 >= qs)
    1031             :     {
    1032           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1033             :                   "Could not get wire fees for %s at %s. Aborting run.\n",
    1034             :                   au->wa->method,
    1035             :                   GNUNET_TIME_timestamp2s (au->execution_time));
    1036           0 :       fail_aggregation (au);
    1037           0 :       return;
    1038             :     }
    1039             :   }
    1040             : 
    1041             :   /* Now try to find other deposits to aggregate */
    1042          68 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1043             :               "Found ready deposit for %s, aggregating by target %s\n",
    1044             :               TALER_B2S (&au->merchant_pub),
    1045             :               au->payto_uri.full_payto);
    1046          68 :   qs = db_plugin->select_aggregation_transient (db_plugin->cls,
    1047          68 :                                                 &au->h_full_payto,
    1048          68 :                                                 &au->merchant_pub,
    1049          68 :                                                 au->wa->section_name,
    1050             :                                                 &au->wtid,
    1051             :                                                 &au->trans);
    1052          68 :   switch (qs)
    1053             :   {
    1054           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
    1055           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1056             :                 "Failed to lookup transient aggregates!\n");
    1057           0 :     fail_aggregation (au);
    1058           0 :     return;
    1059           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
    1060             :     /* serializiability issue, try again */
    1061           0 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1062             :                 "Serialization issue, trying again later!\n");
    1063           0 :     rollback_aggregation (au);
    1064           0 :     return;
    1065          63 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    1066          63 :     GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
    1067          63 :                                 &au->wtid,
    1068             :                                 sizeof (au->wtid));
    1069          63 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1070             :                 "No transient aggregation found, starting %s\n",
    1071             :                 TALER_B2S (&au->wtid));
    1072          63 :     au->have_transient = false;
    1073          63 :     break;
    1074           5 :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    1075           5 :     au->have_transient = true;
    1076           5 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1077             :                 "Transient aggregation found, resuming %s\n",
    1078             :                 TALER_B2S (&au->wtid));
    1079           5 :     break;
    1080             :   }
    1081          68 :   qs = db_plugin->aggregate (db_plugin->cls,
    1082          68 :                              &au->h_full_payto,
    1083          68 :                              &au->merchant_pub,
    1084          68 :                              &au->wtid,
    1085             :                              &au->total_amount);
    1086          68 :   if (GNUNET_DB_STATUS_HARD_ERROR == qs)
    1087             :   {
    1088           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1089             :                 "Failed to execute aggregation!\n");
    1090           0 :     fail_aggregation (au);
    1091           0 :     return;
    1092             :   }
    1093          68 :   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    1094             :   {
    1095             :     /* serializiability issue, try again */
    1096           0 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1097             :                 "Serialization issue, trying again later!\n");
    1098           0 :     rollback_aggregation (au);
    1099           0 :     return;
    1100             :   }
    1101          68 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1102             :               "Aggregation total is %s.\n",
    1103             :               TALER_amount2s (&au->total_amount));
    1104             :   /* Subtract wire transfer fee and round to the unit supported by the
    1105             :      wire transfer method; Check if after rounding down, we still have
    1106             :      an amount to transfer, and if not mark as 'tiny'. */
    1107          68 :   if (au->have_transient)
    1108           5 :     GNUNET_assert (0 <=
    1109             :                    TALER_amount_add (&au->total_amount,
    1110             :                                      &au->total_amount,
    1111             :                                      &au->trans));
    1112             : 
    1113             : 
    1114          68 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1115             :               "Rounding aggregate of %s\n",
    1116             :               TALER_amount2s (&au->total_amount));
    1117          68 :   if ( (0 >=
    1118          68 :         TALER_amount_subtract (&au->final_amount,
    1119          68 :                                &au->total_amount,
    1120         130 :                                &au->fees.wire)) ||
    1121             :        (GNUNET_SYSERR ==
    1122          62 :         TALER_amount_round_down (&au->final_amount,
    1123          62 :                                  &currency_round_unit)) ||
    1124          62 :        (TALER_amount_is_zero (&au->final_amount)) )
    1125             :   {
    1126           6 :     commit_to_transient (au);
    1127           6 :     return;
    1128             :   }
    1129          62 :   check_legitimization_satisfied (au);
    1130             : }
    1131             : 
    1132             : 
    1133             : static void
    1134         121 : run_aggregation (void *cls)
    1135             : {
    1136         121 :   struct Shard *s = cls;
    1137             :   struct AggregationUnit *au;
    1138             :   enum GNUNET_DB_QueryStatus qs;
    1139             : 
    1140         121 :   task = NULL;
    1141         121 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1142             :               "Checking for ready deposits to aggregate\n");
    1143             :   /* make sure we have current fees */
    1144         121 :   au = GNUNET_new (struct AggregationUnit);
    1145         121 :   au->execution_time = GNUNET_TIME_timestamp_get ();
    1146         121 :   au->shard = s;
    1147         121 :   if (GNUNET_OK !=
    1148         121 :       db_plugin->start_deferred_wire_out (db_plugin->cls))
    1149             :   {
    1150           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1151             :                 "Failed to start database transaction!\n");
    1152           0 :     global_ret = EXIT_FAILURE;
    1153           0 :     GNUNET_SCHEDULER_shutdown ();
    1154           0 :     release_shard (s);
    1155           0 :     return;
    1156             :   }
    1157         121 :   qs = db_plugin->get_ready_deposit (
    1158         121 :     db_plugin->cls,
    1159         121 :     s->shard_start,
    1160         121 :     s->shard_end,
    1161             :     &au->merchant_pub,
    1162             :     &au->payto_uri);
    1163         121 :   switch (qs)
    1164             :   {
    1165           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
    1166           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1167             :                 "Failed to begin deposit iteration!\n");
    1168           0 :     global_ret = EXIT_FAILURE;
    1169           0 :     GNUNET_SCHEDULER_shutdown ();
    1170           0 :     cleanup_and_next (au);
    1171           0 :     return;
    1172           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
    1173           0 :     cleanup_au (au);
    1174           0 :     db_plugin->rollback (db_plugin->cls);
    1175           0 :     run_task_with_shard (s);
    1176           0 :     return;
    1177          54 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    1178             :     {
    1179             :       struct GNUNET_TIME_Relative duration
    1180          54 :         = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time);
    1181             : 
    1182          54 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1183             :                   "Completed shard [%u,%u] after %s with %llu deposits\n",
    1184             :                   (unsigned int) s->shard_start,
    1185             :                   (unsigned int) s->shard_end,
    1186             :                   GNUNET_TIME_relative2s (duration,
    1187             :                                           true),
    1188             :                   (unsigned long long) s->work_counter);
    1189          54 :       cleanup_and_next (au);
    1190          54 :       return;
    1191             :     }
    1192          67 :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    1193          67 :     s->work_counter++;
    1194          67 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1195             :                 "Found ready deposit!\n");
    1196             :     /* continued below */
    1197          67 :     break;
    1198             :   }
    1199             : 
    1200          67 :   TALER_full_payto_hash (au->payto_uri,
    1201             :                          &au->h_full_payto);
    1202          67 :   TALER_full_payto_normalize_and_hash (au->payto_uri,
    1203             :                                        &au->h_normalized_payto);
    1204          67 :   GNUNET_break (! TALER_payto_is_wallet (au->payto_uri.full_payto));
    1205          67 :   do_aggregate (au);
    1206             : }
    1207             : 
    1208             : 
    1209             : /**
    1210             :  * Select a shard to work on.
    1211             :  *
    1212             :  * @param cls NULL
    1213             :  */
    1214             : static void
    1215         121 : run_shard (void *cls)
    1216             : {
    1217             :   struct Shard *s;
    1218             :   enum GNUNET_DB_QueryStatus qs;
    1219             : 
    1220             :   (void) cls;
    1221         121 :   task = NULL;
    1222         121 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1223             :               "Running aggregation shard\n");
    1224         121 :   if (GNUNET_SYSERR ==
    1225         121 :       db_plugin->preflight (db_plugin->cls))
    1226             :   {
    1227           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1228             :                 "Failed to obtain database connection!\n");
    1229           0 :     global_ret = EXIT_FAILURE;
    1230           0 :     GNUNET_SCHEDULER_shutdown ();
    1231           0 :     return;
    1232             :   }
    1233         121 :   s = GNUNET_new (struct Shard);
    1234         121 :   s->start_time = GNUNET_TIME_timestamp_get ();
    1235         121 :   qs = db_plugin->begin_revolving_shard (db_plugin->cls,
    1236             :                                          "aggregator",
    1237             :                                          shard_size,
    1238             :                                          1U + INT32_MAX,
    1239             :                                          &s->shard_start,
    1240             :                                          &s->shard_end);
    1241         121 :   if (0 >= qs)
    1242             :   {
    1243           0 :     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    1244             :     {
    1245             :       static struct GNUNET_TIME_Relative delay;
    1246             : 
    1247           0 :       GNUNET_free (s);
    1248           0 :       delay = GNUNET_TIME_randomized_backoff (delay,
    1249             :                                               GNUNET_TIME_UNIT_SECONDS);
    1250           0 :       GNUNET_assert (NULL == task);
    1251           0 :       task = GNUNET_SCHEDULER_add_delayed (delay,
    1252             :                                            &run_shard,
    1253             :                                            NULL);
    1254           0 :       return;
    1255             :     }
    1256           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1257             :                 "Failed to begin shard (%d)!\n",
    1258             :                 qs);
    1259           0 :     GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs);
    1260           0 :     global_ret = EXIT_FAILURE;
    1261           0 :     GNUNET_SCHEDULER_shutdown ();
    1262           0 :     return;
    1263             :   }
    1264         121 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1265             :               "Starting shard [%u:%u]!\n",
    1266             :               (unsigned int) s->shard_start,
    1267             :               (unsigned int) s->shard_end);
    1268         121 :   GNUNET_assert (NULL == task);
    1269         121 :   task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1270             :                                    s);
    1271             : }
    1272             : 
    1273             : 
    1274             : static void
    1275         122 : drain_kyc_alerts (void *cls)
    1276             : {
    1277             :   enum GNUNET_DB_QueryStatus qs;
    1278             :   struct AggregationUnit *au;
    1279             : 
    1280             :   (void) cls;
    1281         122 :   task = NULL;
    1282         122 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1283             :               "Draining KYC alerts\n");
    1284         122 :   au = GNUNET_new (struct AggregationUnit);
    1285         122 :   au->execution_time = GNUNET_TIME_timestamp_get ();
    1286         122 :   if (GNUNET_SYSERR ==
    1287         122 :       db_plugin->preflight (db_plugin->cls))
    1288             :   {
    1289           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1290             :                 "Failed to obtain database connection!\n");
    1291           0 :     global_ret = EXIT_FAILURE;
    1292           0 :     GNUNET_SCHEDULER_shutdown ();
    1293           0 :     return;
    1294             :   }
    1295         122 :   if (GNUNET_OK !=
    1296         122 :       db_plugin->start (db_plugin->cls,
    1297             :                         "handle kyc alerts"))
    1298             :   {
    1299           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1300             :                 "Failed to start database transaction!\n");
    1301           0 :     global_ret = EXIT_FAILURE;
    1302           0 :     GNUNET_SCHEDULER_shutdown ();
    1303           0 :     return;
    1304             :   }
    1305             :   while (1)
    1306             :   {
    1307         122 :     qs = db_plugin->drain_kyc_alert (db_plugin->cls,
    1308             :                                      1,
    1309             :                                      &au->h_normalized_payto);
    1310         122 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1311             :                 "Found %d KYC alerts\n",
    1312             :                 (int) qs);
    1313         122 :     switch (qs)
    1314             :     {
    1315           0 :     case GNUNET_DB_STATUS_HARD_ERROR:
    1316           0 :       GNUNET_break (0);
    1317           0 :       db_plugin->rollback (db_plugin->cls);
    1318           0 :       GNUNET_free (au);
    1319           0 :       GNUNET_assert (NULL == task);
    1320           0 :       global_ret = EXIT_FAILURE;
    1321           0 :       GNUNET_SCHEDULER_shutdown ();
    1322           0 :       return;
    1323           0 :     case GNUNET_DB_STATUS_SOFT_ERROR:
    1324           0 :       db_plugin->rollback (db_plugin->cls);
    1325           0 :       GNUNET_assert (NULL == task);
    1326           0 :       GNUNET_free (au);
    1327           0 :       task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
    1328             :                                        NULL);
    1329           0 :       return;
    1330         121 :     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    1331         121 :       GNUNET_free (au);
    1332         121 :       db_plugin->rollback (db_plugin->cls);
    1333         121 :       GNUNET_assert (NULL == task);
    1334         121 :       task = GNUNET_SCHEDULER_add_now (&run_shard,
    1335             :                                        NULL);
    1336         121 :       return;
    1337           1 :     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    1338             :       /* handled below */
    1339           1 :       break;
    1340             :     }
    1341           1 :     qs = db_plugin->find_aggregation_transient (
    1342           1 :       db_plugin->cls,
    1343           1 :       &au->h_normalized_payto,
    1344             :       &au->payto_uri,
    1345             :       &au->wtid,
    1346             :       &au->merchant_pub,
    1347             :       &au->trans);
    1348           1 :     switch (qs)
    1349             :     {
    1350           0 :     case GNUNET_DB_STATUS_HARD_ERROR:
    1351           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1352             :                   "Failed to lookup transient aggregates!\n");
    1353           0 :       db_plugin->rollback (db_plugin->cls);
    1354           0 :       GNUNET_assert (NULL == task);
    1355           0 :       task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
    1356             :                                        NULL);
    1357           0 :       return;
    1358           0 :     case GNUNET_DB_STATUS_SOFT_ERROR:
    1359             :       /* serializiability issue, try again */
    1360           0 :       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1361             :                   "Serialization issue, trying again later!\n");
    1362           0 :       db_plugin->rollback (db_plugin->cls);
    1363           0 :       GNUNET_assert (NULL == task);
    1364           0 :       task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
    1365             :                                        NULL);
    1366           0 :       return;
    1367           0 :     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    1368           0 :       continue; /* while (1) */
    1369           1 :     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    1370           1 :       TALER_full_payto_hash (au->payto_uri,
    1371             :                              &au->h_full_payto);
    1372           1 :       au->have_transient = true;
    1373           1 :       do_aggregate (au);
    1374           1 :       return;
    1375             :     }
    1376           0 :     GNUNET_assert (0);
    1377             :   } /* while(1) */
    1378             : }
    1379             : 
    1380             : 
    1381             : /**
    1382             :  * First task.
    1383             :  *
    1384             :  * @param cls closure, NULL
    1385             :  * @param args remaining command-line arguments
    1386             :  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
    1387             :  * @param c configuration
    1388             :  */
    1389             : static void
    1390          55 : run (void *cls,
    1391             :      char *const *args,
    1392             :      const char *cfgfile,
    1393             :      const struct GNUNET_CONFIGURATION_Handle *c)
    1394             : {
    1395             :   unsigned long long ass;
    1396             :   (void) cls;
    1397             :   (void) args;
    1398             :   (void) cfgfile;
    1399             : 
    1400          55 :   cfg = c;
    1401          55 :   if (GNUNET_OK !=
    1402          55 :       parse_aggregator_config ())
    1403             :   {
    1404           0 :     cfg = NULL;
    1405           0 :     global_ret = EXIT_NOTCONFIGURED;
    1406           0 :     return;
    1407             :   }
    1408          55 :   if (GNUNET_OK !=
    1409          55 :       GNUNET_CONFIGURATION_get_value_number (cfg,
    1410             :                                              "exchange",
    1411             :                                              "AGGREGATOR_SHARD_SIZE",
    1412             :                                              &ass))
    1413             :   {
    1414           0 :     cfg = NULL;
    1415           0 :     global_ret = EXIT_NOTCONFIGURED;
    1416           0 :     return;
    1417             :   }
    1418          55 :   if ( (0 == ass) ||
    1419          55 :        (ass > INT32_MAX) )
    1420          55 :     shard_size = 1U + INT32_MAX;
    1421             :   else
    1422           0 :     shard_size = (uint32_t) ass;
    1423          55 :   if (GNUNET_OK !=
    1424          55 :       TALER_KYCLOGIC_kyc_init (cfg,
    1425             :                                cfgfile))
    1426             :   {
    1427           0 :     cfg = NULL;
    1428           0 :     global_ret = EXIT_NOTCONFIGURED;
    1429           0 :     return;
    1430             :   }
    1431          55 :   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
    1432             :                                  NULL);
    1433          55 :   GNUNET_assert (NULL == task);
    1434          55 :   task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
    1435             :                                    NULL);
    1436             : }
    1437             : 
    1438             : 
    1439             : /**
    1440             :  * The main function of the taler-exchange-aggregator.
    1441             :  *
    1442             :  * @param argc number of arguments from the command line
    1443             :  * @param argv command line arguments
    1444             :  * @return 0 ok, non-zero on error, see #global_ret
    1445             :  */
    1446             : int
    1447          55 : main (int argc,
    1448             :       char *const *argv)
    1449             : {
    1450          55 :   struct GNUNET_GETOPT_CommandLineOption options[] = {
    1451          55 :     GNUNET_GETOPT_option_timetravel ('T',
    1452             :                                      "timetravel"),
    1453          55 :     GNUNET_GETOPT_option_flag ('t',
    1454             :                                "test",
    1455             :                                "run in test mode and exit when idle",
    1456             :                                &test_mode),
    1457          55 :     GNUNET_GETOPT_option_flag ('y',
    1458             :                                "kyc-off",
    1459             :                                "perform wire transfers without KYC checks",
    1460             :                                &kyc_off),
    1461             :     GNUNET_GETOPT_OPTION_END
    1462             :   };
    1463             :   enum GNUNET_GenericReturnValue ret;
    1464             : 
    1465          55 :   ret = GNUNET_PROGRAM_run (
    1466             :     TALER_EXCHANGE_project_data (),
    1467             :     argc, argv,
    1468             :     "taler-exchange-aggregator",
    1469             :     gettext_noop (
    1470             :       "background process that aggregates and executes wire transfers"),
    1471             :     options,
    1472             :     &run, NULL);
    1473          55 :   if (GNUNET_SYSERR == ret)
    1474           0 :     return EXIT_INVALIDARGUMENT;
    1475          55 :   if (GNUNET_NO == ret)
    1476           0 :     return EXIT_SUCCESS;
    1477          55 :   return global_ret;
    1478             : }
    1479             : 
    1480             : 
    1481             : /* end of taler-exchange-aggregator.c */

Generated by: LCOV version 1.16