LCOV - code coverage report
Current view: top level - exchange - taler-exchange-aggregator.c (source / functions) Hit Total Coverage
Test: rcoverage.info Lines: 337 634 53.2 %
Date: 2017-11-25 11:31:41 Functions: 20 20 100.0 %

          Line data    Source code
       1             : /*
       2             :   This file is part of TALER
       3             :   Copyright (C) 2016, 2017 GNUnet e.V.
       4             : 
       5             :   TALER is free software; you can redistribute it and/or modify it under the
       6             :   terms of the GNU Affero General Public License as published by the Free Software
       7             :   Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
      12             : 
      13             :   You should have received a copy of the GNU Affero General Public License along with
      14             :   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             : */
      16             : 
      17             : /**
      18             :  * @file taler-exchange-aggregator.c
      19             :  * @brief Process that aggregates outgoing transactions and executes them
      20             :  * @author Christian Grothoff
      21             :  */
      22             : #include "platform.h"
      23             : #include <gnunet/gnunet_util_lib.h>
      24             : #include <jansson.h>
      25             : #include <pthread.h>
      26             : #include "taler_exchangedb_lib.h"
      27             : #include "taler_exchangedb_plugin.h"
      28             : #include "taler_json_lib.h"
      29             : #include "taler_wire_lib.h"
      30             : 
      31             : 
      32             : /**
      33             :  * Information we keep for each loaded wire plugin.
      34             :  */
      35             : struct WirePlugin
      36             : {
      37             :   /**
      38             :    * Plugins are kept in a DLL.
      39             :    */
      40             :   struct WirePlugin *next;
      41             : 
      42             :   /**
      43             :    * Plugins are kept in a DLL.
      44             :    */
      45             :   struct WirePlugin *prev;
      46             : 
      47             :   /**
      48             :    * Handle to the plugin.
      49             :    */
      50             :   struct TALER_WIRE_Plugin *wire_plugin;
      51             : 
      52             :   /**
      53             :    * Name of the plugin.
      54             :    */
      55             :   char *type;
      56             : 
      57             :   /**
      58             :    * Wire transfer fee structure.
      59             :    */
      60             :   struct TALER_EXCHANGEDB_AggregateFees *af;
      61             : 
      62             : };
      63             : 
      64             : 
      65             : /**
      66             :  * Data we keep to #run_transfers().  There is at most
      67             :  * one of these around at any given point in time.
      68             :  */
      69             : struct WirePrepareData
      70             : {
      71             : 
      72             :   /**
      73             :    * Database session for all of our transactions.
      74             :    */
      75             :   struct TALER_EXCHANGEDB_Session *session;
      76             : 
      77             :   /**
      78             :    * Wire execution handle.
      79             :    */
      80             :   struct TALER_WIRE_ExecuteHandle *eh;
      81             : 
      82             :   /**
      83             :    * Wire plugin used for this preparation.
      84             :    */
      85             :   struct WirePlugin *wp;
      86             : 
      87             :   /**
      88             :    * Row ID of the transfer.
      89             :    */
      90             :   unsigned long long row_id;
      91             : 
      92             : };
      93             : 
      94             : 
      95             : /**
      96             :  * Information about one aggregation process to be executed.  There is
      97             :  * at most one of these around at any given point in time.
      98             :  */
      99             : struct AggregationUnit
     100             : {
     101             :   /**
     102             :    * Public key of the merchant.
     103             :    */
     104             :   struct TALER_MerchantPublicKeyP merchant_pub;
     105             : 
     106             :   /**
     107             :    * Total amount to be transferred, before subtraction of @e wire_fee and rounding down.
     108             :    */
     109             :   struct TALER_Amount total_amount;
     110             : 
     111             :   /**
     112             :    * Final amount to be transferred (after fee and rounding down).
     113             :    */
     114             :   struct TALER_Amount final_amount;
     115             : 
     116             :   /**
     117             :    * Wire fee we charge for @e wp at @e execution_time.
     118             :    */
     119             :   struct TALER_Amount wire_fee;
     120             : 
     121             :   /**
     122             :    * Hash of @e wire.
     123             :    */
     124             :   struct GNUNET_HashCode h_wire;
     125             : 
     126             :   /**
     127             :    * Wire transfer identifier we use.
     128             :    */
     129             :   struct TALER_WireTransferIdentifierRawP wtid;
     130             : 
     131             :   /**
     132             :    * Row ID of the transaction that started it all.
     133             :    */
     134             :   unsigned long long row_id;
     135             : 
     136             :   /**
     137             :    * The current time.
     138             :    */
     139             :   struct GNUNET_TIME_Absolute execution_time;
     140             : 
     141             :   /**
     142             :    * Wire details of the merchant.
     143             :    */
     144             :   json_t *wire;
     145             : 
     146             :   /**
     147             :    * Wire plugin to be used for the preparation.
     148             :    */
     149             :   struct WirePlugin *wp;
     150             : 
     151             :   /**
     152             :    * Database session for all of our transactions.
     153             :    */
     154             :   struct TALER_EXCHANGEDB_Session *session;
     155             : 
     156             :   /**
     157             :    * Wire preparation handle.
     158             :    */
     159             :   struct TALER_WIRE_PrepareHandle *ph;
     160             : 
     161             :   /**
     162             :    * Array of #aggregation_limit row_ids from the
     163             :    * aggregation.
     164             :    */
     165             :   unsigned long long *additional_rows;
     166             : 
     167             :   /**
     168             :    * Offset specifying how many #additional_rows are in use.
     169             :    */
     170             :   unsigned int rows_offset;
     171             : 
     172             :   /**
     173             :    * Set to #GNUNET_YES if we have to abort due to failure.
     174             :    */
     175             :   int failed;
     176             : 
     177             : };
     178             : 
     179             : 
     180             : /**
     181             :  * Context we use while closing a reserve.
     182             :  */
     183             : struct CloseTransferContext
     184             : {
     185             :   /**
     186             :    * Handle for preparing the wire transfer.
     187             :    */
     188             :   struct TALER_WIRE_PrepareHandle *ph;
     189             : 
     190             :   /**
     191             :    * Our database session.
     192             :    */
     193             :   struct TALER_EXCHANGEDB_Session *session;
     194             : 
     195             :   /**
     196             :    * Wire transfer method.
     197             :    */
     198             :   char *type;
     199             : 
     200             :   /**
     201             :    * Wire plugin used for closing the reserve.
     202             :    */
     203             :   struct WirePlugin *wp;
     204             : };
     205             : 
     206             : 
     207             : /**
     208             :  * Active context while processing reserve closing,
     209             :  * or NULL.
     210             :  */
     211             : static struct CloseTransferContext *ctc;
     212             : 
     213             : /**
     214             :  * Which currency is used by this exchange?
     215             :  */
     216             : static char *exchange_currency_string;
     217             : 
     218             : /**
     219             :  * What is the base URL of this exchange?
     220             :  */
     221             : static char *exchange_base_url;
     222             : 
     223             : /**
     224             :  * The exchange's configuration (global)
     225             :  */
     226             : static struct GNUNET_CONFIGURATION_Handle *cfg;
     227             : 
     228             : /**
     229             :  * Our DB plugin.
     230             :  */
     231             : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
     232             : 
     233             : /**
     234             :  * Head of list of loaded wire plugins.
     235             :  */
     236             : static struct WirePlugin *wp_head;
     237             : 
     238             : /**
     239             :  * Tail of list of loaded wire plugins.
     240             :  */
     241             : static struct WirePlugin *wp_tail;
     242             : 
     243             : /**
     244             :  * Next task to run, if any.
     245             :  */
     246             : static struct GNUNET_SCHEDULER_Task *task;
     247             : 
     248             : /**
     249             :  * If we are currently executing a transfer, information about
     250             :  * the active transfer is here. Otherwise, this variable is NULL.
     251             :  */
     252             : static struct WirePrepareData *wpd;
     253             : 
     254             : /**
     255             :  * If we are currently aggregating transactions, information about the
     256             :  * active aggregation is here. Otherwise, this variable is NULL.
     257             :  */
     258             : static struct AggregationUnit *au;
     259             : 
     260             : /**
     261             :  * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
     262             :  * on serious errors.
     263             :  */
     264             : static int global_ret;
     265             : 
     266             : /**
     267             :  * #GNUNET_YES if we are in test mode and should exit when idle.
     268             :  */
     269             : static int test_mode;
     270             : 
     271             : /**
     272             :  * Did #run_reserve_closures() have any work during its last run?
     273             :  */
     274             : static int reserves_idle;
     275             : 
     276             : /**
     277             :  * Limit on the number of transactions we aggregate at once.  Note
     278             :  * that the limit must be big enough to ensure that when transactions
     279             :  * of the smallest possible unit are aggregated, they do surpass the
     280             :  * "tiny" threshold beyond which we never trigger a wire transaction!
     281             :  *
     282             :  * Note: do not change here, Postgres requires us to hard-code the
     283             :  * LIMIT in the prepared statement.
     284             :  */
     285             : static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT;
     286             : 
     287             : 
     288             : /**
     289             :  * Extract wire plugin type from @a wire address
     290             :  *
     291             :  * @param wire a wire address
     292             :  * @return NULL if @a wire is ill-formed
     293             :  */
     294             : const char *
     295          22 : extract_type (const json_t *wire)
     296             : {
     297             :   const char *type;
     298             :   json_t *t;
     299             : 
     300          22 :   t = json_object_get (wire, "type");
     301          22 :   if (NULL == t)
     302             :   {
     303           0 :     GNUNET_break (0);
     304           0 :     return NULL;
     305             :   }
     306          22 :   type = json_string_value (t);
     307          22 :   if (NULL == type)
     308             :   {
     309           0 :     GNUNET_break (0);
     310           0 :     return NULL;
     311             :   }
     312          22 :   return type;
     313             : }
     314             : 
     315             : 
     316             : /**
     317             :  * Advance the "af" pointer in @a wp to point to the
     318             :  * currently valid record.
     319             :  *
     320             :  * @param wp wire transfer fee data structure to update
     321             :  * @param now timestamp to update fees to
     322             :  */
     323             : static void
     324          39 : advance_fees (struct WirePlugin *wp,
     325             :               struct GNUNET_TIME_Absolute now)
     326             : {
     327             :   struct TALER_EXCHANGEDB_AggregateFees *af;
     328             : 
     329             :   /* First, try to see if we have current fee information in memory */
     330          39 :   af = wp->af;
     331         100 :   while ( (NULL != af) &&
     332          22 :           (af->end_date.abs_value_us < now.abs_value_us) )
     333             :   {
     334           0 :     struct TALER_EXCHANGEDB_AggregateFees *n = af->next;
     335             : 
     336           0 :     GNUNET_free (af);
     337           0 :     af = n;
     338             :   }
     339          39 :   wp->af = af;
     340          39 : }
     341             : 
     342             : 
     343             : /**
     344             :  * Update wire transfer fee data structure in @a wp.
     345             :  *
     346             :  * @param wp wire transfer fee data structure to update
     347             :  * @param now timestamp to update fees to
     348             :  * @param session DB session to use
     349             :  * @return transaction status
     350             :  */
     351             : static enum GNUNET_DB_QueryStatus
     352          22 : update_fees (struct WirePlugin *wp,
     353             :              struct GNUNET_TIME_Absolute now,
     354             :              struct TALER_EXCHANGEDB_Session *session)
     355             : {
     356             :   enum GNUNET_DB_QueryStatus qs;
     357             : 
     358          22 :   advance_fees (wp,
     359             :                 now);
     360          22 :   if (NULL != wp->af)
     361           5 :     return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     362             :   /* Let's try to load it from disk... */
     363          17 :   wp->af = TALER_EXCHANGEDB_fees_read (cfg,
     364          17 :                                        wp->type);
     365          17 :   advance_fees (wp,
     366             :                 now);
     367          70 :   for (struct TALER_EXCHANGEDB_AggregateFees *p = wp->af;
     368             :        NULL != p;
     369          36 :        p = p->next)
     370             :   {
     371          72 :     qs = db_plugin->insert_wire_fee (db_plugin->cls,
     372             :                                      session,
     373          36 :                                      wp->type,
     374             :                                      p->start_date,
     375             :                                      p->end_date,
     376          36 :                                      &p->wire_fee,
     377          36 :                                      &p->master_sig);
     378          36 :     if (qs < 0)
     379             :     {
     380           0 :       TALER_EXCHANGEDB_fees_free (wp->af);
     381           0 :       wp->af = NULL;
     382           0 :       return qs;
     383             :     }
     384             :   }
     385          17 :   if (NULL != wp->af)
     386          17 :     return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     387           0 :   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     388             :               "Failed to find current wire transfer fees for `%s'\n",
     389             :               wp->type);
     390           0 :   return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
     391             : }
     392             : 
     393             : 
     394             : /**
     395             :  * Find the wire plugin for the given wire address.
     396             :  *
     397             :  * @param type wire plugin type we need a plugin for
     398             :  * @return NULL on error
     399             :  */
     400             : static struct WirePlugin *
     401          40 : find_plugin (const char *type)
     402             : {
     403             :   struct WirePlugin *wp;
     404             : 
     405          40 :   if (NULL == type)
     406           0 :     return NULL;
     407          40 :   for (wp = wp_head; NULL != wp; wp = wp->next)
     408          23 :     if (0 == strcmp (type,
     409          23 :                      wp->type))
     410          23 :       return wp;
     411          17 :   wp = GNUNET_new (struct WirePlugin);
     412          17 :   wp->wire_plugin = TALER_WIRE_plugin_load (cfg,
     413             :                                             type);
     414          17 :   if (NULL == wp->wire_plugin)
     415             :   {
     416           0 :     fprintf (stderr,
     417             :              "Failed to load wire plugin for `%s'\n",
     418             :              type);
     419           0 :     GNUNET_free (wp);
     420           0 :     return NULL;
     421             :   }
     422          17 :   wp->type = GNUNET_strdup (type);
     423          17 :   GNUNET_CONTAINER_DLL_insert (wp_head,
     424             :                                wp_tail,
     425             :                                wp);
     426          17 :   return wp;
     427             : }
     428             : 
     429             : 
     430             : /**
     431             :  * Free data stored in #au.
     432             :  */
     433             : static void
     434          61 : cleanup_au (void)
     435             : {
     436          61 :   if (NULL == au)
     437           0 :     return;
     438          61 :   GNUNET_free_non_null (au->additional_rows);
     439          61 :   if (NULL != au->wire)
     440             :   {
     441          21 :     json_decref (au->wire);
     442          21 :     au->wire = NULL;
     443             :   }
     444          61 :   GNUNET_free (au);
     445          61 :   au = NULL;
     446             : }
     447             : 
     448             : 
     449             : /**
     450             :  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
     451             :  *
     452             :  * @param cls closure
     453             :  */
     454             : static void
     455          28 : shutdown_task (void *cls)
     456             : {
     457          28 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     458             :               "Running shutdown\n");
     459          28 :   if (NULL != task)
     460             :   {
     461           0 :     GNUNET_SCHEDULER_cancel (task);
     462           0 :     task = NULL;
     463             :   }
     464          28 :   if (NULL != wpd)
     465             :   {
     466           0 :     if (NULL != wpd->eh)
     467             :     {
     468           0 :       wpd->wp->wire_plugin->execute_wire_transfer_cancel (wpd->wp->wire_plugin->cls,
     469           0 :                                                           wpd->eh);
     470           0 :       wpd->eh = NULL;
     471             :     }
     472           0 :     db_plugin->rollback (db_plugin->cls,
     473           0 :                          wpd->session);
     474           0 :     GNUNET_free (wpd);
     475           0 :     wpd = NULL;
     476             :   }
     477          28 :   if (NULL != au)
     478             :   {
     479           0 :     if (NULL != au->ph)
     480             :     {
     481           0 :       au->wp->wire_plugin->prepare_wire_transfer_cancel (au->wp->wire_plugin->cls,
     482           0 :                                                          au->ph);
     483           0 :       au->ph = NULL;
     484             :     }
     485           0 :     db_plugin->rollback (db_plugin->cls,
     486           0 :                          au->session);
     487           0 :     cleanup_au ();
     488             :   }
     489          28 :   if (NULL != ctc)
     490             :   {
     491           0 :     ctc->wp->wire_plugin->prepare_wire_transfer_cancel (ctc->wp->wire_plugin->cls,
     492           0 :                                                         ctc->ph);
     493           0 :     ctc->ph = NULL;
     494           0 :     db_plugin->rollback (db_plugin->cls,
     495           0 :                          ctc->session);
     496           0 :     GNUNET_free (ctc->type);
     497           0 :     GNUNET_free (ctc);
     498           0 :     ctc = NULL;
     499             :   }
     500          28 :   TALER_EXCHANGEDB_plugin_unload (db_plugin);
     501             : 
     502             :   {
     503             :     struct WirePlugin *wp;
     504             : 
     505          73 :     while (NULL != (wp = wp_head))
     506             :     {
     507          17 :       GNUNET_CONTAINER_DLL_remove (wp_head,
     508             :                                    wp_tail,
     509             :                                    wp);
     510          17 :       TALER_WIRE_plugin_unload (wp->wire_plugin);
     511          17 :       TALER_EXCHANGEDB_fees_free (wp->af);
     512          17 :       GNUNET_free (wp->type);
     513          17 :       GNUNET_free (wp);
     514             :     }
     515             :   }
     516          28 :   GNUNET_CONFIGURATION_destroy (cfg);
     517          28 :   cfg = NULL;
     518          28 : }
     519             : 
     520             : 
     521             : /**
     522             :  * Parse configuration parameters for the exchange server into the
     523             :  * corresponding global variables.
     524             :  *
     525             :  * @return #GNUNET_OK on success
     526             :  */
     527             : static int
     528          28 : exchange_serve_process_config ()
     529             : {
     530          28 :   if (GNUNET_OK !=
     531          28 :       GNUNET_CONFIGURATION_get_value_string (cfg,
     532             :                                              "taler",
     533             :                                              "currency",
     534             :                                              &exchange_currency_string))
     535             :   {
     536           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     537             :                                "taler",
     538             :                                "currency");
     539           0 :     return GNUNET_SYSERR;
     540             :   }
     541          28 :   if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN)
     542             :   {
     543           0 :     fprintf (stderr,
     544             :              "Currency `%s' longer than the allowed limit of %u characters.",
     545             :              exchange_currency_string,
     546             :              (unsigned int) TALER_CURRENCY_LEN);
     547           0 :     return GNUNET_SYSERR;
     548             :   }
     549             : 
     550          28 :   if (NULL ==
     551          28 :       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
     552             :   {
     553           0 :     fprintf (stderr,
     554             :              "Failed to initialize DB subsystem\n");
     555           0 :     return GNUNET_SYSERR;
     556             :   }
     557          28 :   if (GNUNET_OK !=
     558          28 :       db_plugin->create_tables (db_plugin->cls))
     559             :   {
     560           0 :     fprintf (stderr,
     561             :              "Failed to initialize DB tables\n");
     562           0 :     TALER_EXCHANGEDB_plugin_unload (db_plugin);
     563           0 :     return GNUNET_SYSERR;
     564             :   }
     565             : 
     566          28 :   return GNUNET_OK;
     567             : }
     568             : 
     569             : 
     570             : /**
     571             :  * Function called with details about deposits that have been made,
     572             :  * with the goal of executing the corresponding wire transaction.
     573             :  *
     574             :  * @param cls NULL
     575             :  * @param row_id identifies database entry
     576             :  * @param merchant_pub public key of the merchant
     577             :  * @param coin_pub public key of the coin
     578             :  * @param amount_with_fee amount that was deposited including fee
     579             :  * @param deposit_fee amount the exchange gets to keep as transaction fees
     580             :  * @param h_contract_terms hash of the proposal data known to merchant and customer
     581             :  * @param wire_deadline by which the merchant adviced that he would like the
     582             :  *        wire transfer to be executed
     583             :  * @param wire wire details for the merchant
     584             :  * @return transaction status code,  #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
     585             :  */
     586             : static enum GNUNET_DB_QueryStatus
     587          21 : deposit_cb (void *cls,
     588             :             uint64_t row_id,
     589             :             const struct TALER_MerchantPublicKeyP *merchant_pub,
     590             :             const struct TALER_CoinSpendPublicKeyP *coin_pub,
     591             :             const struct TALER_Amount *amount_with_fee,
     592             :             const struct TALER_Amount *deposit_fee,
     593             :             const struct GNUNET_HashCode *h_contract_terms,
     594             :             struct GNUNET_TIME_Absolute wire_deadline,
     595             :             const json_t *wire)
     596             : {
     597             :   enum GNUNET_DB_QueryStatus qs;
     598             : 
     599          21 :   au->merchant_pub = *merchant_pub;
     600          21 :   if (GNUNET_SYSERR ==
     601          21 :       TALER_amount_subtract (&au->total_amount,
     602             :                              amount_with_fee,
     603             :                              deposit_fee))
     604             :   {
     605           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     606             :                 "Fatally malformed record at row %llu over %s\n",
     607             :                 (unsigned long long) row_id,
     608             :                 TALER_amount2s (amount_with_fee));
     609           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
     610             :   }
     611          21 :   au->row_id = row_id;
     612          21 :   GNUNET_assert (NULL == au->wire);
     613          21 :   au->wire = json_incref ((json_t *) wire);
     614          21 :   if (GNUNET_OK !=
     615          21 :       TALER_JSON_hash (au->wire,
     616          21 :                        &au->h_wire))
     617             :   {
     618           0 :     GNUNET_break (0);
     619           0 :     json_decref (au->wire);
     620           0 :     au->wire = NULL;
     621           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
     622             :   }
     623          21 :   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
     624          21 :                               &au->wtid,
     625             :                               sizeof (au->wtid));
     626          21 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     627             :               "Starting aggregation under H(WTID)=%s\n",
     628             :               TALER_B2S (&au->wtid));
     629             : 
     630          21 :   au->wp = find_plugin (extract_type (au->wire));
     631          21 :   if (NULL == au->wp)
     632           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
     633             : 
     634             :   /* make sure we have current fees */
     635          21 :   au->execution_time = GNUNET_TIME_absolute_get ();
     636          21 :   (void) GNUNET_TIME_round_abs (&au->execution_time);
     637          42 :   qs = update_fees (au->wp,
     638          21 :                     au->execution_time,
     639          21 :                     au->session);
     640          21 :   if (qs <= 0)
     641             :   {
     642           0 :     if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
     643           0 :       qs = GNUNET_DB_STATUS_HARD_ERROR;
     644           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     645           0 :     return qs;
     646             :   }
     647          21 :   au->wire_fee = au->wp->af->wire_fee;
     648             : 
     649          42 :   qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
     650          21 :                                                au->session,
     651          21 :                                                &au->wtid,
     652             :                                                row_id);
     653          21 :   if (qs <= 0)
     654             :   {
     655           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     656           0 :     return qs;
     657             :   }
     658          42 :   qs = db_plugin->mark_deposit_done (db_plugin->cls,
     659          21 :                                      au->session,
     660             :                                      row_id);
     661          21 :   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
     662             :   {
     663           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     664           0 :     return qs;
     665             :   }
     666          21 :   return qs;
     667             : }
     668             : 
     669             : 
     670             : /**
     671             :  * Function called with details about another deposit we
     672             :  * can aggregate into an existing aggregation unit.
     673             :  *
     674             :  * @param cls NULL
     675             :  * @param row_id identifies database entry
     676             :  * @param merchant_pub public key of the merchant
     677             :  * @param coin_pub public key of the coin
     678             :  * @param amount_with_fee amount that was deposited including fee
     679             :  * @param deposit_fee amount the exchange gets to keep as transaction fees
     680             :  * @param h_contract_terms hash of the proposal data known to merchant and customer
     681             :  * @param wire_deadline by which the merchant adviced that he would like the
     682             :  *        wire transfer to be executed
     683             :  * @param wire wire details for the merchant
     684             :  * @return transaction status code
     685             :  */
     686             : static enum GNUNET_DB_QueryStatus
     687          17 : aggregate_cb (void *cls,
     688             :               uint64_t row_id,
     689             :               const struct TALER_MerchantPublicKeyP *merchant_pub,
     690             :               const struct TALER_CoinSpendPublicKeyP *coin_pub,
     691             :               const struct TALER_Amount *amount_with_fee,
     692             :               const struct TALER_Amount *deposit_fee,
     693             :               const struct GNUNET_HashCode *h_contract_terms,
     694             :               struct GNUNET_TIME_Absolute wire_deadline,
     695             :               const json_t *wire)
     696             : {
     697             :   struct TALER_Amount delta;
     698             :   enum GNUNET_DB_QueryStatus qs;
     699             : 
     700          17 :   GNUNET_break (0 ==
     701             :                 memcmp (&au->merchant_pub,
     702             :                         merchant_pub,
     703             :                         sizeof (struct TALER_MerchantPublicKeyP)));
     704             :   /* compute contribution of this coin after fees */
     705          17 :   if (GNUNET_SYSERR ==
     706          17 :       TALER_amount_subtract (&delta,
     707             :                              amount_with_fee,
     708             :                              deposit_fee))
     709             :   {
     710           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     711             :                 "Fatally malformed record at %llu over amount %s\n",
     712             :                 (unsigned long long) row_id,
     713             :                 TALER_amount2s (amount_with_fee));
     714           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
     715             :   }
     716             :   /* add to total */
     717          17 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     718             :               "Adding transaction amount %s to aggregation\n",
     719             :               TALER_amount2s (&delta));
     720          17 :   if (GNUNET_OK !=
     721          17 :       TALER_amount_add (&au->total_amount,
     722          17 :                         &au->total_amount,
     723             :                         &delta))
     724             :   {
     725           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     726             :                 "Overflow or currency incompatibility during aggregation at %llu\n",
     727             :                 (unsigned long long) row_id);
     728             :     /* Skip this one, but keep going! */
     729           0 :     return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     730             :   }
     731          17 :   if (au->rows_offset >= aggregation_limit)
     732             :   {
     733             :     /* Bug: we asked for at most #aggregation_limit results! */
     734           0 :     GNUNET_break (0);
     735             :     /* Skip this one, but keep going. */
     736           0 :     return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     737             :   }
     738          17 :   if (NULL == au->additional_rows)
     739           9 :     au->additional_rows = GNUNET_new_array (aggregation_limit,
     740             :                                             unsigned long long);
     741             :   /* "append" to our list of rows */
     742          17 :   au->additional_rows[au->rows_offset++] = row_id;
     743             :   /* insert into aggregation tracking table */
     744          34 :   qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
     745          17 :                                                au->session,
     746          17 :                                                &au->wtid,
     747             :                                                row_id);
     748          17 :   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
     749             :   {
     750           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     751           0 :     return qs;
     752             :   }
     753          34 :   qs = db_plugin->mark_deposit_done (db_plugin->cls,
     754          17 :                                      au->session,
     755             :                                      row_id);
     756          17 :   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
     757             :   {
     758           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     759           0 :     return qs;
     760             :   }
     761          17 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     762             :               "Added row %llu to aggregation\n",
     763             :               (unsigned long long) row_id);
     764          17 :   return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     765             : }
     766             : 
     767             : 
     768             : /**
     769             :  * Function to be called with the prepared transfer data
     770             :  * when running an aggregation on a merchant.
     771             :  *
     772             :  * @param cls closure with the `struct AggregationUnit`
     773             :  * @param buf transaction data to persist, NULL on error
     774             :  * @param buf_size number of bytes in @a buf, 0 on error
     775             :  */
     776             : static void
     777             : prepare_cb (void *cls,
     778             :             const char *buf,
     779             :             size_t buf_size);
     780             : 
     781             : 
     782             : /**
     783             :  * Main work function that finds and triggers transfers for reserves
     784             :  * closures.
     785             :  *
     786             :  * @param cls closure
     787             :  */
     788             : static void
     789             : run_reserve_closures (void *cls);
     790             : 
     791             : 
     792             : /**
     793             :  * Main work function that queries the DB and aggregates transactions
     794             :  * into larger wire transfers.
     795             :  *
     796             :  * @param cls NULL
     797             :  */
     798             : static void
     799             : run_aggregation (void *cls);
     800             : 
     801             : 
     802             : /**
     803             :  * Execute the wire transfers that we have committed to
     804             :  * do.
     805             :  *
     806             :  * @param cls pointer to an `int` which we will return from main()
     807             :  */
     808             : static void
     809             : run_transfers (void *cls);
     810             : 
     811             : 
     812             : /**
     813             :  * Perform a database commit. If it fails, print a warning.
     814             :  *
     815             :  * @param session session to perform the commit for.
     816             :  * @return status of commit
     817             :  */
     818             : static enum GNUNET_DB_QueryStatus
     819          40 : commit_or_warn (struct TALER_EXCHANGEDB_Session *session)
     820             : {
     821             :   enum GNUNET_DB_QueryStatus qs;
     822             : 
     823          40 :   qs = db_plugin->commit (db_plugin->cls,
     824             :                           session);
     825          40 :   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
     826          40 :     return qs;
     827           0 :   GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
     828             :               ? GNUNET_ERROR_TYPE_INFO
     829             :               : GNUNET_ERROR_TYPE_ERROR,
     830             :               "Failed to commit database transaction!\n");
     831           0 :   return qs;
     832             : }
     833             : 
     834             : 
     835             : /**
     836             :  * Function to be called with the prepared transfer data
     837             :  * when closing a reserve.
     838             :  *
     839             :  * @param cls closure with a `struct CloseTransferContext`
     840             :  * @param buf transaction data to persist, NULL on error
     841             :  * @param buf_size number of bytes in @a buf, 0 on error
     842             :  */
     843             : static void
     844           1 : prepare_close_cb (void *cls,
     845             :                   const char *buf,
     846             :                   size_t buf_size)
     847             : {
     848             :   enum GNUNET_DB_QueryStatus qs;
     849             : 
     850           1 :   GNUNET_assert (cls == ctc);
     851             : 
     852           1 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     853             :               "Prepared for reserve closing\n");
     854           1 :   ctc->ph = NULL;
     855           1 :   if (NULL == buf)
     856             :   {
     857           0 :     GNUNET_break (0); /* why? how to best recover? */
     858           0 :     db_plugin->rollback (db_plugin->cls,
     859           0 :                          ctc->session);
     860             :     /* start again */
     861           0 :     GNUNET_free (ctc->type);
     862           0 :     GNUNET_free (ctc);
     863           0 :     ctc = NULL;
     864           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     865             :                                      NULL);
     866           0 :     return;
     867             :   }
     868             : 
     869             :   /* Commit our intention to execute the wire transfer! */
     870           2 :   qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
     871           1 :                                             ctc->session,
     872           1 :                                             ctc->type,
     873             :                                             buf,
     874             :                                             buf_size);
     875           1 :   if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     876             :   {
     877           0 :     GNUNET_break (0);
     878           0 :     db_plugin->rollback (db_plugin->cls,
     879           0 :                          ctc->session);
     880           0 :     global_ret = GNUNET_SYSERR;
     881           0 :     GNUNET_SCHEDULER_shutdown ();
     882           0 :     GNUNET_free (ctc->type);
     883           0 :     GNUNET_free (ctc);
     884           0 :     ctc = NULL;
     885           0 :     return;
     886             :   }
     887           1 :   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     888             :   {
     889           0 :     db_plugin->rollback (db_plugin->cls,
     890           0 :                          ctc->session);
     891             :     /* start again */
     892           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
     893             :                                      NULL);
     894           0 :     GNUNET_free (ctc->type);
     895           0 :     GNUNET_free (ctc);
     896           0 :     ctc = NULL;
     897           0 :     return;
     898             :   }
     899             : 
     900             :   /* finally commit */
     901           1 :   (void) commit_or_warn (ctc->session);
     902           1 :   GNUNET_free (ctc->type);
     903           1 :   GNUNET_free (ctc);
     904           1 :   ctc = NULL;
     905           1 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     906             :               "Reserve closure committed, running transfer\n");
     907           1 :   task = GNUNET_SCHEDULER_add_now (&run_transfers,
     908             :                                    NULL);
     909             : }
     910             : 
     911             : 
     912             : /**
     913             :  * Closure for #expired_reserve_cb().
     914             :  */
     915             : struct ExpiredReserveContext
     916             : {
     917             : 
     918             :   /**
     919             :    * Database session we are using.
     920             :    */
     921             :   struct TALER_EXCHANGEDB_Session *session;
     922             : 
     923             :   /**
     924             :    * Set to #GNUNET_YES if the transaction continues
     925             :    * asynchronously.
     926             :    */
     927             :   int async_cont;
     928             : };
     929             : 
     930             : 
     931             : /**
     932             :  * Function called with details about expired reserves.
     933             :  * We trigger the reserve closure by inserting the respective
     934             :  * closing record and prewire instructions into the respective
     935             :  * tables.
     936             :  *
     937             :  * @param cls a `struct ExpiredReserveContext *`
     938             :  * @param reserve_pub public key of the reserve
     939             :  * @param left amount left in the reserve
     940             :  * @param account_details information about the reserve's bank account
     941             :  * @param expiration_date when did the reserve expire
     942             :  * @return transaction status code
     943             :  */
     944             : static enum GNUNET_DB_QueryStatus
     945           1 : expired_reserve_cb (void *cls,
     946             :                     const struct TALER_ReservePublicKeyP *reserve_pub,
     947             :                     const struct TALER_Amount *left,
     948             :                     const json_t *account_details,
     949             :                     struct GNUNET_TIME_Absolute expiration_date)
     950             : {
     951           1 :   struct ExpiredReserveContext *erc = cls;
     952           1 :   struct TALER_EXCHANGEDB_Session *session = erc->session;
     953             :   struct GNUNET_TIME_Absolute now;
     954             :   struct TALER_WireTransferIdentifierRawP wtid;
     955             :   struct TALER_Amount amount_without_fee;
     956             :   const struct TALER_Amount *closing_fee;
     957             :   int ret;
     958             :   enum GNUNET_DB_QueryStatus qs;
     959             :   const char *type;
     960             :   struct WirePlugin *wp;
     961             : 
     962           1 :   GNUNET_assert (NULL == ctc);
     963           1 :   now = GNUNET_TIME_absolute_get ();
     964             : 
     965             :   /* lookup wire plugin */
     966           1 :   type = extract_type (account_details);
     967           1 :   if (NULL == type)
     968             :   {
     969           0 :     GNUNET_break (0);
     970           0 :     global_ret = GNUNET_SYSERR;
     971           0 :     GNUNET_SCHEDULER_shutdown ();
     972           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
     973             :   }
     974           1 :   wp = find_plugin (type);
     975           1 :   if (NULL == wp)
     976             :   {
     977           0 :     GNUNET_break (0);
     978           0 :     global_ret = GNUNET_SYSERR;
     979           0 :     GNUNET_SCHEDULER_shutdown ();
     980           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
     981             :   }
     982             : 
     983             :   /* lookup `closing_fee` */
     984           1 :   qs = update_fees (wp,
     985             :                     now,
     986             :                     session);
     987           1 :   if (qs <= 0)
     988             :   {
     989           0 :     if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
     990           0 :       qs = GNUNET_DB_STATUS_HARD_ERROR;
     991           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     992           0 :     global_ret = GNUNET_SYSERR;
     993           0 :     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     994           0 :       GNUNET_SCHEDULER_shutdown ();
     995           0 :     return qs;
     996             :   }
     997           1 :   closing_fee = &wp->af->closing_fee;
     998             : 
     999             :   /* calculate transfer amount */
    1000           1 :   ret = TALER_amount_subtract (&amount_without_fee,
    1001             :                                left,
    1002             :                                closing_fee);
    1003           1 :   if ( (GNUNET_SYSERR == ret) ||
    1004             :        (GNUNET_NO == ret) )
    1005             :   {
    1006             :     /* Closing fee higher than remaining balance, close
    1007             :        without wire transfer. */
    1008           0 :     closing_fee = left;
    1009           0 :     GNUNET_assert (GNUNET_OK ==
    1010             :                    TALER_amount_get_zero (left->currency,
    1011             :                                           &amount_without_fee));
    1012             :   }
    1013             : 
    1014             :   /* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to
    1015             :      be future-compatible, we use the memset + min construction */
    1016           1 :   memset (&wtid,
    1017             :           0,
    1018             :           sizeof (wtid));
    1019           1 :   memcpy (&wtid,
    1020             :           reserve_pub,
    1021             :           GNUNET_MIN (sizeof (wtid),
    1022             :                       sizeof (*reserve_pub)));
    1023           1 :   qs = db_plugin->insert_reserve_closed (db_plugin->cls,
    1024             :                                          session,
    1025             :                                          reserve_pub,
    1026             :                                          now,
    1027             :                                          account_details,
    1028             :                                          &wtid,
    1029             :                                          left,
    1030             :                                          closing_fee);
    1031           1 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1032             :               "Closing reserve %s over %s (%d, %d)\n",
    1033             :               TALER_B2S (reserve_pub),
    1034             :               TALER_amount2s (left),
    1035             :               ret,
    1036             :               qs);
    1037           1 :   if ( (GNUNET_OK == ret) &&
    1038             :        (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) )
    1039             :   {
    1040             :     /* success, perform wire transfer */
    1041           1 :     if (GNUNET_SYSERR ==
    1042           1 :         wp->wire_plugin->amount_round (wp->wire_plugin->cls,
    1043             :                                        &amount_without_fee))
    1044             :     {
    1045           0 :       GNUNET_break (0);
    1046           0 :       global_ret = GNUNET_SYSERR;
    1047           0 :       GNUNET_SCHEDULER_shutdown ();
    1048           0 :       return GNUNET_DB_STATUS_HARD_ERROR;
    1049             :     }
    1050           1 :     ctc = GNUNET_new (struct CloseTransferContext);
    1051           1 :     ctc->wp = wp;
    1052           1 :     ctc->session = session;
    1053           1 :     ctc->type = GNUNET_strdup (type);
    1054           1 :     ctc->ph
    1055           1 :       = wp->wire_plugin->prepare_wire_transfer (wp->wire_plugin->cls,
    1056             :                                                 account_details,
    1057             :                                                 &amount_without_fee,
    1058             :                                                 exchange_base_url,
    1059             :                                                 &wtid,
    1060             :                                                 &prepare_close_cb,
    1061             :                                                 ctc);
    1062           1 :     if (NULL == ctc->ph)
    1063             :     {
    1064           0 :       GNUNET_break (0);
    1065           0 :       global_ret = GNUNET_SYSERR;
    1066           0 :       GNUNET_SCHEDULER_shutdown ();
    1067           0 :       GNUNET_free (ctc->type);
    1068           0 :       GNUNET_free (ctc);
    1069           0 :       ctc = NULL;
    1070           0 :       return GNUNET_DB_STATUS_HARD_ERROR;
    1071             :     }
    1072           1 :     erc->async_cont = GNUNET_YES;
    1073           1 :     return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    1074             :   }
    1075             :   /* Check for hard failure */
    1076           0 :   if ( (GNUNET_SYSERR == ret) ||
    1077             :        (GNUNET_DB_STATUS_HARD_ERROR == qs) )
    1078             :   {
    1079           0 :     GNUNET_break (0);
    1080           0 :     global_ret = GNUNET_SYSERR;
    1081           0 :     GNUNET_SCHEDULER_shutdown ();
    1082           0 :     return GNUNET_DB_STATUS_HARD_ERROR;
    1083             :   }
    1084             :   /* Reserve balance was almost zero OR soft error */
    1085           0 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1086             :               "Reserve was virtually empty, moving on\n");
    1087           0 :   return qs;
    1088             : }
    1089             : 
    1090             : 
    1091             : /**
    1092             :  * Main work function that finds and triggers transfers for reserves
    1093             :  * closures.
    1094             :  *
    1095             :  * @param cls closure
    1096             :  */
    1097             : static void
    1098          45 : run_reserve_closures (void *cls)
    1099             : {
    1100             :   struct TALER_EXCHANGEDB_Session *session;
    1101             :   enum GNUNET_DB_QueryStatus qs;
    1102             :   const struct GNUNET_SCHEDULER_TaskContext *tc;
    1103             :   struct ExpiredReserveContext erc;
    1104             : 
    1105          45 :   task = NULL;
    1106          45 :   reserves_idle = GNUNET_NO;
    1107          45 :   tc = GNUNET_SCHEDULER_get_task_context ();
    1108          45 :   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
    1109          44 :     return;
    1110          45 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1111             :               "Checking for reserves to close\n");
    1112          45 :   if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
    1113             :   {
    1114           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1115             :                 "Failed to obtain database session!\n");
    1116           0 :     global_ret = GNUNET_SYSERR;
    1117           0 :     GNUNET_SCHEDULER_shutdown ();
    1118           0 :     return;
    1119             :   }
    1120          45 :   if (GNUNET_OK !=
    1121          45 :       db_plugin->start (db_plugin->cls,
    1122             :                         session))
    1123             :   {
    1124           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1125             :                 "Failed to start database transaction!\n");
    1126           0 :     global_ret = GNUNET_SYSERR;
    1127           0 :     GNUNET_SCHEDULER_shutdown ();
    1128           0 :     return;
    1129             :   }
    1130          45 :   erc.session = session;
    1131          45 :   erc.async_cont = GNUNET_NO;
    1132          45 :   qs = db_plugin->get_expired_reserves (db_plugin->cls,
    1133             :                                         session,
    1134             :                                         GNUNET_TIME_absolute_get (),
    1135             :                                         &expired_reserve_cb,
    1136             :                                         &erc);
    1137          45 :   switch (qs)
    1138             :   {
    1139             :   case GNUNET_DB_STATUS_HARD_ERROR:
    1140           0 :     GNUNET_break (0);
    1141           0 :     db_plugin->rollback (db_plugin->cls,
    1142             :                          session);
    1143           0 :     global_ret = GNUNET_SYSERR;
    1144           0 :     GNUNET_SCHEDULER_shutdown ();
    1145           0 :     return;
    1146             :   case GNUNET_DB_STATUS_SOFT_ERROR:
    1147           0 :     db_plugin->rollback (db_plugin->cls,
    1148             :                          session);
    1149           0 :     task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
    1150             :                                      NULL);
    1151           0 :     return;
    1152             :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    1153          44 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1154             :                 "No more idle reserves, going back to aggregation\n");
    1155          44 :     reserves_idle = GNUNET_YES;
    1156          44 :     db_plugin->rollback (db_plugin->cls,
    1157             :                          session);
    1158          44 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1159             :                                      NULL);
    1160          44 :     return;
    1161             :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    1162           1 :     if (GNUNET_YES == erc.async_cont)
    1163           1 :       break;
    1164           0 :     (void) commit_or_warn (session);
    1165           0 :     task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
    1166             :                                      NULL);
    1167           0 :     return;
    1168             :   }
    1169             : }
    1170             : 
    1171             : 
    1172             : /**
    1173             :  * Main work function that queries the DB and aggregates transactions
    1174             :  * into larger wire transfers.
    1175             :  *
    1176             :  * @param cls NULL
    1177             :  */
    1178             : static void
    1179          94 : run_aggregation (void *cls)
    1180             : {
    1181             :   static int swap;
    1182             :   struct TALER_EXCHANGEDB_Session *session;
    1183             :   enum GNUNET_DB_QueryStatus qs;
    1184             :   const struct GNUNET_SCHEDULER_TaskContext *tc;
    1185             : 
    1186          94 :   task = NULL;
    1187          94 :   tc = GNUNET_SCHEDULER_get_task_context ();
    1188          94 :   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
    1189           0 :     return;
    1190          94 :   if (0 == (++swap % 2))
    1191             :   {
    1192          33 :     task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
    1193             :                                      NULL);
    1194          33 :     return;
    1195             :   }
    1196          61 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1197             :               "Checking for ready deposits to aggregate\n");
    1198          61 :   if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
    1199             :   {
    1200           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1201             :                 "Failed to obtain database session!\n");
    1202           0 :     global_ret = GNUNET_SYSERR;
    1203           0 :     GNUNET_SCHEDULER_shutdown ();
    1204           0 :     return;
    1205             :   }
    1206          61 :   if (GNUNET_OK !=
    1207          61 :       db_plugin->start_deferred_wire_out (db_plugin->cls,
    1208             :                                           session))
    1209             :   {
    1210           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1211             :                 "Failed to start database transaction!\n");
    1212           0 :     global_ret = GNUNET_SYSERR;
    1213           0 :     GNUNET_SCHEDULER_shutdown ();
    1214           0 :     return;
    1215             :   }
    1216          61 :   au = GNUNET_new (struct AggregationUnit);
    1217          61 :   au->session = session;
    1218          61 :   qs = db_plugin->get_ready_deposit (db_plugin->cls,
    1219             :                                      session,
    1220             :                                      &deposit_cb,
    1221             :                                      au);
    1222          61 :   if (0 >= qs)
    1223             :   {
    1224          40 :     cleanup_au ();
    1225          40 :     db_plugin->rollback (db_plugin->cls,
    1226             :                          session);
    1227          40 :     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
    1228             :     {
    1229           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1230             :                   "Failed to execute deposit iteration!\n");
    1231           0 :       global_ret = GNUNET_SYSERR;
    1232           0 :       GNUNET_SCHEDULER_shutdown ();
    1233           0 :       return;
    1234             :     }
    1235          40 :     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    1236             :     {
    1237             :       /* should re-try immediately */
    1238           0 :       swap--; /* do not count failed attempts */
    1239           0 :       task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1240             :                                        NULL);
    1241           0 :       return;
    1242             :     }
    1243          40 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1244             :                 "No more ready deposits, going to sleep\n");
    1245          80 :     if ( (GNUNET_YES == test_mode) &&
    1246          40 :          (swap >= 2) )
    1247             :     {
    1248             :       /* in test mode, shutdown if we end up being idle */
    1249          28 :       GNUNET_SCHEDULER_shutdown ();
    1250             :     }
    1251             :     else
    1252             :     {
    1253          12 :       if ( (GNUNET_NO == reserves_idle) ||
    1254           0 :            (GNUNET_YES == test_mode) )
    1255             :         /* Possibly more to on reserves, go for it immediately */
    1256          12 :         task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
    1257             :                                          NULL);
    1258             :       else
    1259             :         /* nothing to do, sleep for a minute and try again */
    1260           0 :         task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
    1261             :                                              &run_aggregation,
    1262             :                                              NULL);
    1263             :     }
    1264          40 :     return;
    1265             :   }
    1266             : 
    1267             :   /* Now try to find other deposits to aggregate */
    1268          21 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1269             :               "Found ready deposit for %s, aggregating\n",
    1270             :               TALER_B2S (&au->merchant_pub));
    1271          42 :   qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
    1272             :                                              session,
    1273          21 :                                              &au->h_wire,
    1274          21 :                                              &au->merchant_pub,
    1275             :                                              &aggregate_cb,
    1276             :                                              au,
    1277             :                                              aggregation_limit);
    1278          42 :   if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) ||
    1279          21 :        (GNUNET_YES == au->failed) )
    1280             :   {
    1281           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1282             :                 "Failed to execute deposit iteration!\n");
    1283           0 :     cleanup_au ();
    1284           0 :     db_plugin->rollback (db_plugin->cls,
    1285             :                          session);
    1286           0 :     global_ret = GNUNET_SYSERR;
    1287           0 :     GNUNET_SCHEDULER_shutdown ();
    1288           0 :     return;
    1289             :   }
    1290          21 :   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    1291             :   {
    1292             :     /* serializiability issue, try again */
    1293           0 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1294             :                 "Serialization issue, trying again later!\n");
    1295           0 :     db_plugin->rollback (db_plugin->cls,
    1296             :                          session);
    1297           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1298             :                                      NULL);
    1299           0 :     return;
    1300             :   }
    1301             : 
    1302             :   /* Subtract wire transfer fee and round to the unit supported by the
    1303             :      wire transfer method; Check if after rounding down, we still have
    1304             :      an amount to transfer, and if not mark as 'tiny'. */
    1305          21 :   if ( (GNUNET_OK !=
    1306          21 :         TALER_amount_subtract (&au->final_amount,
    1307          21 :                                &au->total_amount,
    1308          38 :                                &au->wire_fee)) ||
    1309             :        (GNUNET_SYSERR ==
    1310          34 :         au->wp->wire_plugin->amount_round (au->wp->wire_plugin->cls,
    1311          34 :                                            &au->final_amount)) ||
    1312          31 :        ( (0 == au->final_amount.value) &&
    1313          14 :          (0 == au->final_amount.fraction) ) )
    1314             :   {
    1315           4 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1316             :                 "Aggregate value too low for transfer (%d/%s)\n",
    1317             :                 qs,
    1318             :                 TALER_amount2s (&au->final_amount));
    1319             :     /* Rollback ongoing transaction, as we will not use the respective
    1320             :        WTID and thus need to remove the tracking data */
    1321           4 :     db_plugin->rollback (db_plugin->cls,
    1322             :                          session);
    1323             : 
    1324             :     /* There were results, just the value was too low.  Start another
    1325             :        transaction to mark all* of the selected deposits as minor! */
    1326           4 :     if (GNUNET_OK !=
    1327           4 :         db_plugin->start (db_plugin->cls,
    1328             :                           session))
    1329             :     {
    1330           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1331             :                   "Failed to start database transaction!\n");
    1332           0 :       global_ret = GNUNET_SYSERR;
    1333           0 :       cleanup_au ();
    1334           0 :       GNUNET_SCHEDULER_shutdown ();
    1335           0 :       return;
    1336             :     }
    1337             :     /* Mark transactions by row_id as minor */
    1338           8 :     qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
    1339             :                                        session,
    1340           4 :                                        au->row_id);
    1341           4 :     if (0 <= qs)
    1342             :     {
    1343           9 :       for (unsigned int i=0;i<au->rows_offset;i++)
    1344             :       {
    1345          10 :         qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
    1346             :                                            session,
    1347           5 :                                            au->additional_rows[i]);
    1348           5 :         if (0 > qs)
    1349           0 :           break;
    1350             :       }
    1351             :     }
    1352           4 :     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    1353             :     {
    1354           0 :       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1355             :                   "Serialization issue, trying again later!\n");
    1356           0 :       db_plugin->rollback (db_plugin->cls,
    1357             :                            session);
    1358           0 :       cleanup_au ();
    1359             :       /* start again */
    1360           0 :       task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1361             :                                        NULL);
    1362           0 :       return;
    1363             :     }
    1364           4 :     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
    1365             :     {
    1366           0 :       db_plugin->rollback (db_plugin->cls,
    1367             :                            session);
    1368           0 :       cleanup_au ();
    1369           0 :       GNUNET_SCHEDULER_shutdown ();
    1370           0 :       return;
    1371             :     }
    1372             :     /* commit */
    1373           4 :     (void) commit_or_warn (session);
    1374           4 :     cleanup_au ();
    1375             :     /* start again */
    1376           4 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1377             :                                      NULL);
    1378           4 :     return;
    1379             :   }
    1380             :   {
    1381             :     char *amount_s;
    1382             : 
    1383          17 :     amount_s = TALER_amount_to_string (&au->final_amount);
    1384          17 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1385             :                 "Preparing wire transfer of %s to %s\n",
    1386             :                 amount_s,
    1387             :                 TALER_B2S (&au->merchant_pub));
    1388          17 :     GNUNET_free (amount_s);
    1389             :   }
    1390          51 :   au->ph = au->wp->wire_plugin->prepare_wire_transfer (au->wp->wire_plugin->cls,
    1391          17 :                                                        au->wire,
    1392          17 :                                                        &au->final_amount,
    1393             :                                                        exchange_base_url,
    1394          17 :                                                        &au->wtid,
    1395             :                                                        &prepare_cb,
    1396             :                                                        au);
    1397          17 :   if (NULL == au->ph)
    1398             :   {
    1399           0 :     GNUNET_break (0); /* why? how to best recover? */
    1400           0 :     db_plugin->rollback (db_plugin->cls,
    1401             :                          session);
    1402           0 :     cleanup_au ();
    1403             :     /* start again */
    1404           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1405             :                                      NULL);
    1406           0 :     return;
    1407             :   }
    1408             :   /* otherwise we continue with #prepare_cb(), see below */
    1409             : }
    1410             : 
    1411             : 
    1412             : /**
    1413             :  * Function to be called with the prepared transfer data.
    1414             :  *
    1415             :  * @param cls NULL
    1416             :  * @param buf transaction data to persist, NULL on error
    1417             :  * @param buf_size number of bytes in @a buf, 0 on error
    1418             :  */
    1419             : static void
    1420          17 : prepare_cb (void *cls,
    1421             :             const char *buf,
    1422             :             size_t buf_size)
    1423             : {
    1424          17 :   struct TALER_EXCHANGEDB_Session *session = au->session;
    1425             :   enum GNUNET_DB_QueryStatus qs;
    1426             : 
    1427          17 :   GNUNET_free_non_null (au->additional_rows);
    1428          17 :   au->additional_rows = NULL;
    1429          17 :   if (NULL == buf)
    1430             :   {
    1431           0 :     GNUNET_break (0); /* why? how to best recover? */
    1432           0 :     db_plugin->rollback (db_plugin->cls,
    1433             :                          session);
    1434             :     /* start again */
    1435           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1436             :                                      NULL);
    1437           0 :     cleanup_au ();
    1438           0 :     return;
    1439             :   }
    1440             : 
    1441          17 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1442             :               "Storing %u bytes of wire prepare data\n",
    1443             :               (unsigned int) buf_size);
    1444             :   /* Commit our intention to execute the wire transfer! */
    1445          34 :   qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
    1446             :                                             session,
    1447          17 :                                             au->wp->type,
    1448             :                                             buf,
    1449             :                                             buf_size);
    1450             :   /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
    1451             :      table constraints */
    1452          17 :   if (qs >= 0)
    1453          34 :     qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
    1454             :                                              session,
    1455          17 :                                              au->execution_time,
    1456          17 :                                              &au->wtid,
    1457          17 :                                              au->wire,
    1458          17 :                                              &au->final_amount);
    1459          17 :   cleanup_au ();
    1460          17 :   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    1461             :   {
    1462           0 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1463             :                 "Serialization issue for prepared wire data; trying again later!\n");
    1464           0 :     db_plugin->rollback (db_plugin->cls,
    1465             :                          session);
    1466             :     /* start again */
    1467           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1468             :                                      NULL);
    1469           0 :     return;
    1470             :   }
    1471          17 :   if (GNUNET_DB_STATUS_HARD_ERROR == qs)
    1472             :   {
    1473           0 :     GNUNET_break (0);
    1474           0 :     db_plugin->rollback (db_plugin->cls,
    1475             :                          session);
    1476             :     /* die hard */
    1477           0 :     global_ret = GNUNET_SYSERR;
    1478           0 :     GNUNET_SCHEDULER_shutdown ();
    1479           0 :     return;
    1480             :   }
    1481             : 
    1482          17 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1483             :               "Stored wire transfer out instructions\n");
    1484             : 
    1485             :   /* Now we can finally commit the overall transaction, as we are
    1486             :      again consistent if all of this passes. */
    1487          17 :   switch (commit_or_warn (session))
    1488             :   {
    1489             :   case GNUNET_DB_STATUS_SOFT_ERROR:
    1490             :     /* try again */
    1491           0 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
    1492             :                 "Commit issue for prepared wire data; trying again later!\n");
    1493           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1494             :                                      NULL);
    1495           0 :     return;
    1496             :   case GNUNET_DB_STATUS_HARD_ERROR:
    1497           0 :     GNUNET_break (0);
    1498           0 :     global_ret = GNUNET_SYSERR;
    1499           0 :     GNUNET_SCHEDULER_shutdown ();
    1500           0 :     return;
    1501             :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    1502          17 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1503             :                 "Preparation complete, switching to transfer mode\n");
    1504             :     /* run alternative task: actually do wire transfer! */
    1505          17 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
    1506             :                                      NULL);
    1507          17 :     return;
    1508             :   default:
    1509           0 :     GNUNET_break (0);
    1510           0 :     global_ret = GNUNET_SYSERR;
    1511           0 :     GNUNET_SCHEDULER_shutdown ();
    1512           0 :     return;
    1513             :   }
    1514             : }
    1515             : 
    1516             : 
    1517             : /**
    1518             :  * Function called with the result from the execute step.
    1519             :  *
    1520             :  * @param cls NULL
    1521             :  * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
    1522             :  * @param serial_id unique ID of the wire transfer in the bank's records; UINT64_MAX on error
    1523             :  * @param emsg NULL on success, otherwise an error message
    1524             :  */
    1525             : static void
    1526          18 : wire_confirm_cb (void *cls,
    1527             :                  int success,
    1528             :                  uint64_t serial_id,
    1529             :                  const char *emsg)
    1530             : {
    1531          18 :   struct TALER_EXCHANGEDB_Session *session = wpd->session;
    1532             :   enum GNUNET_DB_QueryStatus qs;
    1533             : 
    1534          18 :   wpd->eh = NULL;
    1535          18 :   if (GNUNET_SYSERR == success)
    1536             :   {
    1537           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1538             :                 "Wire transaction failed: %s\n",
    1539             :                 emsg);
    1540           0 :     db_plugin->rollback (db_plugin->cls,
    1541             :                          session);
    1542           0 :     global_ret = GNUNET_SYSERR;
    1543           0 :     GNUNET_SCHEDULER_shutdown ();
    1544           0 :     GNUNET_free (wpd);
    1545           0 :     wpd = NULL;
    1546           0 :     return;
    1547             :   }
    1548          36 :   qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
    1549             :                                                    session,
    1550          18 :                                                    wpd->row_id);
    1551          18 :   if (0 >= qs)
    1552             :   {
    1553           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    1554           0 :     db_plugin->rollback (db_plugin->cls,
    1555             :                          session);
    1556           0 :     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    1557             :     {
    1558             :       /* try again */
    1559           0 :       task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1560             :                                        NULL);
    1561             :     }
    1562             :     else
    1563             :     {
    1564           0 :       global_ret = GNUNET_SYSERR;
    1565           0 :       GNUNET_SCHEDULER_shutdown ();
    1566             :     }
    1567           0 :     GNUNET_free (wpd);
    1568           0 :     wpd = NULL;
    1569           0 :     return;
    1570             :   }
    1571          18 :   GNUNET_free (wpd);
    1572          18 :   wpd = NULL;
    1573          18 :   switch (commit_or_warn (session))
    1574             :   {
    1575             :   case GNUNET_DB_STATUS_SOFT_ERROR:
    1576             :     /* try again */
    1577           0 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1578             :                                      NULL);
    1579           0 :     return;
    1580             :   case GNUNET_DB_STATUS_HARD_ERROR:
    1581           0 :     GNUNET_break (0);
    1582           0 :     global_ret = GNUNET_SYSERR;
    1583           0 :     GNUNET_SCHEDULER_shutdown ();
    1584           0 :     return;
    1585             :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    1586          18 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1587             :                 "Wire transfer complete\n");
    1588             :     /* continue with #run_transfers(), just to guard
    1589             :        against the unlikely case that there are more. */
    1590          18 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
    1591             :                                      NULL);
    1592          18 :     return;
    1593             :   default:
    1594           0 :     GNUNET_break (0);
    1595           0 :     global_ret = GNUNET_SYSERR;
    1596           0 :     GNUNET_SCHEDULER_shutdown ();
    1597           0 :     return;
    1598             :   }
    1599             : }
    1600             : 
    1601             : 
    1602             : /**
    1603             :  * Callback with data about a prepared transaction.
    1604             :  *
    1605             :  * @param cls NULL
    1606             :  * @param rowid row identifier used to mark prepared transaction as done
    1607             :  * @param wire_method wire method the preparation was done for
    1608             :  * @param buf transaction data that was persisted, NULL on error
    1609             :  * @param buf_size number of bytes in @a buf, 0 on error
    1610             :  */
    1611             : static void
    1612          18 : wire_prepare_cb (void *cls,
    1613             :                  uint64_t rowid,
    1614             :                  const char *wire_method,
    1615             :                  const char *buf,
    1616             :                  size_t buf_size)
    1617             : {
    1618          18 :   wpd->row_id = rowid;
    1619          18 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1620             :               "Starting wire transfer %llu\n",
    1621             :               (unsigned long long) rowid);
    1622          18 :   wpd->wp = find_plugin (wire_method);
    1623          18 :   if (NULL == wpd->wp)
    1624             :   {
    1625             :     /* Should really never happen here, as when we get
    1626             :        here the plugin should be in the cache. */
    1627           0 :     GNUNET_break (0);
    1628           0 :     db_plugin->rollback (db_plugin->cls,
    1629           0 :                          wpd->session);
    1630           0 :     global_ret = GNUNET_SYSERR;
    1631           0 :     GNUNET_SCHEDULER_shutdown ();
    1632           0 :     GNUNET_free (wpd);
    1633           0 :     wpd = NULL;
    1634           0 :     return;
    1635             :   }
    1636          18 :   wpd->eh = wpd->wp->wire_plugin->execute_wire_transfer (wpd->wp->wire_plugin->cls,
    1637             :                                                          buf,
    1638             :                                                          buf_size,
    1639             :                                                          &wire_confirm_cb,
    1640             :                                                          NULL);
    1641          18 :   if (NULL == wpd->eh)
    1642             :   {
    1643           0 :     GNUNET_break (0); /* why? how to best recover? */
    1644           0 :     db_plugin->rollback (db_plugin->cls,
    1645           0 :                          wpd->session);
    1646           0 :     global_ret = GNUNET_SYSERR;
    1647           0 :     GNUNET_SCHEDULER_shutdown ();
    1648           0 :     GNUNET_free (wpd);
    1649           0 :     wpd = NULL;
    1650           0 :     return;
    1651             :   }
    1652             : }
    1653             : 
    1654             : 
    1655             : /**
    1656             :  * Execute the wire transfers that we have committed to
    1657             :  * do.
    1658             :  *
    1659             :  * @param cls NULL
    1660             :  */
    1661             : static void
    1662          64 : run_transfers (void *cls)
    1663             : {
    1664             :   enum GNUNET_DB_QueryStatus qs;
    1665             :   struct TALER_EXCHANGEDB_Session *session;
    1666             :   const struct GNUNET_SCHEDULER_TaskContext *tc;
    1667             : 
    1668          64 :   task = NULL;
    1669          64 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1670             :               "Checking for pending wire transfers\n");
    1671          64 :   tc = GNUNET_SCHEDULER_get_task_context ();
    1672          64 :   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
    1673           0 :     return;
    1674          64 :   if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
    1675             :   {
    1676           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1677             :                 "Failed to obtain database session!\n");
    1678           0 :     global_ret = GNUNET_SYSERR;
    1679           0 :     GNUNET_SCHEDULER_shutdown ();
    1680           0 :     return;
    1681             :   }
    1682          64 :   if (GNUNET_OK !=
    1683          64 :       db_plugin->start (db_plugin->cls,
    1684             :                         session))
    1685             :   {
    1686           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    1687             :                 "Failed to start database transaction!\n");
    1688           0 :     global_ret = GNUNET_SYSERR;
    1689           0 :     GNUNET_SCHEDULER_shutdown ();
    1690           0 :     return;
    1691             :   }
    1692          64 :   wpd = GNUNET_new (struct WirePrepareData);
    1693          64 :   wpd->session = session;
    1694          64 :   qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
    1695             :                                          session,
    1696             :                                          &wire_prepare_cb,
    1697             :                                          NULL);
    1698          64 :   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
    1699          18 :     return;  /* continues in #wire_prepare_cb() */
    1700          46 :   db_plugin->rollback (db_plugin->cls,
    1701             :                        session);
    1702          46 :   GNUNET_free (wpd);
    1703          46 :   wpd = NULL;
    1704          46 :   switch (qs)
    1705             :   {
    1706             :   case GNUNET_DB_STATUS_HARD_ERROR:
    1707           0 :     GNUNET_break (0);
    1708           0 :     global_ret = GNUNET_SYSERR;
    1709           0 :     GNUNET_SCHEDULER_shutdown ();
    1710           0 :     return;
    1711             :   case GNUNET_DB_STATUS_SOFT_ERROR:
    1712             :     /* try again */
    1713           0 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
    1714             :                                      NULL);
    1715           0 :     return;
    1716             :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    1717             :     /* no more prepared wire transfers, go back to aggregation! */
    1718          46 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1719             :                 "No more pending wire transfers, starting aggregation\n");
    1720          46 :     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
    1721             :                                      NULL);
    1722          46 :     return;
    1723             :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    1724             :     /* should be impossible */
    1725           0 :     GNUNET_assert (0);
    1726             :   }
    1727             : }
    1728             : 
    1729             : 
    1730             : /**
    1731             :  * First task.
    1732             :  *
    1733             :  * @param cls closure, NULL
    1734             :  * @param args remaining command-line arguments
    1735             :  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
    1736             :  * @param c configuration
    1737             :  */
    1738             : static void
    1739          28 : run (void *cls,
    1740             :      char *const *args,
    1741             :      const char *cfgfile,
    1742             :      const struct GNUNET_CONFIGURATION_Handle *c)
    1743             : {
    1744          28 :   if (GNUNET_OK !=
    1745          28 :       GNUNET_CONFIGURATION_get_value_string (c,
    1746             :                                              "exchange",
    1747             :                                              "BASE_URL",
    1748             :                                              &exchange_base_url))
    1749             :   {
    1750           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
    1751             :                                "exchange",
    1752             :                                "BASE_URL");
    1753           0 :     global_ret = 1;
    1754           0 :     return;
    1755             :   }
    1756          28 :   cfg = GNUNET_CONFIGURATION_dup (c);
    1757          28 :   if (GNUNET_OK != exchange_serve_process_config ())
    1758             :   {
    1759           0 :     GNUNET_CONFIGURATION_destroy (cfg);
    1760           0 :     cfg = NULL;
    1761           0 :     global_ret = 1;
    1762           0 :     return;
    1763             :   }
    1764          28 :   task = GNUNET_SCHEDULER_add_now (&run_transfers,
    1765             :                                    NULL);
    1766          28 :   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
    1767             :                                  cls);
    1768             : }
    1769             : 
    1770             : 
    1771             : /**
    1772             :  * The main function of the taler-exchange-aggregator.
    1773             :  *
    1774             :  * @param argc number of arguments from the command line
    1775             :  * @param argv command line arguments
    1776             :  * @return 0 ok, 1 on error
    1777             :  */
    1778             : int
    1779          28 : main (int argc,
    1780             :       char *const *argv)
    1781             : {
    1782          28 :   struct GNUNET_GETOPT_CommandLineOption options[] = {
    1783             :     GNUNET_GETOPT_option_flag ('t',
    1784             :                                "test",
    1785             :                                "run in test mode and exit when idle",
    1786             :                                &test_mode),
    1787             :     GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    1788             :     GNUNET_GETOPT_OPTION_END
    1789             :   };
    1790             : 
    1791          28 :   if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv,
    1792             :                                                  &argc, &argv))
    1793           0 :     return 2;
    1794          28 :   if (GNUNET_OK !=
    1795          28 :       GNUNET_PROGRAM_run (argc, argv,
    1796             :                           "taler-exchange-aggregator",
    1797             :                           gettext_noop ("background process that aggregates and executes wire transfers to merchants"),
    1798             :                           options,
    1799             :                           &run, NULL))
    1800             :   {
    1801           0 :     GNUNET_free ((void*) argv);
    1802           0 :     return 1;
    1803             :   }
    1804          28 :   GNUNET_free ((void*) argv);
    1805          28 :   return global_ret;
    1806             : }
    1807             : 
    1808             : /* end of taler-exchange-aggregator.c */

Generated by: LCOV version 1.13