LCOV - code coverage report
Current view: top level - exchange - taler-exchange-transfer.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 156 318 49.1 %
Date: 2025-07-09 07:38:29 Functions: 11 12 91.7 %

          Line data    Source code
       1             : /*
       2             :   This file is part of TALER
       3             :   Copyright (C) 2016-2021 Taler Systems SA
       4             : 
       5             :   TALER is free software; you can redistribute it and/or modify it under the
       6             :   terms of the GNU Affero General Public License as published by the Free Software
       7             :   Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
      12             : 
      13             :   You should have received a copy of the GNU Affero General Public License along with
      14             :   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             : */
      16             : /**
      17             :  * @file taler-exchange-transfer.c
      18             :  * @brief Process that actually finalizes outgoing transfers with the wire gateway / bank
      19             :  * @author Christian Grothoff
      20             :  */
      21             : #include "taler/platform.h"
      22             : #include <gnunet/gnunet_util_lib.h>
      23             : #include <jansson.h>
      24             : #include <pthread.h>
      25             : #include "taler/taler_exchangedb_lib.h"
      26             : #include "taler/taler_exchangedb_plugin.h"
      27             : #include "taler/taler_json_lib.h"
      28             : #include "taler/taler_bank_service.h"
      29             : 
      30             : /**
      31             :  * Default number of rows per shard when processing pending
      32             :  * wire transfers; initial value of `shard_size` below.
      33             :  */
      34             : #define DEFAULT_BATCH_SIZE 32
      35             : 
      36             : /**
      37             :  * How often will we retry a request (given certain
      38             :  * HTTP status codes) before giving up?
      39             :  */
      40             : #define MAX_RETRIES 3
      41             : 
      42             : /**
      43             :  * Information about our work shard.
      44             :  */
      45             : struct Shard
      46             : {
      47             : 
      48             :   /**
      49             :    * Time when we started to work on this shard.
      50             :    */
      51             :   struct GNUNET_TIME_Absolute shard_start_time;
      52             : 
      53             :   /**
      54             :    * Offset the shard begins at.
      55             :    */
      56             :   uint64_t shard_start;
      57             : 
      58             :   /**
      59             :    * Exclusive offset where the shard ends.
      60             :    */
      61             :   uint64_t shard_end;
      62             : 
      63             :   /**
      64             :    * Offset where our current batch begins.
      65             :    */
      66             :   uint64_t batch_start;
      67             : 
      68             :   /**
      69             :    * Highest row processed in the current batch.
      70             :    */
      71             :   uint64_t batch_end;
      72             : 
      73             : };
      74             : 
      75             : 
      76             : /**
      77             :  * Data we keep for each wire transfer started from
      78             :  * #run_transfers() via #wire_prepare_cb().  All transfers
      79             :  * of the current batch are active in parallel and are
      80             :  * linked in the DLL from #wpd_head to #wpd_tail.
      81             :  */
      82             : struct WirePrepareData
      83             : {
      84             : 
      85             :   /**
      86             :    * All transfers done in the same transaction
      87             :    * are kept in a DLL.
      88             :    */
      89             :   struct WirePrepareData *next;
      90             : 
      91             :   /**
      92             :    * All transfers done in the same transaction
      93             :    * are kept in a DLL.
      94             :    */
      95             :   struct WirePrepareData *prev;
      96             : 
      97             :   /**
      98             :    * Wire execution handle.
      99             :    */
     100             :   struct TALER_BANK_TransferHandle *eh;
     101             : 
     102             :   /**
     103             :    * Wire account used for this preparation.
     104             :    */
     105             :   const struct TALER_EXCHANGEDB_AccountInfo *wa;
     106             : 
     107             :   /**
     108             :    * Row ID of the transfer.
     109             :    */
     110             :   unsigned long long row_id;
     111             : 
     112             :   /**
     113             :    * Number of bytes allocated after this struct
     114             :    * with the prewire data.
     115             :    */
     116             :   size_t buf_size;
     117             : 
     118             :   /**
     119             :    * How often did we retry so far?
     120             :    */
     121             :   unsigned int retries;
     122             : 
     123             : };
     124             : 
     125             : 
     126             : /**
     127             :  * The exchange's configuration.
     128             :  */
     129             : static const struct GNUNET_CONFIGURATION_Handle *cfg;
     130             : 
     131             : /**
     132             :  * Our database plugin.
     133             :  */
     134             : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
     135             : 
     136             : /**
     137             :  * Next task to run, if any.
     138             :  */
     139             : static struct GNUNET_SCHEDULER_Task *task;
     140             : 
     141             : /**
     142             :  * Head of the DLL of active transfers.  NULL if we are
     143             :  * not currently executing any transfers.
     144             :  */
     145             : static struct WirePrepareData *wpd_head;
     146             : 
     147             : /**
     148             :  * Tail of the DLL of active transfers.  NULL if we are
     149             :  * not currently executing any transfers.
     150             :  */
     151             : static struct WirePrepareData *wpd_tail;
     152             : 
     153             : /**
     154             :  * Information about our work shard.
     155             :  */
     156             : static struct Shard *shard;
     157             : 
     158             : /**
     159             :  * Handle to the context for interacting with the bank / wire gateway.
     160             :  */
     161             : static struct GNUNET_CURL_Context *ctx;
     162             : 
     163             : /**
     164             :  * Randomized back-off we use on serialization errors.
     165             :  */
     166             : static struct GNUNET_TIME_Relative serialization_delay;
     167             : 
     168             : /**
     169             :  * Scheduler context for running the @e ctx.
     170             :  */
     171             : static struct GNUNET_CURL_RescheduleContext *rc;
     172             : 
     173             : /**
     174             :  * Value to return from main(). 0 on success, non-zero on errors.
     175             :  */
     176             : static int global_ret;
     177             : 
     178             : /**
     179             :  * #GNUNET_YES if we are in test mode and should exit when idle.
     180             :  */
     181             : static int test_mode;
     182             : 
     183             : /**
     184             :  * How long should we sleep when idle before trying to find more work?
     185             :  * Also used for how long we wait to grab a shard before trying it again.
     186             :  * The value should be set to a bit above the average time it takes to
     187             :  * process a shard.
     188             :  */
     189             : static struct GNUNET_TIME_Relative transfer_idle_sleep_interval;
     190             : 
     191             : /**
     192             :  * How long did we take to finish the last shard?
     193             :  */
     194             : static struct GNUNET_TIME_Relative shard_delay;
     195             : 
     196             : /**
     197             :  * Size of the shards.
     198             :  */
     199             : static unsigned int shard_size = DEFAULT_BATCH_SIZE;
     200             : 
     201             : /**
     202             :  * How many workers should we plan our scheduling with?
     203             :  */
     204             : static unsigned int max_workers = 0;
     205             : 
     206             : 
     207             : /**
     208             :  * Clean up all active bank interactions.
     209             :  */
     210             : static void
     211          60 : cleanup_wpd (void)
     212             : {
     213             :   struct WirePrepareData *wpd;
     214             : 
     215          60 :   while (NULL != (wpd = wpd_head))
     216             :   {
     217           0 :     GNUNET_CONTAINER_DLL_remove (wpd_head,
     218             :                                  wpd_tail,
     219             :                                  wpd);
     220           0 :     if (NULL != wpd->eh)
     221             :     {
     222           0 :       TALER_BANK_transfer_cancel (wpd->eh);
     223           0 :       wpd->eh = NULL;
     224             :     }
     225           0 :     GNUNET_free (wpd);
     226             :   }
     227          60 : }
     228             : 
     229             : 
     230             : /**
     231             :  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
     232             :  *
     233             :  * @param cls closure
     234             :  */
     235             : static void
     236          60 : shutdown_task (void *cls)
     237             : {
     238             :   (void) cls;
     239          60 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     240             :               "Running shutdown\n");
     241          60 :   if (NULL != task)
     242             :   {
     243           0 :     GNUNET_SCHEDULER_cancel (task);
     244           0 :     task = NULL;
     245             :   }
     246          60 :   cleanup_wpd ();
     247          60 :   GNUNET_free (shard);
     248          60 :   db_plugin->rollback (db_plugin->cls); /* just in case */
     249          60 :   TALER_EXCHANGEDB_plugin_unload (db_plugin);
     250          60 :   db_plugin = NULL;
     251          60 :   TALER_EXCHANGEDB_unload_accounts ();
     252          60 :   cfg = NULL;
     253          60 :   if (NULL != ctx)
     254             :   {
     255          60 :     GNUNET_CURL_fini (ctx);
     256          60 :     ctx = NULL;
     257             :   }
     258          60 :   if (NULL != rc)
     259             :   {
     260          60 :     GNUNET_CURL_gnunet_rc_destroy (rc);
     261          60 :     rc = NULL;
     262             :   }
     263          60 : }
     264             : 
     265             : 
     266             : /**
     267             :  * Parse the configuration for taler-exchange-transfer.
     268             :  *
     269             :  * @return #GNUNET_OK on success
     270             :  */
     271             : static enum GNUNET_GenericReturnValue
     272          60 : parse_transfer_config (void)
     273             : {
     274          60 :   if (GNUNET_OK !=
     275          60 :       GNUNET_CONFIGURATION_get_value_time (cfg,
     276             :                                            "exchange",
     277             :                                            "TRANSFER_IDLE_SLEEP_INTERVAL",
     278             :                                            &transfer_idle_sleep_interval))
     279             :   {
     280           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     281             :                                "exchange",
     282             :                                "TRANSFER_IDLE_SLEEP_INTERVAL");
     283           0 :     return GNUNET_SYSERR;
     284             :   }
     285          60 :   if (NULL ==
     286          60 :       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg,
     287             :                                                  false)))
     288             :   {
     289           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     290             :                 "Failed to initialize DB subsystem\n");
     291           0 :     return GNUNET_SYSERR;
     292             :   }
     293          60 :   if (GNUNET_OK !=
     294          60 :       TALER_EXCHANGEDB_load_accounts (cfg,
     295             :                                       TALER_EXCHANGEDB_ALO_DEBIT
     296             :                                       | TALER_EXCHANGEDB_ALO_AUTHDATA))
     297             :   {
     298           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     299             :                 "No wire accounts configured for debit!\n");
     300           0 :     TALER_EXCHANGEDB_plugin_unload (db_plugin);
     301           0 :     db_plugin = NULL;
     302           0 :     return GNUNET_SYSERR;
     303             :   }
     304          60 :   return GNUNET_OK;
     305             : }
     306             : 
     307             : 
     308             : /**
     309             :  * Perform a database commit. If it fails, print a warning.
     310             :  *
     311             :  * @return status of commit
     312             :  */
     313             : static enum GNUNET_DB_QueryStatus
     314          74 : commit_or_warn (void)
     315             : {
     316             :   enum GNUNET_DB_QueryStatus qs;
     317             : 
     318          74 :   qs = db_plugin->commit (db_plugin->cls);
     319          74 :   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
     320             :   {
     321          74 :     serialization_delay = GNUNET_TIME_UNIT_ZERO;
     322          74 :     return qs;
     323             :   }
     324           0 :   GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
     325             :               ? GNUNET_ERROR_TYPE_INFO
     326             :               : GNUNET_ERROR_TYPE_ERROR,
     327             :               "Failed to commit database transaction!\n");
     328           0 :   return qs;
     329             : }
     330             : 
     331             : 
     332             : /**
     333             :  * Execute the wire transfers that we have committed to
     334             :  * do.
     335             :  *
     336             :  * @param cls NULL
     337             :  */
     338             : static void
     339             : run_transfers (void *cls);
     340             : 
     341             : 
     342             : static void
     343           0 : run_transfers_delayed (void *cls)
     344             : {
     345             :   (void) cls;
     346           0 :   shard->shard_start_time = GNUNET_TIME_absolute_get ();
     347           0 :   run_transfers (NULL);
     348           0 : }
     349             : 
     350             : 
     351             : /**
     352             :  * Select shard to process.
     353             :  *
     354             :  * @param cls NULL
     355             :  */
     356             : static void
     357             : select_shard (void *cls);
     358             : 
     359             : 
     360             : /**
     361             :  * We are done with the current batch.  Commit
     362             :  * and move on.
     363             :  */
     364             : static void
     365          74 : batch_done (void)
     366             : {
     367             :   /* batch done */
     368          74 :   GNUNET_assert (NULL == wpd_head);
     369          74 :   switch (commit_or_warn ())
     370             :   {
     371           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     372             :     /* try again */
     373           0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     374             :                 "Serialization failure, trying again immediately!\n");
     375           0 :     GNUNET_assert (NULL == task);
     376           0 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
     377             :                                      NULL);
     378           0 :     return;
     379           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     380           0 :     GNUNET_break (0);
     381           0 :     global_ret = EXIT_FAILURE;
     382           0 :     GNUNET_SCHEDULER_shutdown ();
     383           0 :     return;
     384          74 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     385          74 :     shard->batch_start = shard->batch_end + 1;
     386          74 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     387             :                 "Batch complete\n");
     388             :     /* continue with #run_transfers(), just to guard
     389             :        against the unlikely case that there are more. */
     390          74 :     GNUNET_assert (NULL == task);
     391          74 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
     392             :                                      NULL);
     393          74 :     return;
     394           0 :   default:
     395           0 :     GNUNET_break (0);
     396           0 :     global_ret = EXIT_FAILURE;
     397           0 :     GNUNET_SCHEDULER_shutdown ();
     398           0 :     return;
     399             :   }
     400             : }
     401             : 
     402             : 
     403             : /**
     404             :  * Function called with the result from the execute step.
     405             :  * On success, we mark the respective wire transfer as finished,
     406             :  * and in general we afterwards continue to #run_transfers(),
     407             :  * except for irrecoverable errors.
     408             :  *
     409             :  * @param cls `struct WirePrepareData` we are working on
     410             :  * @param tr transfer response
     411             :  */
     412             : static void
     413          67 : wire_confirm_cb (void *cls,
     414             :                  const struct TALER_BANK_TransferResponse *tr)
     415             : {
     416          67 :   struct WirePrepareData *wpd = cls;
     417             :   enum GNUNET_DB_QueryStatus qs;
     418             : 
     419          67 :   wpd->eh = NULL;
     420          67 :   switch (tr->http_status)
     421             :   {
     422          67 :   case MHD_HTTP_OK:
     423          67 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     424             :                 "Wire transfer %llu completed successfully\n",
     425             :                 (unsigned long long) wpd->row_id);
     426          67 :     qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
     427          67 :                                                      wpd->row_id);
     428             :     /* continued below */
     429          67 :     break;
     430           0 :   case MHD_HTTP_NOT_FOUND:
     431             :   case MHD_HTTP_CONFLICT:
     432           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     433             :                 "Wire transaction %llu failed: %u/%d\n",
     434             :                 (unsigned long long) wpd->row_id,
     435             :                 tr->http_status,
     436             :                 tr->ec);
     437           0 :     qs = db_plugin->wire_prepare_data_mark_failed (db_plugin->cls,
     438           0 :                                                    wpd->row_id);
     439             :     /* continued below */
     440           0 :     break;
     441           0 :   case 0:
     442             :   case MHD_HTTP_TOO_MANY_REQUESTS:
     443             :   case MHD_HTTP_INTERNAL_SERVER_ERROR:
     444             :   case MHD_HTTP_BAD_GATEWAY:
     445             :   case MHD_HTTP_SERVICE_UNAVAILABLE:
     446             :   case MHD_HTTP_GATEWAY_TIMEOUT:
     447           0 :     wpd->retries++;
     448           0 :     if (wpd->retries < MAX_RETRIES)
     449             :     {
     450           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     451             :                   "Wire transfer %llu failed (%u), trying again\n",
     452             :                   (unsigned long long) wpd->row_id,
     453             :                   tr->http_status);
     454           0 :       wpd->eh = TALER_BANK_transfer (ctx,
     455           0 :                                      wpd->wa->auth,
     456           0 :                                      &wpd[1],
     457             :                                      wpd->buf_size,
     458             :                                      &wire_confirm_cb,
     459             :                                      wpd);
     460           0 :       return;
     461             :     }
     462           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     463             :                 "Wire transaction %llu failed: %u/%d\n",
     464             :                 (unsigned long long) wpd->row_id,
     465             :                 tr->http_status,
     466             :                 tr->ec);
     467           0 :     cleanup_wpd ();
     468           0 :     db_plugin->rollback (db_plugin->cls);
     469           0 :     global_ret = EXIT_FAILURE;
     470           0 :     GNUNET_SCHEDULER_shutdown ();
     471           0 :     return;
     472           0 :   default:
     473           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     474             :                 "Wire transfer %llu failed: %u/%d\n",
     475             :                 (unsigned long long) wpd->row_id,
     476             :                 tr->http_status,
     477             :                 tr->ec);
     478           0 :     db_plugin->rollback (db_plugin->cls);
     479           0 :     cleanup_wpd ();
     480           0 :     global_ret = EXIT_FAILURE;
     481           0 :     GNUNET_SCHEDULER_shutdown ();
     482           0 :     return;
     483             :   }
     484          67 :   shard->batch_end = GNUNET_MAX (wpd->row_id,
     485             :                                  shard->batch_end);
     486          67 :   switch (qs)
     487             :   {
     488           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     489           0 :     db_plugin->rollback (db_plugin->cls);
     490           0 :     cleanup_wpd ();
     491           0 :     GNUNET_assert (NULL == task);
     492           0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     493             :                 "Serialization failure, trying again immediately!\n");
     494           0 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
     495             :                                      NULL);
     496           0 :     return;
     497           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     498           0 :     db_plugin->rollback (db_plugin->cls);
     499           0 :     cleanup_wpd ();
     500           0 :     global_ret = EXIT_FAILURE;
     501           0 :     GNUNET_SCHEDULER_shutdown ();
     502           0 :     return;
     503          67 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     504             :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     505          67 :     GNUNET_CONTAINER_DLL_remove (wpd_head,
     506             :                                  wpd_tail,
     507             :                                  wpd);
     508          67 :     GNUNET_free (wpd);
     509          67 :     break;
     510             :   }
     511          67 :   if (NULL != wpd_head)
     512           2 :     return; /* wait for other queries to complete */
     513          65 :   batch_done ();
     514             : }
     515             : 
     516             : 
     517             : /**
     518             :  * Callback with data about a prepared transaction.  Triggers the respective
     519             :  * wire transfer using the prepared transaction data.
     520             :  *
     521             :  * @param cls NULL
     522             :  * @param rowid row identifier used to mark prepared transaction as done
     523             :  * @param wire_method wire method the preparation was done for
     524             :  * @param buf transaction data that was persisted, NULL on error
     525             :  * @param buf_size number of bytes in @a buf, 0 on error
     526             :  */
     527             : static void
     528          76 : wire_prepare_cb (void *cls,
     529             :                  uint64_t rowid,
     530             :                  const char *wire_method,
     531             :                  const char *buf,
     532             :                  size_t buf_size)
     533             : {
     534             :   struct WirePrepareData *wpd;
     535             : 
     536             :   (void) cls;
     537          76 :   if ( (NULL != task) ||
     538          76 :        (EXIT_SUCCESS != global_ret) )
     539           0 :     return; /* current transaction was aborted */
     540          76 :   if (rowid >= shard->shard_end)
     541             :   {
     542             :     /* skip */
     543           9 :     shard->batch_end = shard->shard_end - 1;
     544           9 :     if (NULL != wpd_head)
     545           0 :       return;
     546           9 :     batch_done ();
     547           9 :     return;
     548             :   }
     549          67 :   if ( (NULL == wire_method) ||
     550             :        (NULL == buf) )
     551             :   {
     552           0 :     GNUNET_break (0);
     553           0 :     db_plugin->rollback (db_plugin->cls);
     554           0 :     global_ret = EXIT_FAILURE;
     555           0 :     GNUNET_SCHEDULER_shutdown ();
     556           0 :     return;
     557             :   }
     558          67 :   wpd = GNUNET_malloc (sizeof (struct WirePrepareData)
     559             :                        + buf_size);
     560          67 :   GNUNET_memcpy (&wpd[1],
     561             :                  buf,
     562             :                  buf_size);
     563          67 :   wpd->buf_size = buf_size;
     564          67 :   wpd->row_id = rowid;
     565          67 :   GNUNET_CONTAINER_DLL_insert (wpd_head,
     566             :                                wpd_tail,
     567             :                                wpd);
     568          67 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     569             :               "Starting wire transfer %llu\n",
     570             :               (unsigned long long) rowid);
     571          67 :   wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method);
     572          67 :   if (NULL == wpd->wa)
     573             :   {
     574             :     /* Should really never happen here, as when we get
     575             :        here the wire account should be in the cache. */
     576           0 :     GNUNET_break (0);
     577           0 :     cleanup_wpd ();
     578           0 :     db_plugin->rollback (db_plugin->cls);
     579           0 :     global_ret = EXIT_NO_RESTART;
     580           0 :     GNUNET_SCHEDULER_shutdown ();
     581           0 :     return;
     582             :   }
     583         134 :   wpd->eh = TALER_BANK_transfer (ctx,
     584          67 :                                  wpd->wa->auth,
     585             :                                  buf,
     586             :                                  buf_size,
     587             :                                  &wire_confirm_cb,
     588             :                                  wpd);
     589          67 :   if (NULL == wpd->eh)
     590             :   {
     591           0 :     GNUNET_break (0); /* Irrecoverable */
     592           0 :     cleanup_wpd ();
     593           0 :     db_plugin->rollback (db_plugin->cls);
     594           0 :     global_ret = EXIT_FAILURE;
     595           0 :     GNUNET_SCHEDULER_shutdown ();
     596           0 :     return;
     597             :   }
     598             : }
     599             : 
     600             : 
     601             : /**
     602             :  * Execute the wire transfers that we have committed to
     603             :  * do.
     604             :  *
     605             :  * @param cls NULL
     606             :  */
     607             : static void
     608         195 : run_transfers (void *cls)
     609             : {
     610             :   enum GNUNET_DB_QueryStatus qs;
     611             :   int64_t limit;
     612             : 
     613             :   (void) cls;
     614         195 :   task = NULL;
     615         195 :   limit = shard->shard_end - shard->batch_start;
     616         195 :   if (0 >= limit)
     617             :   {
     618          61 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     619             :                 "Shard [%llu,%llu) completed\n",
     620             :                 (unsigned long long) shard->shard_start,
     621             :                 (unsigned long long) shard->batch_end);
     622          61 :     qs = db_plugin->complete_shard (db_plugin->cls,
     623             :                                     "transfer",
     624          61 :                                     shard->shard_start,
     625          61 :                                     shard->batch_end + 1);
     626          61 :     switch (qs)
     627             :     {
     628           0 :     case GNUNET_DB_STATUS_HARD_ERROR:
     629           0 :       GNUNET_break (0);
     630           0 :       GNUNET_free (shard);
     631           0 :       GNUNET_SCHEDULER_shutdown ();
     632           0 :       return;
     633           0 :     case GNUNET_DB_STATUS_SOFT_ERROR:
     634           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     635             :                   "Got DB soft error for complete_shard. Rolling back.\n");
     636           0 :       GNUNET_free (shard);
     637           0 :       GNUNET_assert (NULL == task);
     638           0 :       task = GNUNET_SCHEDULER_add_now (&select_shard,
     639             :                                        NULL);
     640           0 :       return;
     641           0 :     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     642             :       /* already existed, ok, let's just continue */
     643           0 :       break;
     644          61 :     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     645             :       /* normal case */
     646          61 :       break;
     647             :     }
     648          61 :     shard_delay = GNUNET_TIME_absolute_get_duration (
     649          61 :       shard->shard_start_time);
     650          61 :     GNUNET_free (shard);
     651          61 :     GNUNET_assert (NULL == task);
     652          61 :     task = GNUNET_SCHEDULER_add_now (&select_shard,
     653             :                                      NULL);
     654          61 :     return;
     655             :   }
     656             :   /* cap number of parallel connections to a reasonable
     657             :      limit for concurrent requests to the bank */
     658         134 :   limit = GNUNET_MIN (limit,
     659             :                       256);
     660         134 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     661             :               "Checking for %lld pending wire transfers [%llu-...)\n",
     662             :               (long long) limit,
     663             :               (unsigned long long) shard->batch_start);
     664         134 :   if (GNUNET_OK !=
     665         134 :       db_plugin->start_read_committed (db_plugin->cls,
     666             :                                        "aggregator run transfer"))
     667             :   {
     668           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     669             :                 "Failed to start database transaction!\n");
     670           0 :     global_ret = EXIT_FAILURE;
     671           0 :     GNUNET_SCHEDULER_shutdown ();
     672           0 :     return;
     673             :   }
     674         134 :   GNUNET_assert (NULL == task);
     675         134 :   qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
     676         134 :                                          shard->batch_start,
     677             :                                          limit,
     678             :                                          &wire_prepare_cb,
     679             :                                          NULL);
     680         134 :   switch (qs)
     681             :   {
     682           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     683           0 :     cleanup_wpd ();
     684           0 :     db_plugin->rollback (db_plugin->cls);
     685           0 :     GNUNET_break (0);
     686           0 :     global_ret = EXIT_FAILURE;
     687           0 :     GNUNET_SCHEDULER_shutdown ();
     688           0 :     return;
     689           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     690             :     /* try again */
     691           0 :     db_plugin->rollback (db_plugin->cls);
     692           0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     693             :                 "Serialization failure, trying again immediately!\n");
     694           0 :     cleanup_wpd ();
     695           0 :     GNUNET_assert (NULL == task);
     696           0 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
     697             :                                      NULL);
     698           0 :     return;
     699          60 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     700             :     /* no more prepared wire transfers, go sleep a bit! */
     701          60 :     db_plugin->rollback (db_plugin->cls);
     702          60 :     GNUNET_assert (NULL == wpd_head);
     703          60 :     GNUNET_assert (NULL == task);
     704          60 :     if (GNUNET_YES == test_mode)
     705             :     {
     706          60 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     707             :                   "No more pending wire transfers, shutting down (because we are in test mode)\n");
     708          60 :       GNUNET_SCHEDULER_shutdown ();
     709             :     }
     710             :     else
     711             :     {
     712           0 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     713             :                   "No more pending wire transfers, going idle\n");
     714           0 :       GNUNET_assert (NULL == task);
     715           0 :       task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
     716             :                                            &run_transfers_delayed,
     717             :                                            NULL);
     718             :     }
     719          60 :     return;
     720          74 :   default:
     721             :     /* continued in wire_prepare_cb() */
     722          74 :     return;
     723             :   }
     724             : }
     725             : 
     726             : 
     727             : /**
     728             :  * Select shard to process.
                     :  *
                     :  * Scheduler task: clears the file-level `task` handle, checks the
                     :  * database connection via the plugin's preflight(), and asks the
                     :  * plugin's begin_shard() for the next range of the "transfer" job.
                     :  * On GNUNET_DB_STATUS_SUCCESS_ONE_RESULT a fresh `struct Shard` is
                     :  * stored in the file-level `shard` variable and run_transfers() is
                     :  * scheduled to run immediately.
                     :  *
                     :  * Failure handling as visible in this function:
                     :  * - preflight() returns GNUNET_SYSERR, or begin_shard() reports a
                     :  *   hard error: `global_ret` is set to EXIT_FAILURE and the
                     :  *   scheduler is asked to shut down;
                     :  * - soft error: `serialization_delay` is updated via
                     :  *   GNUNET_TIME_randomized_backoff() and this task is re-run after
                     :  *   that delay;
                     :  * - no results: reported via GNUNET_break (0) and retried after
                     :  *   `transfer_idle_sleep_interval`.
                     :  *
                     :  * NOTE(review): the switch on `qs` has no default case, so any other
                     :  * status value would continue into the shard setup below; `start`
                     :  * and `end` are not initialized locally and are only meaningful if
                     :  * begin_shard() wrote them -- confirm no other status is possible.
     729             :  *
     730             :  * @param cls NULL (unused)
     731             :  */
     732             : static void
     733         121 : select_shard (void *cls)
     734             : {
     735             :   enum GNUNET_DB_QueryStatus qs;
     736             :   struct GNUNET_TIME_Relative delay;
     737             :   uint64_t start;
     738             :   uint64_t end;
     739             : 
     740             :   (void) cls;
     741         121 :   task = NULL; /* handle presumably referred to this, now running, task */
     742         121 :   GNUNET_assert (NULL == wpd_head); /* the wpd list must be empty when picking a new shard */
     743         121 :   if (GNUNET_SYSERR ==
     744         121 :       db_plugin->preflight (db_plugin->cls))
     745             :   {
     746           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     747             :                 "Failed to obtain database connection!\n");
     748           0 :     global_ret = EXIT_FAILURE;
     749           0 :     GNUNET_SCHEDULER_shutdown ();
     750           0 :     return;
     751             :   }
     752         121 :   if (0 == max_workers) /* with max_workers == 0 a zero delay is passed to begin_shard() */
     753         121 :     delay = GNUNET_TIME_UNIT_ZERO;
     754             :   else /* otherwise a weak-quality random microsecond value; the bound argument is 4 * max (idle sleep interval, shard_delay * max_workers) */
     755           0 :     delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
     756             :       GNUNET_CRYPTO_QUALITY_WEAK,
     757           0 :       4 * GNUNET_TIME_relative_max (
     758             :         transfer_idle_sleep_interval,
     759             :         GNUNET_TIME_relative_multiply (shard_delay,
     760           0 :                                        max_workers)).rel_value_us);
     761         121 :   qs = db_plugin->begin_shard (db_plugin->cls,
     762             :                                "transfer",
     763             :                                delay,
     764             :                                shard_size,
     765             :                                &start,
     766             :                                &end);
     767         121 :   switch (qs)
     768             :   {
     769           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     770           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     771             :                 "Failed to obtain starting point for monitoring from database!\n");
     772           0 :     global_ret = EXIT_FAILURE;
     773           0 :     GNUNET_SCHEDULER_shutdown ();
     774           0 :     return;
     775           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     776             :     /* try again: update serialization_delay through
                     :        GNUNET_TIME_randomized_backoff() (second argument is
                     :        GNUNET_TIME_UNIT_SECONDS) and re-run select_shard() after
                     :        that delay */
     777             :     {
     778           0 :       serialization_delay = GNUNET_TIME_randomized_backoff (serialization_delay,
     779             :                                                             GNUNET_TIME_UNIT_SECONDS);
     780           0 :       GNUNET_assert (NULL == task);
     781           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     782             :                   "Serialization failure, trying again in %s!\n",
     783             :                   GNUNET_TIME_relative2s (serialization_delay,
     784             :                                           true));
     785           0 :       task = GNUNET_SCHEDULER_add_delayed (serialization_delay,
     786             :                                            &select_shard,
     787             :                                            NULL);
     788             :     }
     789           0 :     return;
     790           0 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     791           0 :     GNUNET_break (0); /* treated as unexpected; retried after the idle sleep interval */
     792           0 :     GNUNET_assert (NULL == task);
     793           0 :     task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
     794             :                                          &select_shard,
     795             :                                          NULL);
     796           0 :     return;
     797         121 :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     798             :     /* continued below: the only case that does not return from
                     :        inside the switch */
     799         121 :     break;
     800             :   }
     801         121 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     802             :               "Starting with shard [%llu,%llu)\n",
     803             :               (unsigned long long) start,
     804             :               (unsigned long long) end);
     805         121 :   shard = GNUNET_new (struct Shard);
     806         121 :   shard->shard_start_time = GNUNET_TIME_absolute_get (); /* run_transfers() later derives shard_delay from this timestamp */
     807         121 :   shard->shard_start = start;
     808         121 :   shard->shard_end = end;
     809         121 :   shard->batch_start = start; /* the first batch begins at the start of the shard */
     810         121 :   GNUNET_assert (NULL == task);
     811         121 :   task = GNUNET_SCHEDULER_add_now (&run_transfers,
     812             :                                    NULL);
     813             : }
     814             : 
     815             : 
     816             : /**
     817             :  * First task.
                     :  *
                     :  * Stores the configuration handle in the file-level `cfg`, parses
                     :  * the settings with parse_transfer_config(), sets up the CURL
                     :  * context `ctx` together with its reschedule context `rc`, checks
                     :  * the database connection via the plugin's preflight(), and then
                     :  * schedules select_shard() to run immediately.  Finally
                     :  * shutdown_task() is registered as shutdown handler.
                     :  *
                     :  * Early exits as visible in this function:
                     :  * - parse_transfer_config() fails: `cfg` is reset to NULL and
                     :  *   `global_ret` is set to EXIT_NOTCONFIGURED;
                     :  * - `ctx` is NULL: only GNUNET_break (0) is raised;
                     :  * - preflight() returns GNUNET_SYSERR: `global_ret` is set to
                     :  *   EXIT_FAILURE and the scheduler is asked to shut down.
                     :  *
                     :  * NOTE(review): GNUNET_CURL_gnunet_rc_create() is called with `ctx`
                     :  * before `ctx` is checked for NULL -- confirm that is intended.
                     :  * NOTE(review): the NULL-`ctx` path neither sets `global_ret` nor
                     :  * requests a shutdown -- confirm the resulting exit code is wanted.
                     :  * NOTE(review): shutdown_task() is registered only at the very end,
                     :  * so every early return above leaves before it is registered; on
                     :  * the preflight failure path `ctx` and `rc` already exist -- verify
                     :  * they are released elsewhere.
     818             :  *
     819             :  * @param cls closure, NULL; forwarded as closure to shutdown_task()
     820             :  * @param args remaining command-line arguments (unused)
     821             :  * @param cfgfile name of the configuration file used (for saving, can be NULL!) (unused)
     822             :  * @param c configuration
     823             :  */
     824             : static void
     825          60 : run (void *cls,
     826             :      char *const *args,
     827             :      const char *cfgfile,
     828             :      const struct GNUNET_CONFIGURATION_Handle *c)
     829             : {
     830             :   (void) cls;
     831             :   (void) args;
     832             :   (void) cfgfile;
     833             : 
     834          60 :   cfg = c; /* must be set first: parse_transfer_config() takes no arguments, so presumably reads `cfg` */
     835          60 :   if (GNUNET_OK != parse_transfer_config ())
     836             :   {
     837           0 :     cfg = NULL;
     838           0 :     global_ret = EXIT_NOTCONFIGURED;
     839           0 :     return;
     840             :   }
     841          60 :   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
     842             :                           &rc); /* the address of `rc` is handed over here; `rc` itself is assigned on the next line */
     843          60 :   rc = GNUNET_CURL_gnunet_rc_create (ctx);
     844          60 :   if (NULL == ctx)
     845             :   {
     846           0 :     GNUNET_break (0);
     847           0 :     return;
     848             :   }
     849          60 :   if (GNUNET_SYSERR ==
     850          60 :       db_plugin->preflight (db_plugin->cls))
     851             :   {
     852           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     853             :                 "Failed to obtain database connection!\n");
     854           0 :     global_ret = EXIT_FAILURE;
     855           0 :     GNUNET_SCHEDULER_shutdown ();
     856           0 :     return;
     857             :   }
     858          60 :   GNUNET_assert (NULL == task);
     859          60 :   task = GNUNET_SCHEDULER_add_now (&select_shard,
     860             :                                    NULL);
     861          60 :   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
     862             :                                  cls);
     863             : }
     864             : 
     865             : 
     866             : /**
     867             :  * The main function of the taler-exchange-transfer.
                     :  *
                     :  * Declares the command-line options and hands control to
                     :  * GNUNET_PROGRAM_run() with run() as the main task.  Options:
                     :  * - `-S` / `--size SIZE`: stored in `shard_size`;
                     :  * - `-T` / `--timetravel`: handled by the timetravel option helper;
                     :  * - `-t` / `--test`: sets `test_mode`;
                     :  * - `-w` / `--workers COUNT`: stored in `max_workers`;
                     :  * - the version option built from VERSION and VCS_VERSION.
                     :  * The defaults quoted in the help strings (1024 and 16) come from
                     :  * initializers that are not part of this function.
     868             :  *
     869             :  * @param argc number of arguments from the command line
     870             :  * @param argv command line arguments
     871             :  * @return EXIT_INVALIDARGUMENT if GNUNET_PROGRAM_run() returned
                     :  *         GNUNET_SYSERR, EXIT_SUCCESS if it returned GNUNET_NO,
                     :  *         otherwise the value of `global_ret`
     872             :  */
     873             : int
     874          60 : main (int argc,
     875             :       char *const *argv)
     876             : {
     877          60 :   struct GNUNET_GETOPT_CommandLineOption options[] = {
     878          60 :     GNUNET_GETOPT_option_uint ('S',
     879             :                                "size",
     880             :                                "SIZE",
     881             :                                "Size to process per shard (default: 1024)",
     882             :                                &shard_size),
     883          60 :     GNUNET_GETOPT_option_timetravel ('T',
     884             :                                      "timetravel"),
     885          60 :     GNUNET_GETOPT_option_flag ('t',
     886             :                                "test",
     887             :                                "run in test mode and exit when idle",
     888             :                                &test_mode),
     889          60 :     GNUNET_GETOPT_option_uint ('w',
     890             :                                "workers",
     891             :                                "COUNT",
     892             :                                "Plan work load with up to COUNT worker processes (default: 16)",
     893             :                                &max_workers),
     894          60 :     GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
     895             :     GNUNET_GETOPT_OPTION_END
     896             :   };
     897             :   enum GNUNET_GenericReturnValue ret;
     898             : 
     899          60 :   ret = GNUNET_PROGRAM_run (
     900             :     TALER_EXCHANGE_project_data (),
     901             :     argc, argv,
     902             :     "taler-exchange-transfer",
     903             :     gettext_noop (
     904             :       "background process that executes outgoing wire transfers"),
     905             :     options,
     906             :     &run, NULL);
     907          60 :   if (GNUNET_SYSERR == ret)
     908           0 :     return EXIT_INVALIDARGUMENT;
     909          60 :   if (GNUNET_NO == ret) /* presumably the program ended before run(), e.g. after help/version output -- TODO confirm */
     910           0 :     return EXIT_SUCCESS;
     911          60 :   return global_ret; /* set by the tasks on failure paths */
     912             : }
     913             : 
     914             : 
     915             : /* end of taler-exchange-transfer.c */

Generated by: LCOV version 1.16