LCOV - code coverage report
Current view: top level - exchange - taler-exchange-transfer.c (source / functions) Hit Total Coverage
Test: GNU Taler exchange coverage report Lines: 96 196 49.0 %
Date: 2021-08-30 06:43:37 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   This file is part of TALER
       3             :   Copyright (C) 2016-2020 Taler Systems SA
       4             : 
       5             :   TALER is free software; you can redistribute it and/or modify it under the
       6             :   terms of the GNU Affero General Public License as published by the Free Software
       7             :   Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
      12             : 
      13             :   You should have received a copy of the GNU Affero General Public License along with
      14             :   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             : */
      16             : /**
      17             :  * @file taler-exchange-transfer.c
      18             :  * @brief Process that actually finalizes outgoing transfers with the wire gateway / bank
      19             :  * @author Christian Grothoff
      20             :  */
      21             : #include "platform.h"
      22             : #include <gnunet/gnunet_util_lib.h>
      23             : #include <jansson.h>
      24             : #include <pthread.h>
      25             : #include "taler_exchangedb_lib.h"
      26             : #include "taler_exchangedb_plugin.h"
      27             : #include "taler_json_lib.h"
      28             : #include "taler_bank_service.h"
      29             : 
      30             : 
      31             : /**
      32             :  * Data we keep to #run_transfers().  There is at most
      33             :  * one of these around at any given point in time.
      34             :  * Note that this limits parallelism, and we might want
      35             :  * to revise this decision at a later point.
      36             :  */
      37             : struct WirePrepareData
      38             : {
      39             : 
      40             :   /**
      41             :    * Wire execution handle.
      42             :    */
      43             :   struct TALER_BANK_TransferHandle *eh;
      44             : 
      45             :   /**
      46             :    * Wire account used for this preparation.
      47             :    */
      48             :   const struct TALER_EXCHANGEDB_AccountInfo *wa;
      49             : 
      50             :   /**
      51             :    * Row ID of the transfer.
      52             :    */
      53             :   unsigned long long row_id;
      54             : 
      55             : };
      56             : 
      57             : 
      58             : /**
      59             :  * The exchange's configuration.
      60             :  */
      61             : static const struct GNUNET_CONFIGURATION_Handle *cfg;
      62             : 
      63             : /**
      64             :  * Our database plugin.
      65             :  */
      66             : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
      67             : 
      68             : /**
      69             :  * Next task to run, if any.
      70             :  */
      71             : static struct GNUNET_SCHEDULER_Task *task;
      72             : 
      73             : /**
      74             :  * If we are currently executing a transfer, information about
      75             :  * the active transfer is here. Otherwise, this variable is NULL.
      76             :  */
      77             : static struct WirePrepareData *wpd;
      78             : 
      79             : /**
      80             :  * Handle to the context for interacting with the bank / wire gateway.
      81             :  */
      82             : static struct GNUNET_CURL_Context *ctx;
      83             : 
      84             : /**
      85             :  * Scheduler context for running the @e ctx.
      86             :  */
      87             : static struct GNUNET_CURL_RescheduleContext *rc;
      88             : 
      89             : /**
      90             :  * How long should we sleep when idle before trying to find more work?
      91             :  */
      92             : static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
      93             : 
      94             : /**
      95             :  * Value to return from main(). 0 on success, non-zero on errors.
      96             :  */
      97             : static int global_ret;
      98             : 
      99             : /**
     100             :  * #GNUNET_YES if we are in test mode and should exit when idle.
     101             :  */
     102             : static int test_mode;
     103             : 
     104             : 
     105             : /**
     106             :  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
     107             :  *
     108             :  * @param cls closure
     109             :  */
     110             : static void
     111          49 : shutdown_task (void *cls)
     112             : {
     113             :   (void) cls;
     114          49 :   if (NULL != ctx)
     115             :   {
     116          49 :     GNUNET_CURL_fini (ctx);
     117          49 :     ctx = NULL;
     118             :   }
     119          49 :   if (NULL != rc)
     120             :   {
     121          49 :     GNUNET_CURL_gnunet_rc_destroy (rc);
     122          49 :     rc = NULL;
     123             :   }
     124          49 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     125             :               "Running shutdown\n");
     126          49 :   if (NULL != task)
     127             :   {
     128           0 :     GNUNET_SCHEDULER_cancel (task);
     129           0 :     task = NULL;
     130             :   }
     131          49 :   if (NULL != wpd)
     132             :   {
     133           0 :     if (NULL != wpd->eh)
     134             :     {
     135           0 :       TALER_BANK_transfer_cancel (wpd->eh);
     136           0 :       wpd->eh = NULL;
     137             :     }
     138           0 :     db_plugin->rollback (db_plugin->cls);
     139           0 :     GNUNET_free (wpd);
     140           0 :     wpd = NULL;
     141             :   }
     142          49 :   TALER_EXCHANGEDB_plugin_unload (db_plugin);
     143          49 :   db_plugin = NULL;
     144          49 :   TALER_EXCHANGEDB_unload_accounts ();
     145          49 :   cfg = NULL;
     146          49 : }
     147             : 
     148             : 
     149             : /**
     150             :  * Parse the configuration for wirewatch.
     151             :  *
     152             :  * @return #GNUNET_OK on success
     153             :  */
     154             : static int
     155          49 : parse_wirewatch_config (void)
     156             : {
     157          49 :   if (GNUNET_OK !=
     158          49 :       GNUNET_CONFIGURATION_get_value_time (cfg,
     159             :                                            "exchange",
     160             :                                            "AGGREGATOR_IDLE_SLEEP_INTERVAL",
     161             :                                            &aggregator_idle_sleep_interval))
     162             :   {
     163           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     164             :                                "exchange",
     165             :                                "AGGREGATOR_IDLE_SLEEP_INTERVAL");
     166           0 :     return GNUNET_SYSERR;
     167             :   }
     168          49 :   if (NULL ==
     169          49 :       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
     170             :   {
     171           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     172             :                 "Failed to initialize DB subsystem\n");
     173           0 :     return GNUNET_SYSERR;
     174             :   }
     175          49 :   if (GNUNET_OK !=
     176          49 :       TALER_EXCHANGEDB_load_accounts (cfg,
     177             :                                       TALER_EXCHANGEDB_ALO_DEBIT
     178             :                                       | TALER_EXCHANGEDB_ALO_AUTHDATA))
     179             :   {
     180           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     181             :                 "No wire accounts configured for debit!\n");
     182           0 :     TALER_EXCHANGEDB_plugin_unload (db_plugin);
     183           0 :     db_plugin = NULL;
     184           0 :     return GNUNET_SYSERR;
     185             :   }
     186          49 :   return GNUNET_OK;
     187             : }
     188             : 
     189             : 
     190             : /**
     191             :  * Perform a database commit. If it fails, print a warning.
     192             :  *
     193             :  * @return status of commit
     194             :  */
     195             : static enum GNUNET_DB_QueryStatus
     196          49 : commit_or_warn (void)
     197             : {
     198             :   enum GNUNET_DB_QueryStatus qs;
     199             : 
     200          49 :   qs = db_plugin->commit (db_plugin->cls);
     201          49 :   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
     202          49 :     return qs;
     203           0 :   GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
     204             :               ? GNUNET_ERROR_TYPE_INFO
     205             :               : GNUNET_ERROR_TYPE_ERROR,
     206             :               "Failed to commit database transaction!\n");
     207           0 :   return qs;
     208             : }
     209             : 
     210             : 
     211             : /**
     212             :  * Execute the wire transfers that we have committed to
     213             :  * do.
     214             :  *
     215             :  * @param cls NULL
     216             :  */
     217             : static void
     218             : run_transfers (void *cls);
     219             : 
     220             : 
     221             : /**
     222             :  * Function called with the result from the execute step.
     223             :  * On success, we mark the respective wire transfer as finished,
     224             :  * and in general we afterwards continue to #run_transfers(),
     225             :  * except for irrecoverable errors.
     226             :  *
     227             :  * @param cls NULL
     228             :  * @param http_status_code #MHD_HTTP_OK on success
     229             :  * @param ec taler error code
     230             :  * @param row_id unique ID of the wire transfer in the bank's records
     231             :  * @param wire_timestamp when did the transfer happen
     232             :  */
     233             : static void
     234          49 : wire_confirm_cb (void *cls,
     235             :                  unsigned int http_status_code,
     236             :                  enum TALER_ErrorCode ec,
     237             :                  uint64_t row_id,
     238             :                  struct GNUNET_TIME_Absolute wire_timestamp)
     239             : {
     240             :   enum GNUNET_DB_QueryStatus qs;
     241             : 
     242             :   (void) cls;
     243             :   (void) row_id;
     244             :   (void) wire_timestamp;
     245          49 :   wpd->eh = NULL;
     246          49 :   switch (http_status_code)
     247             :   {
     248          49 :   case MHD_HTTP_OK:
     249          49 :     qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
     250          49 :                                                      wpd->row_id);
     251             :     /* continued below */
     252          49 :     break;
     253           0 :   case MHD_HTTP_NOT_FOUND:
     254           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     255             :                 "Wire transaction %llu failed: %u/%d\n",
     256             :                 (unsigned long long) wpd->row_id,
     257             :                 http_status_code,
     258             :                 ec);
     259           0 :     qs = db_plugin->wire_prepare_data_mark_failed (db_plugin->cls,
     260           0 :                                                    wpd->row_id);
     261             :     /* continued below */
     262           0 :     break;
     263           0 :   default:
     264           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     265             :                 "Wire transaction failed: %u/%d\n",
     266             :                 http_status_code,
     267             :                 ec);
     268           0 :     db_plugin->rollback (db_plugin->cls);
     269           0 :     global_ret = EXIT_FAILURE;
     270           0 :     GNUNET_SCHEDULER_shutdown ();
     271           0 :     GNUNET_free (wpd);
     272           0 :     wpd = NULL;
     273           0 :     return;
     274             :   }
     275          49 :   if (0 >= qs)
     276             :   {
     277           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     278           0 :     db_plugin->rollback (db_plugin->cls);
     279           0 :     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     280             :     {
     281             :       /* try again */
     282           0 :       GNUNET_assert (NULL == task);
     283           0 :       task = GNUNET_SCHEDULER_add_now (&run_transfers,
     284             :                                        NULL);
     285             :     }
     286             :     else
     287             :     {
     288           0 :       global_ret = EXIT_FAILURE;
     289           0 :       GNUNET_SCHEDULER_shutdown ();
     290             :     }
     291           0 :     GNUNET_free (wpd);
     292           0 :     wpd = NULL;
     293           0 :     return;
     294             :   }
     295          49 :   GNUNET_free (wpd);
     296          49 :   wpd = NULL;
     297          49 :   switch (commit_or_warn ())
     298             :   {
     299           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     300             :     /* try again */
     301           0 :     GNUNET_assert (NULL == task);
     302           0 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
     303             :                                      NULL);
     304           0 :     return;
     305           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     306           0 :     GNUNET_break (0);
     307           0 :     global_ret = EXIT_FAILURE;
     308           0 :     GNUNET_SCHEDULER_shutdown ();
     309           0 :     return;
     310          49 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     311          49 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     312             :                 "Wire transfer complete\n");
     313             :     /* continue with #run_transfers(), just to guard
     314             :        against the unlikely case that there are more. */
     315          49 :     GNUNET_assert (NULL == task);
     316          49 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
     317             :                                      NULL);
     318          49 :     return;
     319           0 :   default:
     320           0 :     GNUNET_break (0);
     321           0 :     global_ret = EXIT_FAILURE;
     322           0 :     GNUNET_SCHEDULER_shutdown ();
     323           0 :     return;
     324             :   }
     325             : }
     326             : 
     327             : 
     328             : /**
     329             :  * Callback with data about a prepared transaction.  Triggers the respective
     330             :  * wire transfer using the prepared transaction data.
     331             :  *
     332             :  * @param cls NULL
     333             :  * @param rowid row identifier used to mark prepared transaction as done
     334             :  * @param wire_method wire method the preparation was done for
     335             :  * @param buf transaction data that was persisted, NULL on error
     336             :  * @param buf_size number of bytes in @a buf, 0 on error
     337             :  */
     338             : static void
     339          49 : wire_prepare_cb (void *cls,
     340             :                  uint64_t rowid,
     341             :                  const char *wire_method,
     342             :                  const char *buf,
     343             :                  size_t buf_size)
     344             : {
     345             :   const struct TALER_EXCHANGEDB_AccountInfo *wa;
     346             : 
     347             :   (void) cls;
     348          49 :   if ( (NULL == wire_method) ||
     349             :        (NULL == buf) )
     350             :   {
     351           0 :     GNUNET_break (0);
     352           0 :     db_plugin->rollback (db_plugin->cls);
     353           0 :     global_ret = EXIT_FAILURE;
     354           0 :     goto cleanup;
     355             :   }
     356          49 :   wpd->row_id = rowid;
     357          49 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     358             :               "Starting wire transfer %llu\n",
     359             :               (unsigned long long) rowid);
     360          49 :   wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method);
     361          49 :   if (NULL == wpd->wa)
     362             :   {
     363             :     /* Should really never happen here, as when we get
     364             :        here the wire account should be in the cache. */
     365           0 :     GNUNET_break (0);
     366           0 :     db_plugin->rollback (db_plugin->cls);
     367           0 :     global_ret = EXIT_NOTCONFIGURED;
     368           0 :     goto cleanup;
     369             :   }
     370          49 :   wa = wpd->wa;
     371          49 :   wpd->eh = TALER_BANK_transfer (ctx,
     372             :                                  wa->auth,
     373             :                                  buf,
     374             :                                  buf_size,
     375             :                                  &wire_confirm_cb,
     376             :                                  NULL);
     377          49 :   if (NULL == wpd->eh)
     378             :   {
     379           0 :     GNUNET_break (0); /* Irrecoverable */
     380           0 :     db_plugin->rollback (db_plugin->cls);
     381           0 :     global_ret = EXIT_FAILURE;
     382           0 :     goto cleanup;
     383             :   }
     384          49 :   return;
     385           0 : cleanup:
     386           0 :   GNUNET_SCHEDULER_shutdown ();
     387           0 :   GNUNET_free (wpd);
     388           0 :   wpd = NULL;
     389             : }
     390             : 
     391             : 
     392             : /**
     393             :  * Execute the wire transfers that we have committed to
     394             :  * do.
     395             :  *
     396             :  * @param cls NULL
     397             :  */
     398             : static void
     399          98 : run_transfers (void *cls)
     400             : {
     401             :   enum GNUNET_DB_QueryStatus qs;
     402             : 
     403             :   (void) cls;
     404          98 :   task = NULL;
     405          98 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     406             :               "Checking for pending wire transfers\n");
     407          98 :   if (GNUNET_SYSERR ==
     408          98 :       db_plugin->preflight (db_plugin->cls))
     409             :   {
     410           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     411             :                 "Failed to obtain database connection!\n");
     412           0 :     global_ret = EXIT_FAILURE;
     413           0 :     GNUNET_SCHEDULER_shutdown ();
     414           0 :     return;
     415             :   }
     416          98 :   if (GNUNET_OK !=
     417          98 :       db_plugin->start (db_plugin->cls,
     418             :                         "aggregator run transfer"))
     419             :   {
     420           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     421             :                 "Failed to start database transaction!\n");
     422           0 :     global_ret = EXIT_FAILURE;
     423           0 :     GNUNET_SCHEDULER_shutdown ();
     424           0 :     return;
     425             :   }
     426          98 :   wpd = GNUNET_new (struct WirePrepareData);
     427          98 :   qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
     428             :                                          &wire_prepare_cb,
     429             :                                          NULL);
     430          98 :   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
     431          49 :     return;  /* continued via continuation set in #wire_prepare_cb() */
     432          49 :   db_plugin->rollback (db_plugin->cls);
     433          49 :   GNUNET_free (wpd);
     434          49 :   wpd = NULL;
     435          49 :   switch (qs)
     436             :   {
     437           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     438           0 :     GNUNET_break (0);
     439           0 :     global_ret = EXIT_FAILURE;
     440           0 :     GNUNET_SCHEDULER_shutdown ();
     441           0 :     return;
     442           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     443             :     /* try again */
     444           0 :     GNUNET_assert (NULL == task);
     445           0 :     task = GNUNET_SCHEDULER_add_now (&run_transfers,
     446             :                                      NULL);
     447           0 :     return;
     448          49 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     449             :     /* no more prepared wire transfers, go sleep a bit! */
     450          49 :     GNUNET_assert (NULL == task);
     451          49 :     if (GNUNET_YES == test_mode)
     452             :     {
     453          49 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     454             :                   "No more pending wire transfers, shutting down (because we are in test mode)\n");
     455          49 :       GNUNET_SCHEDULER_shutdown ();
     456             :     }
     457             :     else
     458             :     {
     459           0 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     460             :                   "No more pending wire transfers, going idle\n");
     461           0 :       task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
     462             :                                            &run_transfers,
     463             :                                            NULL);
     464             :     }
     465          49 :     return;
     466           0 :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     467             :     /* should be impossible */
     468           0 :     GNUNET_assert (0);
     469             :   }
     470             : }
     471             : 
     472             : 
     473             : /**
     474             :  * First task.
     475             :  *
     476             :  * @param cls closure, NULL
     477             :  * @param args remaining command-line arguments
     478             :  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
     479             :  * @param c configuration
     480             :  */
     481             : static void
     482          49 : run (void *cls,
     483             :      char *const *args,
     484             :      const char *cfgfile,
     485             :      const struct GNUNET_CONFIGURATION_Handle *c)
     486             : {
     487             :   (void) cls;
     488             :   (void) args;
     489             :   (void) cfgfile;
     490             : 
     491          49 :   cfg = c;
     492          49 :   if (GNUNET_OK != parse_wirewatch_config ())
     493             :   {
     494           0 :     cfg = NULL;
     495           0 :     global_ret = EXIT_NOTCONFIGURED;
     496           0 :     return;
     497             :   }
     498          49 :   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
     499             :                           &rc);
     500          49 :   rc = GNUNET_CURL_gnunet_rc_create (ctx);
     501          49 :   if (NULL == ctx)
     502             :   {
     503           0 :     GNUNET_break (0);
     504           0 :     return;
     505             :   }
     506             : 
     507          49 :   GNUNET_assert (NULL == task);
     508          49 :   task = GNUNET_SCHEDULER_add_now (&run_transfers,
     509             :                                    NULL);
     510          49 :   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
     511             :                                  cls);
     512             : }
     513             : 
     514             : 
     515             : /**
     516             :  * The main function of the taler-exchange-transfer.
     517             :  *
     518             :  * @param argc number of arguments from the command line
     519             :  * @param argv command line arguments
     520             :  * @return 0 ok, 1 on error
     521             :  */
     522             : int
     523          49 : main (int argc,
     524             :       char *const *argv)
     525             : {
     526          49 :   struct GNUNET_GETOPT_CommandLineOption options[] = {
     527          49 :     GNUNET_GETOPT_option_timetravel ('T',
     528             :                                      "timetravel"),
     529          49 :     GNUNET_GETOPT_option_flag ('t',
     530             :                                "test",
     531             :                                "run in test mode and exit when idle",
     532             :                                &test_mode),
     533          49 :     GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
     534             :     GNUNET_GETOPT_OPTION_END
     535             :   };
     536             :   enum GNUNET_GenericReturnValue ret;
     537             : 
     538          49 :   if (GNUNET_OK !=
     539          49 :       GNUNET_STRINGS_get_utf8_args (argc, argv,
     540             :                                     &argc, &argv))
     541           0 :     return EXIT_INVALIDARGUMENT;
     542          49 :   TALER_OS_init ();
     543          49 :   ret = GNUNET_PROGRAM_run (
     544             :     argc, argv,
     545             :     "taler-exchange-transfer",
     546             :     gettext_noop (
     547             :       "background process that executes outgoing wire transfers"),
     548             :     options,
     549             :     &run, NULL);
     550          49 :   GNUNET_free_nz ((void *) argv);
     551          49 :   if (GNUNET_SYSERR == ret)
     552           0 :     return EXIT_INVALIDARGUMENT;
     553          49 :   if (GNUNET_NO == ret)
     554           0 :     return EXIT_SUCCESS;
     555          49 :   return global_ret;
     556             : }
     557             : 
     558             : 
     559             : /* end of taler-exchange-transfer.c */

Generated by: LCOV version 1.14