LCOV - code coverage report
Current view: top level - exchange - taler-exchange-aggregator.c (source / functions) Hit Total Coverage
Test: GNU Taler exchange coverage report Lines: 196 323 60.7 %
Date: 2021-08-30 06:43:37 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   This file is part of TALER
       3             :   Copyright (C) 2016-2021 Taler Systems SA
       4             : 
       5             :   TALER is free software; you can redistribute it and/or modify it under the
       6             :   terms of the GNU Affero General Public License as published by the Free Software
       7             :   Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
      12             : 
      13             :   You should have received a copy of the GNU Affero General Public License along with
      14             :   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             : */
      16             : 
      17             : /**
      18             :  * @file taler-exchange-aggregator.c
      19             :  * @brief Process that aggregates outgoing transactions and prepares their execution
      20             :  * @author Christian Grothoff
      21             :  */
      22             : #include "platform.h"
      23             : #include <gnunet/gnunet_util_lib.h>
      24             : #include <jansson.h>
      25             : #include <pthread.h>
      26             : #include "taler_exchangedb_lib.h"
      27             : #include "taler_exchangedb_plugin.h"
      28             : #include "taler_json_lib.h"
      29             : #include "taler_bank_service.h"
      30             : 
      31             : 
      32             : /**
      33             :  * Information about one aggregation process to be executed.  There is
      34             :  * at most one of these around at any given point in time.
      35             :  * Note that this limits parallelism, and we might want
      36             :  * to revise this decision at a later point.
      37             :  */
      38             : struct AggregationUnit
      39             : {
      40             :   /**
      41             :    * Public key of the merchant.
      42             :    */
      43             :   struct TALER_MerchantPublicKeyP merchant_pub;
      44             : 
      45             :   /**
      46             :    * Total amount to be transferred, before subtraction of @e wire_fee and rounding down.
      47             :    */
      48             :   struct TALER_Amount total_amount;
      49             : 
      50             :   /**
      51             :    * Final amount to be transferred (after fee and rounding down).
      52             :    */
      53             :   struct TALER_Amount final_amount;
      54             : 
      55             :   /**
      56             :    * Wire fee we charge for the wire method of @e wa at @e execution_time.
      57             :    */
      58             :   struct TALER_Amount wire_fee;
      59             : 
      60             :   /**
      61             :    * Hash of @e wire.
      62             :    */
      63             :   struct GNUNET_HashCode h_wire;
      64             : 
      65             :   /**
      66             :    * Wire transfer identifier we use.
      67             :    */
      68             :   struct TALER_WireTransferIdentifierRawP wtid;
      69             : 
      70             :   /**
      71             :    * Row ID of the deposit that started the aggregation (set in #deposit_cb).
      72             :    */
      73             :   uint64_t row_id;
      74             : 
      75             :   /**
      76             :    * The current time (which triggered the aggregation and
      77             :    * defines the wire fee).
      78             :    */
      79             :   struct GNUNET_TIME_Absolute execution_time;
      80             : 
      81             :   /**
      82             :    * Wire details of the merchant.
      83             :    */
      84             :   json_t *wire;
      85             : 
      86             :   /**
      87             :    * Exchange wire account to be used for the preparation and
      88             :    * eventual execution of the aggregate wire transfer.
      89             :    */
      90             :   const struct TALER_EXCHANGEDB_AccountInfo *wa;
      91             : 
      92             :   /**
      93             :    * Row IDs of the additional deposits merged into the aggregation by #aggregate_cb.
      94             :    */
      95             :   uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
      96             : 
      97             :   /**
      98             :    * Offset specifying how many @e additional_rows are in use.
      99             :    */
     100             :   unsigned int rows_offset;
     101             : 
     102             :   /**
     103             :    * Set to #GNUNET_YES if we have to abort due to failure.
     104             :    */
     105             :   int failed;
     106             : 
     107             :   /**
     108             :    * Set to #GNUNET_YES if we encountered a refund during #refund_by_coin_cb.
     109             :    * Used to waive the deposit fee.
     110             :    */
     111             :   int have_refund;
     112             : };
     113             : 
     114             : 
     115             : /**
     116             :  * What is the smallest unit we support for wire transfers?
     117             :  * We will need to round down to a multiple of this amount.
     118             :  */
     119             : static struct TALER_Amount currency_round_unit;
     120             : 
     121             : /**
     122             :  * What is the base URL of this exchange?  Used in the
     123             :  * wire transfer subjects so that merchants and governments
     124             :  * can ask for the list of aggregated deposits.
     125             :  */
     126             : static char *exchange_base_url;
     127             : 
     128             : /**
     129             :  * The exchange's configuration.
     130             :  */
     131             : static const struct GNUNET_CONFIGURATION_Handle *cfg;
     132             : 
     133             : /**
     134             :  * Our database plugin.
     135             :  */
     136             : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
     137             : 
     138             : /**
     139             :  * Next task to run, if any.
     140             :  */
     141             : static struct GNUNET_SCHEDULER_Task *task;
     142             : 
     143             : /**
     144             :  * How long should we sleep when idle before trying to find more work?
     145             :  */
     146             : static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
     147             : 
     148             : /**
     149             :  * Value to return from main(). 0 on success, non-zero on errors.
     150             :  */
     151             : static int global_ret;
     152             : 
     153             : /**
     154             :  * #GNUNET_YES if we are in test mode and should exit when idle.
     155             :  */
     156             : static int test_mode;
     157             : 
     158             : 
     159             : /**
     160             :  * Main work function that queries the DB and aggregates transactions
     161             :  * into larger wire transfers.
     162             :  *
     163             :  * @param cls NULL
     164             :  */
     165             : static void
     166             : run_aggregation (void *cls);
     167             : 
     168             : 
     169             : /**
     170             :  * Release the JSON wire details referenced by @a au (if any) and zero the entire structure; @a au itself is not freed (stack allocated).
     171             :  *
     172             :  * @param au aggregation unit to clean up, must not be NULL
     173             :  */
     174             : static void
     175          97 : cleanup_au (struct AggregationUnit *au)
     176             : {
     177          97 :   GNUNET_assert (NULL != au);
     178          97 :   if (NULL != au->wire)
     179          50 :     json_decref (au->wire);
     180          97 :   memset (au,
     181             :           0,
     182             :           sizeof (*au));
     183          97 : }
     184             : 
     185             : 
     186             : /**
     187             :  * We're being aborted with CTRL-C (or SIGTERM). Shut down: cancel the pending #task (if any), unload the database plugin and the wire accounts, and forget the configuration handle.
     188             :  *
     189             :  * @param cls closure (unused)
     190             :  */
     191             : static void
     192          47 : shutdown_task (void *cls)
     193             : {
     194             :   (void) cls;
     195          47 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     196             :               "Running shutdown\n");
     197          47 :   if (NULL != task)
     198             :   {
     199           0 :     GNUNET_SCHEDULER_cancel (task);
     200           0 :     task = NULL;
     201             :   }
     202          47 :   TALER_EXCHANGEDB_plugin_unload (db_plugin);
     203          47 :   db_plugin = NULL;
     204          47 :   TALER_EXCHANGEDB_unload_accounts ();
     205          47 :   cfg = NULL;
     206          47 : }
     207             : 
     208             : 
     209             : /**
     210             :  * Parse the configuration for the aggregator (despite the "wirewatch" in the name): reads BASE_URL and AGGREGATOR_IDLE_SLEEP_INTERVAL from section "exchange" and CURRENCY_ROUND_UNIT from section "taler", then loads the database plugin and the debit wire accounts.
     211             :  * NOTE(review): the CURRENCY_ROUND_UNIT check below only rejects amounts where both value and fraction are non-zero, while the error message asks for a non-zero value; confirm which of the two is intended.
     212             :  * @return #GNUNET_OK on success, #GNUNET_SYSERR if an option is missing or invalid or if loading the plugin or the accounts fails
     213             :  */
     214             : static int
     215          47 : parse_wirewatch_config (void)
     216             : {
     217          47 :   if (GNUNET_OK !=
     218          47 :       GNUNET_CONFIGURATION_get_value_string (cfg,
     219             :                                              "exchange",
     220             :                                              "BASE_URL",
     221             :                                              &exchange_base_url))
     222             :   {
     223           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     224             :                                "exchange",
     225             :                                "BASE_URL");
     226           0 :     return GNUNET_SYSERR;
     227             :   }
     228          47 :   if (GNUNET_OK !=
     229          47 :       GNUNET_CONFIGURATION_get_value_time (cfg,
     230             :                                            "exchange",
     231             :                                            "AGGREGATOR_IDLE_SLEEP_INTERVAL",
     232             :                                            &aggregator_idle_sleep_interval))
     233             :   {
     234           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     235             :                                "exchange",
     236             :                                "AGGREGATOR_IDLE_SLEEP_INTERVAL");
     237           0 :     return GNUNET_SYSERR;
     238             :   }
     239          47 :   if ( (GNUNET_OK !=
     240          47 :         TALER_config_get_amount (cfg,
     241             :                                  "taler",
     242             :                                  "CURRENCY_ROUND_UNIT",
     243          47 :                                  &currency_round_unit)) ||
     244          47 :        ( (0 != currency_round_unit.fraction) &&
     245          47 :          (0 != currency_round_unit.value) ) )
     246             :   {
     247           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     248             :                 "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
     249           0 :     return GNUNET_SYSERR;
     250             :   }
     251             : 
     252          47 :   if (NULL ==
     253          47 :       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
     254             :   {
     255           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     256             :                 "Failed to initialize DB subsystem\n");
     257           0 :     return GNUNET_SYSERR;
     258             :   }
     259          47 :   if (GNUNET_OK !=
     260          47 :       TALER_EXCHANGEDB_load_accounts (cfg,
     261             :                                       TALER_EXCHANGEDB_ALO_DEBIT))
     262             :   {
     263           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     264             :                 "No wire accounts configured for debit!\n");
     265           0 :     TALER_EXCHANGEDB_plugin_unload (db_plugin);
     266           0 :     db_plugin = NULL;
     267           0 :     return GNUNET_SYSERR;
     268             :   }
     269          47 :   return GNUNET_OK;
     270             : }
     271             : 
     272             : 
     273             : /**
     274             :  * Callback invoked with information about refunds applicable
     275             :  * to a particular coin.  Subtract refunded amount(s) from
     276             :  * the aggregation unit's total amount and set its @e have_refund flag to #GNUNET_YES.
     277             :  *
     278             :  * @param cls closure with a `struct AggregationUnit *`
     279             :  * @param amount_with_fee what was the refunded amount with the fee
     280             :  * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop (returned if the subtraction fails)
     281             :  */
     282             : static int
     283          12 : refund_by_coin_cb (void *cls,
     284             :                    const struct TALER_Amount *amount_with_fee)
     285             : {
     286          12 :   struct AggregationUnit *aux = cls;
     287             : 
     288          12 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     289             :               "Aggregator subtracts applicable refund of amount %s\n",
     290             :               TALER_amount2s (amount_with_fee));
     291          12 :   aux->have_refund = GNUNET_YES;
     292          12 :   if (0 >
     293          12 :       TALER_amount_subtract (&aux->total_amount,
     294          12 :                              &aux->total_amount,
     295             :                              amount_with_fee))
     296             :   {
     297           0 :     GNUNET_break (0);
     298           0 :     return GNUNET_SYSERR;
     299             :   }
     300          12 :   return GNUNET_OK;
     301             : }
     302             : 
     303             : 
     304             : /**
     305             :  * Function called with details about deposits that have been made,
     306             :  * with the goal of executing the corresponding wire transaction.  Starts a new aggregation in the given unit: computes the amount after refunds or deposit fee, draws a fresh WTID, looks up the exchange account and the wire fee, records the aggregation tracking entry and marks the deposit as done.
     307             :  *
     308             :  * @param cls a `struct AggregationUnit`
     309             :  * @param row_id identifies database entry
     310             :  * @param exchange_timestamp when did the deposit happen (unused)
     311             :  * @param wallet_timestamp when did the contract happen (unused)
     312             :  * @param merchant_pub public key of the merchant
     313             :  * @param coin_pub public key of the coin
     314             :  * @param amount_with_fee amount that was deposited including fee
     315             :  * @param deposit_fee amount the exchange gets to keep as transaction fees
     316             :  * @param h_contract_terms hash of the proposal data known to merchant and customer
     317             :  * @param wire_deadline by which the merchant advised that he would like the
     318             :  *        wire transfer to be executed (unused here)
     319             :  * @param wire wire details for the merchant
     320             :  * @return transaction status code,  #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate; #GNUNET_DB_STATUS_HARD_ERROR on JSON, account lookup or wire fee failures
     321             :  */
     322             : static enum GNUNET_DB_QueryStatus
     323          50 : deposit_cb (void *cls,
     324             :             uint64_t row_id,
     325             :             struct GNUNET_TIME_Absolute exchange_timestamp,
     326             :             struct GNUNET_TIME_Absolute wallet_timestamp,
     327             :             const struct TALER_MerchantPublicKeyP *merchant_pub,
     328             :             const struct TALER_CoinSpendPublicKeyP *coin_pub,
     329             :             const struct TALER_Amount *amount_with_fee,
     330             :             const struct TALER_Amount *deposit_fee,
     331             :             const struct GNUNET_HashCode *h_contract_terms,
     332             :             struct GNUNET_TIME_Absolute wire_deadline,
     333             :             const json_t *wire)
     334             : {
     335          50 :   struct AggregationUnit *au = cls;
     336             :   enum GNUNET_DB_QueryStatus qs;
     337             : 
     338             :   (void) cls;
     339             :   /* NOTE: potential optimization: use custom SQL API to not
     340             :      fetch this one: */
     341             :   (void) wire_deadline; /* already checked by SQL query */
     342             :   (void) exchange_timestamp;
     343             :   (void) wallet_timestamp;
     344          50 :   au->merchant_pub = *merchant_pub;
     345          50 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     346             :               "Aggregator processing payment %s with amount %s\n",
     347             :               TALER_B2S (coin_pub),
     348             :               TALER_amount2s (amount_with_fee));
     349          50 :   au->row_id = row_id;
     350          50 :   au->total_amount = *amount_with_fee;
     351          50 :   au->have_refund = GNUNET_NO;
     352          50 :   qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
     353             :                                           coin_pub,
     354          50 :                                           &au->merchant_pub,
     355             :                                           h_contract_terms,
     356             :                                           &refund_by_coin_cb,
     357             :                                           au);
     358          50 :   if (0 > qs)
     359             :   {
     360           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     361           0 :     return qs;
     362             :   }
     363          50 :   if (GNUNET_NO == au->have_refund)
     364             :   {
     365             :     struct TALER_Amount ntotal;
     366             : 
     367          49 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     368             :                 "Non-refunded transaction, subtracting deposit fee %s\n",
     369             :                 TALER_amount2s (deposit_fee));
     370          49 :     if (0 >
     371          49 :         TALER_amount_subtract (&ntotal,
     372             :                                amount_with_fee,
     373             :                                deposit_fee))
     374             :     {
     375             :       /* This should never happen, issue a warning, but continue processing
     376             :          with an amount of zero, lest we hang here for good.  NOTE(review): the currency string passed below lives inside the amount being reset; confirm TALER_amount_set_zero() copes with that aliasing (aggregate_cb passes a separate copy). */
     377           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     378             :                   "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n",
     379             :                   (unsigned long long) row_id,
     380             :                   TALER_amount2s (amount_with_fee));
     381           0 :       GNUNET_assert (GNUNET_OK ==
     382             :                      TALER_amount_set_zero (au->total_amount.currency,
     383             :                                             &au->total_amount));
     384             :     }
     385             :     else
     386             :     {
     387          49 :       au->total_amount = ntotal;
     388             :     }
     389             :   }
     390             : 
     391          50 :   GNUNET_assert (NULL == au->wire);
     392          50 :   if (NULL == (au->wire = json_incref ((json_t *) wire)))
     393             :   {
     394           0 :     GNUNET_break (0);
     395           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
     396             :   }
     397          50 :   if (GNUNET_OK !=
     398          50 :       TALER_JSON_merchant_wire_signature_hash (wire,
     399             :                                                &au->h_wire))
     400             :   {
     401           0 :     GNUNET_break (0);
     402           0 :     json_decref (au->wire);
     403           0 :     au->wire = NULL;
     404           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
     405             :   }
     406          50 :   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
     407          50 :                               &au->wtid,
     408             :                               sizeof (au->wtid));
     409          50 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     410             :               "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",
     411             :               TALER_B2S (&au->wtid),
     412             :               TALER_amount2s (amount_with_fee),
     413             :               (unsigned long long) row_id);
     414             :   {
     415             :     char *url;
     416             : 
     417          50 :     url = TALER_JSON_wire_to_payto (au->wire);
     418          50 :     au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url);
     419          50 :     if (NULL == au->wa)
     420             :     {
     421           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     422             :                   "No exchange account configured for `%s', please fix your setup to continue!\n",
     423             :                   url);
     424           0 :       GNUNET_free (url);
     425           0 :       return GNUNET_DB_STATUS_HARD_ERROR;
     426             :     }
     427          50 :     GNUNET_free (url);
     428             :   }
     429             : 
     430             :   /* make sure we have current fees (the block below declares its own 'qs', shadowing the outer one) */
     431          50 :   au->execution_time = GNUNET_TIME_absolute_get ();
     432          50 :   (void) GNUNET_TIME_round_abs (&au->execution_time);
     433             :   {
     434             :     struct TALER_Amount closing_fee;
     435             :     struct GNUNET_TIME_Absolute start_date;
     436             :     struct GNUNET_TIME_Absolute end_date;
     437             :     struct TALER_MasterSignatureP master_sig;
     438             :     enum GNUNET_DB_QueryStatus qs;
     439             : 
     440          50 :     qs = db_plugin->get_wire_fee (db_plugin->cls,
     441          50 :                                   au->wa->method,
     442             :                                   au->execution_time,
     443             :                                   &start_date,
     444             :                                   &end_date,
     445             :                                   &au->wire_fee,
     446             :                                   &closing_fee,
     447             :                                   &master_sig);
     448          50 :     if (0 >= qs)
     449             :     {
     450           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     451             :                   "Could not get wire fees for %s at %s. Aborting run.\n",
     452             :                   au->wa->method,
     453             :                   GNUNET_STRINGS_absolute_time_to_string (au->execution_time));
     454           0 :       return GNUNET_DB_STATUS_HARD_ERROR;
     455             :     }
     456             :   }
     457             : 
     458          50 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     459             :               "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n",
     460             :               (unsigned long long) row_id,
     461             :               TALER_B2S (&au->wtid),
     462             :               TALER_amount2s (&au->wire_fee));
     463          50 :   qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
     464          50 :                                                &au->wtid,
     465             :                                                row_id);
     466          50 :   if (qs <= 0)
     467             :   {
     468           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     469           0 :     return qs;
     470             :   }
     471          50 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     472             :               "Aggregator marks deposit %llu as done\n",
     473             :               (unsigned long long) row_id);
     474          50 :   qs = db_plugin->mark_deposit_done (db_plugin->cls,
     475             :                                      row_id);
     476          50 :   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
     477             :   {
     478           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     479           0 :     return qs;
     480             :   }
     481          50 :   return qs;
     482             : }
     483             : 
     484             : 
     485             : /**
     486             :  * Function called with details about another deposit we
     487             :  * can aggregate into an existing aggregation unit.  Adds the coin's contribution (after refunds or deposit fee) to @e total_amount, remembers the row in @e additional_rows, records the aggregation tracking entry and marks the deposit as done.
     488             :  *
     489             :  * @param cls a `struct AggregationUnit`
     490             :  * @param row_id identifies database entry
     491             :  * @param coin_pub public key of the coin
     492             :  * @param amount_with_fee amount that was deposited including fee
     493             :  * @param deposit_fee amount the exchange gets to keep as transaction fees
     494             :  * @param h_contract_terms hash of the proposal data known to merchant and customer
     495             :  * @return transaction status code; #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT on success and also when this deposit is skipped (row limit reached or amount addition failed)
     496             :  */
     497             : static enum GNUNET_DB_QueryStatus
     498          42 : aggregate_cb (void *cls,
     499             :               uint64_t row_id,
     500             :               const struct TALER_CoinSpendPublicKeyP *coin_pub,
     501             :               const struct TALER_Amount *amount_with_fee,
     502             :               const struct TALER_Amount *deposit_fee,
     503             :               const struct GNUNET_HashCode *h_contract_terms)
     504             : {
     505          42 :   struct AggregationUnit *au = cls;
     506             :   struct TALER_Amount old;
     507             :   enum GNUNET_DB_QueryStatus qs;
     508             : 
     509          42 :   if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
     510             :   {
     511             :     /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
     512           0 :     GNUNET_break (0);
     513             :     /* Skip this one, but keep going with the overall transaction */
     514           0 :     return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     515             :   }
     516             : 
     517             :   /* add to total */
     518          42 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     519             :               "Adding transaction amount %s from row %llu to aggregation\n",
     520             :               TALER_amount2s (amount_with_fee),
     521             :               (unsigned long long) row_id);
     522             :   /* save the existing total aggregate in 'old', for later */
     523          42 :   old = au->total_amount;
     524             :   /* we begin with the total contribution of the current coin */
     525          42 :   au->total_amount = *amount_with_fee;
     526             :   /* compute contribution of this coin (after fees); refund_by_coin_cb subtracts refunds from au->total_amount and sets au->have_refund */
     527          42 :   au->have_refund = GNUNET_NO;
     528          42 :   qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
     529             :                                           coin_pub,
     530          42 :                                           &au->merchant_pub,
     531             :                                           h_contract_terms,
     532             :                                           &refund_by_coin_cb,
     533             :                                           au);
     534          42 :   if (0 > qs)
     535             :   {
     536           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     537           0 :     return qs;
     538             :   }
     539          42 :   if (GNUNET_NO == au->have_refund)
     540             :   {
     541             :     struct TALER_Amount tmp;
     542             : 
     543          31 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     544             :                 "Subtracting deposit fee %s for non-refunded coin\n",
     545             :                 TALER_amount2s (deposit_fee));
     546          31 :     if (0 >
     547          31 :         TALER_amount_subtract (&tmp,
     548          31 :                                &au->total_amount,
     549             :                                deposit_fee))
     550             :     {
     551           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     552             :                   "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n",
     553             :                   (unsigned long long) row_id,
     554             :                   TALER_amount2s (&au->total_amount));
     555           0 :       GNUNET_assert (GNUNET_OK ==
     556             :                      TALER_amount_set_zero (old.currency,
     557             :                                             &au->total_amount));
     558             :     }
     559             :     else
     560             :     {
     561          31 :       au->total_amount = tmp;
     562             :     }
     563             :   }
     564             : 
     565             :   /* now add the au->total_amount with the (remaining) contribution of
     566             :      the current coin to the 'old' value with the current aggregate value */
     567             :   {
     568             :     struct TALER_Amount tmp;
     569             : 
     570          42 :     if (0 >
     571          42 :         TALER_amount_add (&tmp,
     572          42 :                           &au->total_amount,
     573             :                           &old))
     574             :     {
     575           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     576             :                   "Overflow or currency incompatibility during aggregation at %llu\n",
     577             :                   (unsigned long long) row_id);
     578             :       /* Skip this one (restoring the previous total), but keep going! */
     579           0 :       au->total_amount = old;
     580           0 :       return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     581             :     }
     582          42 :     au->total_amount = tmp;
     583             :   }
     584             : 
     585             :   /* "append" to our list of rows */
     586          42 :   au->additional_rows[au->rows_offset++] = row_id;
     587             :   /* insert into aggregation tracking table */
     588          42 :   qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
     589          42 :                                                &au->wtid,
     590             :                                                row_id);
     591          42 :   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
     592             :   {
     593           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     594           0 :     return qs;
     595             :   }
     596          42 :   qs = db_plugin->mark_deposit_done (db_plugin->cls,
     597             :                                      row_id);
     598          42 :   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
     599             :   {
     600           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     601           0 :     return qs;
     602             :   }
     603          42 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     604             :               "Aggregator marked deposit %llu as DONE\n",
     605             :               (unsigned long long) row_id);
     606          42 :   return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     607             : }
     608             : 
     609             : 
     610             : /**
     611             :  * Perform a database commit. If it fails, print a warning
                     :  * (logged at INFO level for a soft error, at ERROR level otherwise).
     612             :  *
     613             :  * @return status of commit as returned by the database plugin;
                     :  *         #GNUNET_DB_STATUS_SUCCESS_NO_RESULTS is treated as success
     614             :  */
     615             : static enum GNUNET_DB_QueryStatus
     616          50 : commit_or_warn (void)
     617             : {
     618             :   enum GNUNET_DB_QueryStatus qs;
     619             : 
     620          50 :   qs = db_plugin->commit (db_plugin->cls);
     621          50 :   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
     622          50 :     return qs; /* commit went through, nothing to report */
     623           0 :   GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
     624             :               ? GNUNET_ERROR_TYPE_INFO
     625             :               : GNUNET_ERROR_TYPE_ERROR,
     626             :               "Failed to commit database transaction!\n");
     627           0 :   return qs; /* caller decides whether to retry or to give up */
     628             : }
     629             : 
     630             : 
     631             : /**
     632             :  * Main work function that queries the DB and aggregates transactions
     633             :  * into larger wire transfers.
                     :  *
                     :  * One invocation performs the following steps:
                     :  * 1. run the database preflight check and start a transaction via
                     :  *    start_deferred_wire_out();
                     :  * 2. fetch one ready deposit via get_ready_deposit() (with deposit_cb);
                     :  *    if there is none, roll back and either shut down (test mode) or
                     :  *    re-schedule this function after #aggregator_idle_sleep_interval;
                     :  * 3. collect further deposits for the same merchant and wire details via
                     :  *    iterate_matching_deposits() (with aggregate_cb), limited to
                     :  *    #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT;
                     :  * 4. subtract the wire fee and round down to #currency_round_unit; if
                     :  *    nothing remains, roll back, mark all selected deposits as tiny in
                     :  *    a fresh transaction and start over;
                     :  * 5. otherwise prepare the wire transfer, store the prepare data and
                     :  *    the wire_out record, commit, and start over.
                     :  *
                     :  * Soft (serialization) errors lead to a rollback and an immediate
                     :  * re-schedule of this function; hard errors set #global_ret to
                     :  * EXIT_FAILURE and shut the scheduler down.
     634             :  *
     635             :  * @param cls NULL
     636             :  */
     637             : static void
     638          97 : run_aggregation (void *cls)
     639             : {
     640             :   struct AggregationUnit au_active;
     641             :   enum GNUNET_DB_QueryStatus qs;
     642             : 
     643             :   (void) cls;
     644          97 :   task = NULL; /* cleared so the "NULL == task" assertions below hold when we re-schedule */
     645          97 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     646             :               "Checking for ready deposits to aggregate\n");
     647          97 :   if (GNUNET_SYSERR ==
     648          97 :       db_plugin->preflight (db_plugin->cls))
     649             :   {
     650           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     651             :                 "Failed to obtain database connection!\n");
     652           0 :     global_ret = EXIT_FAILURE;
     653           0 :     GNUNET_SCHEDULER_shutdown ();
     654           0 :     return;
     655             :   }
     656          97 :   if (GNUNET_OK !=
     657          97 :       db_plugin->start_deferred_wire_out (db_plugin->cls))
     658             :   {
     659           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     660             :                 "Failed to start database transaction!\n");
     661           0 :     global_ret = EXIT_FAILURE;
     662           0 :     GNUNET_SCHEDULER_shutdown ();
     663           0 :     return;
     664             :   }
     665          97 :   memset (&au_active,
     666             :           0,
     667             :           sizeof (au_active));
     668          97 :   qs = db_plugin->get_ready_deposit (db_plugin->cls,
     669             :                                      &deposit_cb,
     670             :                                      &au_active);
     671          97 :   if (0 >= qs) /* error, or no ready deposit was found */
     672             :   {
     673          47 :     cleanup_au (&au_active);
     674          47 :     db_plugin->rollback (db_plugin->cls);
     675          47 :     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     676             :     {
     677           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     678             :                   "Failed to execute deposit iteration!\n");
     679           0 :       global_ret = EXIT_FAILURE;
     680           0 :       GNUNET_SCHEDULER_shutdown ();
     681           0 :       return;
     682             :     }
     683          47 :     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     684             :     {
     685             :       /* should re-try immediately */
     686           0 :       GNUNET_assert (NULL == task);
     687           0 :       task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     688             :                                        NULL);
     689           0 :       return;
     690             :     }
     691          47 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     692             :                 "No more ready deposits, going to sleep\n");
     693          47 :     if (GNUNET_YES == test_mode)
     694             :     {
     695             :       /* in test mode, shutdown if we end up being idle */
     696          47 :       GNUNET_SCHEDULER_shutdown ();
     697             :     }
     698             :     else
     699             :     {
     700             :       /* nothing to do, sleep for the configured idle interval and try again */
     701           0 :       GNUNET_assert (NULL == task);
     702           0 :       task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
     703             :                                            &run_aggregation,
     704             :                                            NULL);
     705             :     }
     706          47 :     return;
     707             :   }
     708             : 
     709             :   /* Now try to find other deposits to aggregate */
     710          50 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     711             :               "Found ready deposit for %s, aggregating\n",
     712             :               TALER_B2S (&au_active.merchant_pub));
     713          50 :   qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
     714             :                                              &au_active.h_wire,
     715             :                                              &au_active.merchant_pub,
     716             :                                              &aggregate_cb,
     717             :                                              &au_active,
     718             :                                              TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT);
     719          50 :   if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) ||
     720          50 :        (GNUNET_YES == au_active.failed) ) /* NOTE(review): 'failed' is presumably set by the callbacks -- confirm */
     721             :   {
     722           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     723             :                 "Failed to execute deposit iteration!\n");
     724           0 :     cleanup_au (&au_active);
     725           0 :     db_plugin->rollback (db_plugin->cls);
     726           0 :     global_ret = EXIT_FAILURE;
     727           0 :     GNUNET_SCHEDULER_shutdown ();
     728           0 :     return;
     729             :   }
     730          50 :   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     731             :   {
     732             :     /* serializability issue, try again */
     733           0 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     734             :                 "Serialization issue, trying again later!\n");
     735           0 :     db_plugin->rollback (db_plugin->cls);
     736           0 :     cleanup_au (&au_active);
     737           0 :     GNUNET_assert (NULL == task);
     738           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     739             :                                      NULL);
     740           0 :     return;
     741             :   }
     742             : 
     743             :   /* Subtract wire transfer fee and round to the unit supported by the
     744             :      wire transfer method; Check if after rounding down, we still have
     745             :      an amount to transfer, and if not mark as 'tiny'. */
     746          50 :   if ( (0 >=
     747          50 :         TALER_amount_subtract (&au_active.final_amount,
     748             :                                &au_active.total_amount,
     749          45 :                                &au_active.wire_fee)) ||
     750             :        (GNUNET_SYSERR ==
     751          45 :         TALER_amount_round_down (&au_active.final_amount,
     752          45 :                                  &currency_round_unit)) ||
     753          45 :        ( (0 == au_active.final_amount.value) &&
     754          27 :          (0 == au_active.final_amount.fraction) ) )
     755             :   {
     756           5 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     757             :                 "Aggregate value too low for transfer (%d/%s)\n",
     758             :                 qs,
     759             :                 TALER_amount2s (&au_active.final_amount));
     760             :     /* Rollback ongoing transaction, as we will not use the respective
     761             :        WTID and thus need to remove the tracking data */
     762           5 :     db_plugin->rollback (db_plugin->cls);
     763             : 
     764             :     /* There were results, just the value was too low.  Start another
     765             :        transaction to mark all of the selected deposits as minor (tiny)! */
     766           5 :     if (GNUNET_OK !=
     767           5 :         db_plugin->start (db_plugin->cls,
     768             :                           "aggregator mark tiny transactions"))
     769             :     {
     770           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     771             :                   "Failed to start database transaction!\n");
     772           0 :       global_ret = EXIT_FAILURE;
     773           0 :       cleanup_au (&au_active);
     774           0 :       GNUNET_SCHEDULER_shutdown ();
     775           0 :       return;
     776             :     }
     777             :     /* Mark transactions by row_id as minor: first the initial deposit,
                     :        then every additional row that was aggregated with it */
     778           5 :     qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
     779             :                                        au_active.row_id);
     780           5 :     if (0 <= qs)
     781             :     {
     782          10 :       for (unsigned int i = 0; i<au_active.rows_offset; i++)
     783             :       {
     784           5 :         qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
     785             :                                            au_active.additional_rows[i]);
     786           5 :         if (0 > qs)
     787           0 :           break; /* stop at the first failure; 'qs' is inspected below */
     788             :       }
     789             :     }
     790           5 :     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     791             :     {
     792           0 :       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     793             :                   "Serialization issue, trying again later!\n");
     794           0 :       db_plugin->rollback (db_plugin->cls);
     795           0 :       cleanup_au (&au_active);
     796             :       /* start again */
     797           0 :       GNUNET_assert (NULL == task);
     798           0 :       task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     799             :                                        NULL);
     800           0 :       return;
     801             :     }
     802           5 :     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     803             :     {
     804           0 :       db_plugin->rollback (db_plugin->cls);
     805           0 :       cleanup_au (&au_active);
     806           0 :       global_ret = EXIT_FAILURE;
     807           0 :       GNUNET_SCHEDULER_shutdown ();
     808           0 :       return;
     809             :     }
     810             :     /* commit */
     811           5 :     (void) commit_or_warn (); /* result ignored: we start over immediately either way */
     812           5 :     cleanup_au (&au_active);
     813             : 
     814             :     /* start again */
     815           5 :     GNUNET_assert (NULL == task);
     816           5 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     817             :                                      NULL);
     818           5 :     return;
     819             :   }
     820             :   {
     821             :     char *amount_s;
     822             : 
     823          45 :     amount_s = TALER_amount_to_string (&au_active.final_amount);
     824          45 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     825             :                 "Preparing wire transfer of %s to %s\n",
     826             :                 amount_s,
     827             :                 TALER_B2S (&au_active.merchant_pub));
     828          45 :     GNUNET_free (amount_s);
     829             :   }
     830             : 
     831             :   {
     832             :     void *buf;
     833             :     size_t buf_size;
     834             : 
     835             :     {
     836             :       char *url;
     837             : 
     838          45 :       url = TALER_JSON_wire_to_payto (au_active.wire);
     839          45 :       TALER_BANK_prepare_transfer (url,
     840             :                                    &au_active.final_amount,
     841             :                                    exchange_base_url,
     842             :                                    &au_active.wtid,
     843             :                                    &buf,
     844             :                                    &buf_size);
     845          45 :       GNUNET_free (url);
     846             :     }
     847             : 
     848          45 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     849             :                 "Storing %u bytes of wire prepare data\n",
     850             :                 (unsigned int) buf_size);
     851             :     /* Commit our intention to execute the wire transfer! */
     852          45 :     qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
     853          45 :                                               au_active.wa->method,
     854             :                                               buf,
     855             :                                               buf_size);
     856          45 :     GNUNET_free (buf);
     857             :   }
     858             :   /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
     859             :      table constraints */
     860          45 :   if (qs >= 0) /* skip if storing the prepare data already failed */
     861          45 :     qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
     862             :                                              au_active.execution_time,
     863             :                                              &au_active.wtid,
     864          45 :                                              au_active.wire,
     865          45 :                                              au_active.wa->section_name,
     866             :                                              &au_active.final_amount);
     867          45 :   cleanup_au (&au_active); /* au_active is not referenced past this point */
     868             : 
     869          45 :   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     870             :   {
     871           0 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     872             :                 "Serialization issue for prepared wire data; trying again later!\n");
     873           0 :     db_plugin->rollback (db_plugin->cls);
     874             :     /* start again */
     875           0 :     GNUNET_assert (NULL == task);
     876           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     877             :                                      NULL);
     878           0 :     return;
     879             :   }
     880          45 :   if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     881             :   {
     882           0 :     GNUNET_break (0);
     883           0 :     db_plugin->rollback (db_plugin->cls);
     884             :     /* die hard */
     885           0 :     global_ret = EXIT_FAILURE;
     886           0 :     GNUNET_SCHEDULER_shutdown ();
     887           0 :     return;
     888             :   }
     889             : 
     890          45 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     891             :               "Stored wire transfer out instructions\n");
     892             : 
     893             :   /* Now we can finally commit the overall transaction, as we are
     894             :      again consistent if all of this passes. */
     895          45 :   switch (commit_or_warn ())
     896             :   {
     897           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     898             :     /* try again */
     899           0 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     900             :                 "Commit issue for prepared wire data; trying again later!\n");
     901           0 :     GNUNET_assert (NULL == task);
     902           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     903             :                                      NULL);
     904           0 :     return;
     905           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     906           0 :     GNUNET_break (0);
     907           0 :     global_ret = EXIT_FAILURE;
     908           0 :     GNUNET_SCHEDULER_shutdown ();
     909           0 :     return;
     910          45 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     911          45 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     912             :                 "Preparation complete, going again\n");
     913          45 :     GNUNET_assert (NULL == task);
     914          45 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     915             :                                      NULL);
     916          45 :     return;
     917           0 :   default:
     918             :     /* any other commit status is unexpected here; treat it as fatal */
     919           0 :     GNUNET_break (0);
     920           0 :     global_ret = EXIT_FAILURE;
     921           0 :     GNUNET_SCHEDULER_shutdown ();
     922           0 :     return;
     923             :   }
     924             : }
     924             : 
     925             : 
     926             : /**
     927             :  * First task.  Remembers the configuration, parses it, schedules the
                     :  * first run of run_aggregation() and registers the shutdown handler.
                     :  * If parsing the configuration fails, #global_ret is set to
                     :  * EXIT_NOTCONFIGURED and nothing is scheduled.
     928             :  *
     929             :  * @param cls closure, NULL
     930             :  * @param args remaining command-line arguments (unused)
     931             :  * @param cfgfile name of the configuration file used (for saving, can be NULL!); unused here
     932             :  * @param c configuration
     933             :  */
     934             : static void
     935          47 : run (void *cls,
     936             :      char *const *args,
     937             :      const char *cfgfile,
     938             :      const struct GNUNET_CONFIGURATION_Handle *c)
     939             : {
     940             :   (void) cls;
     941             :   (void) args;
     942             :   (void) cfgfile;
     943             : 
     944          47 :   cfg = c;
     945          47 :   if (GNUNET_OK != parse_wirewatch_config ())
     946             :   {
     947           0 :     cfg = NULL; /* do not keep a configuration we failed to parse */
     948           0 :     global_ret = EXIT_NOTCONFIGURED;
     949           0 :     return;
     950             :   }
     951          47 :   GNUNET_assert (NULL == task);
     952          47 :   task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     953             :                                    NULL);
     954          47 :   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
     955             :                                  cls);
     956             : }
     957             : 
     958             : 
     959             : /**
     960             :  * The main function of the taler-exchange-aggregator.
                     :  *
                     :  * Accepts the options -T/--timetravel and -t/--test (run in test mode
                     :  * and exit when idle), then hands control to GNUNET_PROGRAM_run()
                     :  * with run() as the initial task.
     961             :  *
     962             :  * @param argc number of arguments from the command line
     963             :  * @param argv command line arguments
     964             :  * @return 0 ok, non-zero on error, see #global_ret;
                     :  *         EXIT_INVALIDARGUMENT if the arguments could not be converted
                     :  *         to UTF-8 or GNUNET_PROGRAM_run() returned #GNUNET_SYSERR;
                     :  *         EXIT_SUCCESS if GNUNET_PROGRAM_run() returned #GNUNET_NO
     965             :  */
     966             : int
     967          47 : main (int argc,
     968             :       char *const *argv)
     969             : {
     970          47 :   struct GNUNET_GETOPT_CommandLineOption options[] = {
     971          47 :     GNUNET_GETOPT_option_timetravel ('T',
     972             :                                      "timetravel"),
     973          47 :     GNUNET_GETOPT_option_flag ('t',
     974             :                                "test",
     975             :                                "run in test mode and exit when idle",
     976             :                                &test_mode),
     977             :     GNUNET_GETOPT_OPTION_END
     978             :   };
     979             :   enum GNUNET_GenericReturnValue ret;
     980             : 
     981          47 :   if (GNUNET_OK !=
     982          47 :       GNUNET_STRINGS_get_utf8_args (argc, argv,
     983             :                                     &argc, &argv))
     984           0 :     return EXIT_INVALIDARGUMENT;
     985          47 :   TALER_OS_init ();
     986          47 :   ret = GNUNET_PROGRAM_run (
     987             :     argc, argv,
     988             :     "taler-exchange-aggregator",
     989             :     gettext_noop (
     990             :       "background process that aggregates and executes wire transfers"),
     991             :     options,
     992             :     &run, NULL);
     993          47 :   GNUNET_free_nz ((void *) argv); /* argv was replaced by GNUNET_STRINGS_get_utf8_args() above */
     994          47 :   if (GNUNET_SYSERR == ret)
     995           0 :     return EXIT_INVALIDARGUMENT;
     996          47 :   if (GNUNET_NO == ret)
     997           0 :     return EXIT_SUCCESS;
     998          47 :   return global_ret; /* set by the tasks on failure */
     999             : }
    1000             : 
    1001             : 
    1002             : /* end of taler-exchange-aggregator.c */

Generated by: LCOV version 1.14