LCOV - code coverage report
Current view: top level - exchange - taler-exchange-wirewatch.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 56.5 % 395 223
Test Date: 2026-03-17 00:32:16 Functions: 91.7 % 12 11

            Line data    Source code
       1              : /*
       2              :   This file is part of TALER
       3              :   Copyright (C) 2016--2023 Taler Systems SA
       4              : 
       5              :   TALER is free software; you can redistribute it and/or modify it under the
       6              :   terms of the GNU Affero General Public License as published by the Free Software
       7              :   Foundation; either version 3, or (at your option) any later version.
       8              : 
       9              :   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10              :   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11              :   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
      12              : 
      13              :   You should have received a copy of the GNU Affero General Public License along with
      14              :   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15              : */
      16              : /**
      17              :  * @file taler-exchange-wirewatch.c
      18              :  * @brief Process that watches for wire transfers to the exchange's bank account
      19              :  * @author Christian Grothoff
      20              :  */
      21              : #include "taler/platform.h"
      22              : #include <gnunet/gnunet_util_lib.h>
      23              : #include <jansson.h>
      24              : #include <pthread.h>
      25              : #include <microhttpd.h>
      26              : #include "taler/taler_exchangedb_lib.h"
      27              : #include "taler/taler_exchangedb_plugin.h"
      28              : #include "taler/taler_json_lib.h"
      29              : #include "taler/taler_bank_service.h"
      30              : 
      31              : 
      32              : /**
      33              :  * How long to wait for an HTTP reply if there
      34              :  * are no transactions pending at the server?
      35              :  */
      36              : #define LONGPOLL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
      37              : 
      38              : /**
      39              :  * What is the maximum batch size we use for credit history
      40              :  * requests with the bank.  See `batch_size` below.
      41              :  */
      42              : #define MAXIMUM_BATCH_SIZE 1024
      43              : 
      44              : /**
      45              :  * Information about our account.
      46              :  */
      47              : static const struct TALER_EXCHANGEDB_AccountInfo *ai;
      48              : 
      49              : /**
      50              :  * Active request for history.
      51              :  */
      52              : static struct TALER_BANK_CreditHistoryHandle *hh;
      53              : 
      54              : /**
      55              :  * Set to true if the request for history did actually
      56              :  * return transaction items.
      57              :  */
      58              : static bool hh_returned_data;
      59              : 
      60              : /**
      61              :  * Set to true if the request for history did not
      62              :  * succeed because the account was unknown.
      63              :  */
      64              : static bool hh_account_404;
      65              : 
      66              : /**
      67              :  * Set to true if the request for history did not
      68              :  * succeed because of some unexpected HTTP request error.
      69              :  */
      70              : static bool hh_error;
      71              : 
      72              : /**
      73              :  * When did we start the last @e hh request?
      74              :  */
      75              : static struct GNUNET_TIME_Absolute hh_start_time;
      76              : 
      77              : /**
      78              :  * Until when is processing this wire plugin delayed?
      79              :  */
      80              : static struct GNUNET_TIME_Absolute delayed_until;
      81              : 
      82              : /**
      83              :  * Encoded offset in the wire transfer list from where
      84              :  * to start the next query with the bank.
      85              :  */
      86              : static uint64_t batch_start;
      87              : 
      88              : /**
      89              :  * Latest row offset seen in this transaction, becomes
      90              :  * the new #batch_start upon commit.
      91              :  */
      92              : static uint64_t latest_row_off;
      93              : 
      94              : /**
      95              :  * Offset where our current shard begins (inclusive).
      96              :  */
      97              : static uint64_t shard_start;
      98              : 
      99              : /**
     100              :  * Offset where our current shard ends (exclusive).
     101              :  */
     102              : static uint64_t shard_end;
     103              : 
     104              : /**
     105              :  * When did we start with the shard?
     106              :  */
     107              : static struct GNUNET_TIME_Absolute shard_start_time;
     108              : 
     109              : /**
     110              :  * For how long did we lock the shard?
     111              :  */
     112              : static struct GNUNET_TIME_Absolute shard_end_time;
     113              : 
     114              : /**
     115              :  * How long did we take to finish the last shard
     116              :  * for this account?
     117              :  */
     118              : static struct GNUNET_TIME_Relative shard_delay;
     119              : 
     120              : /**
     121              :  * How long do we expect a long-polling history request to the
     122              :  * bank to take at most (enforced client-side if the bank replies early)?
     123              :  */
     124              : static struct GNUNET_TIME_Relative longpoll_timeout;
     125              : 
     126              : /**
     127              :  * How long do we wait on 404.
     128              :  */
     129              : static struct GNUNET_TIME_Relative h404_backoff;
     130              : 
     131              : /**
     132              :  * How long do we wait on HTTP history request errors.
     133              :  */
     134              : static struct GNUNET_TIME_Relative hh_error_backoff;
     135              : 
     136              : /**
     137              :  * Name of our job in the shard table.
     138              :  */
     139              : static char *job_name;
     140              : 
     141              : /**
     142              :  * How many transactions do we retrieve per batch?
     143              :  */
     144              : static unsigned int batch_size;
     145              : 
     146              : /**
     147              :  * Batch size that last caused a serialization error; steers how fast @e batch_size grows on success.
     148              :  */
     149              : static unsigned int batch_thresh;
     150              : 
     151              : /**
     152              :  * Did work remain in the transaction queue? Set to true
     153              :  * if we did some work and thus there might be more.
     154              :  */
     155              : static bool progress;
     156              : 
     157              : /**
     158              :  * Did we start a transaction yet?
     159              :  */
     160              : static bool started_transaction;
     161              : 
     162              : /**
     163              :  * Is this shard still open for processing.
     164              :  */
     165              : static bool shard_open;
     166              : 
     167              : /**
     168              :  * Handle to the context for interacting with the bank.
     169              :  */
     170              : static struct GNUNET_CURL_Context *ctx;
     171              : 
     172              : /**
     173              :  * Scheduler context for running the @e ctx.
     174              :  */
     175              : static struct GNUNET_CURL_RescheduleContext *rc;
     176              : 
     177              : /**
     178              :  * The exchange's configuration (global)
     179              :  */
     180              : static const struct GNUNET_CONFIGURATION_Handle *cfg;
     181              : 
     182              : /**
     183              :  * Our DB plugin.
     184              :  */
     185              : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
     186              : 
     187              : /**
     188              :  * How long should we sleep when idle before trying to find more work?
     189              :  * Also used for how long we wait to grab a shard before trying it again.
     190              :  * The value should be set to a bit above the average time it takes to
     191              :  * process a shard.
     192              :  */
     193              : static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
     194              : 
     195              : /**
     196              :  * How long do we sleep on serialization conflicts?
     197              :  */
     198              : static struct GNUNET_TIME_Relative wirewatch_conflict_sleep_interval;
     199              : 
     200              : /**
     201              :  * Modulus to apply to group shards.  The shard size must ultimately be a
     202              :  * multiple of the batch size. Thus, if this is not a multiple of the
     203              :  * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
     204              :  */
     205              : static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
     206              : 
     207              : /**
     208              :  * How many workers should we plan our scheduling with?
     209              :  */
     210              : static unsigned int max_workers = 16;
     211              : 
     212              : /**
     213              :  * -e command-line option: exit on errors talking to the bank?
     214              :  */
     215              : static int exit_on_error;
     216              : 
     217              : /**
     218              :  * Value to return from main(). 0 on success, non-zero on
     219              :  * serious errors.
     220              :  */
     221              : static int global_ret;
     222              : 
     223              : /**
     224              :  * Are we run in testing mode and should only do one pass?
     225              :  */
     226              : static int test_mode;
     227              : 
     228              : /**
     229              :  * Should we ignore if the bank does not know our bank
     230              :  * account?
     231              :  */
     232              : static int ignore_account_404;
     233              : 
     234              : /**
     235              :  * Current task waiting for execution, if any.
     236              :  */
     237              : static struct GNUNET_SCHEDULER_Task *task;
     238              : 
     239              : /**
     240              :  * Name of the configuration section with the account we should watch.
     241              :  */
     242              : static char *account_section;
     243              : 
     244              : /**
     245              :  * We're being aborted with CTRL-C (or SIGTERM). Shut down: cancel the pending history
     246              :  * request, roll back the open transaction, abort the open shard and release the global resources.
     247              :  * @param cls closure, unused
     248              :  */
     249              : static void
     250           62 : shutdown_task (void *cls)
     251              : {
     252              :   enum GNUNET_DB_QueryStatus qs;
     253              :   (void) cls;
     254              : 
     255           62 :   if (NULL != hh) /* a bank history request is still in flight */
     256              :   {
     257            0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     258              :                 "History request cancelled on shutdown\n");
     259            0 :     TALER_BANK_credit_history_cancel (hh);
     260            0 :     hh = NULL;
     261              :   }
     262           62 :   if (started_transaction) /* do not leave a DB transaction open */
     263              :   {
     264            0 :     db_plugin->rollback (db_plugin->cls);
     265            0 :     started_transaction = false;
     266              :   }
     267           62 :   if (shard_open) /* we still have an open shard: abort it in the DB */
     268              :   {
     269           62 :     qs = db_plugin->abort_shard (db_plugin->cls,
     270              :                                  job_name,
     271              :                                  shard_start,
     272              :                                  shard_end);
     273           62 :     if (qs <= 0) /* any non-positive status counts as failure; only warn and continue the cleanup */
     274            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     275              :                   "Failed to abort work shard on shutdown\n");
     276              :   }
     277           62 :   GNUNET_free (job_name); /* allocated in add_account_cb() */
     278           62 :   if (NULL != ctx)
     279              :   {
     280           62 :     GNUNET_CURL_fini (ctx);
     281           62 :     ctx = NULL;
     282              :   }
     283           62 :   if (NULL != rc)
     284              :   {
     285           62 :     GNUNET_CURL_gnunet_rc_destroy (rc);
     286           62 :     rc = NULL;
     287              :   }
     288           62 :   if (NULL != task) /* a lock_shard()/continue_with_shard() task is still queued */
     289              :   {
     290            0 :     GNUNET_SCHEDULER_cancel (task);
     291            0 :     task = NULL;
     292              :   }
     293           62 :   TALER_EXCHANGEDB_plugin_unload (db_plugin);
     294           62 :   db_plugin = NULL;
     295           62 :   TALER_EXCHANGEDB_unload_accounts ();
     296           62 :   cfg = NULL; /* const pointer, not freed here */
     297           62 : }
     298              : 
     299              : 
     300              : /**
     301              :  * Function called with information about a wire account.  Selects the account as the
     302              :  * single account to watch (#ai) if it is credit-enabled and matches #account_section (if set), and
     303              :  * derives #job_name and the initial #batch_size.  A second matching account triggers shutdown.
     304              :  * @param cls closure, NULL
     305              :  * @param in_ai account information
     306              :  */
     307              : static void
     308          172 : add_account_cb (void *cls,
     309              :                 const struct TALER_EXCHANGEDB_AccountInfo *in_ai)
     310              : {
     311              :   (void) cls;
     312          172 :   if (! in_ai->credit_enabled)
     313            0 :     return; /* not enabled for credit, skip */
     314          172 :   if ( (NULL != account_section) &&
     315          170 :        (0 != strcasecmp (in_ai->section_name,
     316              :                          account_section)) )
     317          110 :     return; /* not the account section we were asked to watch, skip */
     318           62 :   if (NULL != ai) /* an earlier call already selected an account */
     319              :   {
     320            0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     321              :                 "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n",
     322              :                 ai->section_name,
     323              :                 in_ai->section_name);
     324            0 :     GNUNET_SCHEDULER_shutdown ();
     325            0 :     global_ret = EXIT_INVALIDARGUMENT;
     326            0 :     return;
     327              :   }
     328           62 :   ai = in_ai;
     329           62 :   GNUNET_asprintf (&job_name, /* freed in shutdown_task() */
     330              :                    "wirewatch-%s",
     331           62 :                    ai->section_name);
     332           62 :   batch_size = MAXIMUM_BATCH_SIZE;
     333           62 :   if (0 != shard_size % batch_size) /* shard size is not a multiple of the maximum batch size */
     334           62 :     batch_size = shard_size; /* then use the shard size itself as the batch size */
     335              : }
     336              : 
     337              : 
     338              : /**
     339              :  * Parse configuration parameters into the corresponding global variables:
     340              :  * reads WIREWATCH_IDLE_SLEEP_INTERVAL, loads the DB plugin and the wire
     341              :  * accounts, and selects the account to watch (#ai) via #add_account_cb().
     342              :  * @return #GNUNET_OK on success, #GNUNET_SYSERR on any failure
     343              :  */
     344              : static enum GNUNET_GenericReturnValue
     345           62 : exchange_serve_process_config (void)
     346              : {
     347           62 :   if (GNUNET_OK !=
     348           62 :       GNUNET_CONFIGURATION_get_value_time (cfg,
     349              :                                            "exchange",
     350              :                                            "WIREWATCH_IDLE_SLEEP_INTERVAL",
     351              :                                            &wirewatch_idle_sleep_interval))
     352              :   {
     353            0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     354              :                                "exchange",
     355              :                                "WIREWATCH_IDLE_SLEEP_INTERVAL");
     356            0 :     return GNUNET_SYSERR;
     357              :   }
     358           62 :   if (NULL ==
     359           62 :       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg,
     360              :                                                  false)))
     361              :   {
     362            0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     363              :                 "Failed to initialize DB subsystem\n");
     364            0 :     return GNUNET_SYSERR;
     365              :   }
     366           62 :   if (GNUNET_OK !=
     367           62 :       TALER_EXCHANGEDB_load_accounts (cfg,
     368              :                                       TALER_EXCHANGEDB_ALO_CREDIT
     369              :                                       | TALER_EXCHANGEDB_ALO_AUTHDATA))
     370              :   {
     371            0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     372              :                 "No wire accounts configured for credit!\n");
     373            0 :     return GNUNET_SYSERR;
     374              :   }
     375           62 :   TALER_EXCHANGEDB_find_accounts (&add_account_cb, /* sets #ai for the matching account */
     376              :                                   NULL);
     377           62 :   if (NULL == ai) /* the callback did not select any account */
     378              :   {
     379            0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     380              :                 "No accounts enabled for credit!\n");
     381            0 :     GNUNET_SCHEDULER_shutdown ();
     382            0 :     return GNUNET_SYSERR;
     383              :   }
     384           62 :   return GNUNET_OK;
     385              : }
     386              : 
     387              : 
     388              : /**
     389              :  * Lock a shard and then begin to query for incoming wire transfers.
     390              :  * Forward declaration; run as a scheduler task by #schedule_transfers().
     391              :  * @param cls NULL
     392              :  */
     393              : static void
     394              : lock_shard (void *cls);
     395              : 
     396              : 
     397              : /**
     398              :  * Continue with the credit history of the shard.
     399              :  * Forward declaration; run as a scheduler task by #handle_soft_error().
     400              :  * @param cls NULL
     401              :  */
     402              : static void
     403              : continue_with_shard (void *cls);
     404              : 
     405              : 
     406              : /**
     407              :  * We encountered a serialization error.  Rollback the transaction, halve the
     408              :  * batch size (if it is above 1) and try again from #batch_start via #continue_with_shard().
     409              :  */
     410              : static void
     411            0 : handle_soft_error (void)
     412              : {
     413            0 :   db_plugin->rollback (db_plugin->cls);
     414            0 :   started_transaction = false;
     415            0 :   if (1 < batch_size) /* cannot shrink below a batch of one */
     416              :   {
     417            0 :     batch_thresh = batch_size; /* remember the size that just failed */
     418            0 :     batch_size /= 2;
     419            0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     420              :                 "Reduced batch size to %llu due to serialization issue\n",
     421              :                 (unsigned long long) batch_size);
     422              :   }
     423              :   /* Reset to beginning of transaction, and go again
     424              :      from there. */
     425            0 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     426              :               "Encountered soft error, resetting start point to batch start\n");
     427            0 :   latest_row_off = batch_start; /* forget the offsets seen in the rolled-back transaction */
     428            0 :   GNUNET_assert (NULL == task); /* only one pending task at a time */
     429            0 :   task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
     430              :                                    NULL);
     431            0 : }
     432              : 
     433              : 
     434              : /**
     435              :  * Schedule the #lock_shard() operation to run at #delayed_until, logging first whether we retry our shard or lock a new one.
     436              :  */
     437              : static void
     438          167 : schedule_transfers (void)
     439              : {
     440          167 :   if (shard_open) /* we still have an open shard and will retry it */
     441            0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     442              :                 "Will retry my shard (%llu,%llu] of %s in %s\n",
     443              :                 (unsigned long long) shard_start,
     444              :                 (unsigned long long) shard_end,
     445              :                 job_name,
     446              :                 GNUNET_STRINGS_relative_time_to_string (
     447              :                   GNUNET_TIME_absolute_get_remaining (delayed_until),
     448              :                   true));
     449              :   else
     450          167 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     451              :                 "Will try to lock next shard of %s in %s\n",
     452              :                 job_name,
     453              :                 GNUNET_STRINGS_relative_time_to_string (
     454              :                   GNUNET_TIME_absolute_get_remaining (delayed_until),
     455              :                   true));
     456          167 :   GNUNET_assert (NULL == task); /* only one pending task at a time */
     457          167 :   task = GNUNET_SCHEDULER_add_at (delayed_until,
     458              :                                   &lock_shard,
     459              :                                   NULL);
     460          167 : }
     461              : 
     462              : 
     463              : /**
     464              :  * We are done with the work that is possible right now (the transaction was committed, if any).
     465              :  * Grow #batch_size if possible, set #delayed_until and move on to the next shard via #schedule_transfers().
     466              :  */
     467              : static void
     468          167 : transaction_completed (void)
     469              : {
     470          167 :   if ( (batch_start + batch_size ==
     471           61 :         latest_row_off) && /* row offset advanced by exactly one full batch */
     472           61 :        (batch_size < MAXIMUM_BATCH_SIZE) )
     473              :   {
     474              :     /* The current batch size worked without serialization
     475              :        issues, and we are allowed to grow. Do so slowly. */
     476              :     int delta;
     477              : 
     478           61 :     delta = ((int) batch_thresh - (int) batch_size) / 4; /* a quarter of the distance to the size that last failed */
     479           61 :     if (delta < 0) /* use the absolute distance */
     480            0 :       delta = -delta;
     481           61 :     batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
     482              :                              batch_size + delta + 1); /* the +1 guarantees growth even if delta is 0 */
     483           61 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     484              :                 "Increasing batch size to %llu\n",
     485              :                 (unsigned long long) batch_size);
     486              :   }
     487              : 
     488          167 :   if ( (! progress) && test_mode)
     489              :   {
     490              :     /* Transaction list was drained and we are in
     491              :        test mode. So we are done. */
     492           62 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     493              :                 "Transaction list drained and in test mode. Exiting\n");
     494           62 :     GNUNET_SCHEDULER_shutdown ();
     495           62 :     return;
     496              :   }
     497          105 :   if (! (hh_returned_data || hh_account_404 || hh_error) ) /* no data, no 404 and no error */
     498              :   {
     499              :     /* Enforce long-polling delay even if the server ignored it
     500              :        and returned earlier */
     501              :     struct GNUNET_TIME_Relative latency;
     502              :     struct GNUNET_TIME_Relative left;
     503              : 
     504            0 :     latency = GNUNET_TIME_absolute_get_duration (hh_start_time);
     505            0 :     left = GNUNET_TIME_relative_subtract (longpoll_timeout,
     506              :                                           latency);
     507            0 :     if (! (test_mode ||
     508            0 :            GNUNET_TIME_relative_is_zero (left)) )
     509            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     510              :                   "Server did not respect long-polling, enforcing client-side by sleeping for %s\n",
     511              :                   GNUNET_TIME_relative2s (left,
     512              :                                           true));
     513            0 :     delayed_until = GNUNET_TIME_relative_to_absolute (left); /* may still be overridden by test mode below */
     514              :   }
     515          105 :   if (hh_account_404)
     516              :   {
     517            0 :     h404_backoff = GNUNET_TIME_STD_BACKOFF (h404_backoff); /* presumably lengthens the delay with each consecutive 404 - TODO confirm */
     518            0 :     delayed_until = GNUNET_TIME_relative_to_absolute (
     519              :       h404_backoff);
     520              :   }
     521              :   else
     522              :   {
     523          105 :     h404_backoff = GNUNET_TIME_UNIT_ZERO; /* no 404 this time: reset the backoff */
     524              :   }
     525          105 :   if (hh_error) /* checked after the 404 case, so this delay wins if both flags are set */
     526              :   {
     527            0 :     hh_error_backoff = GNUNET_TIME_STD_BACKOFF (hh_error_backoff);
     528            0 :     delayed_until = GNUNET_TIME_relative_to_absolute (
     529              :       hh_error_backoff);
     530              :   }
     531              :   else
     532              :   {
     533          105 :     hh_error_backoff = GNUNET_TIME_UNIT_ZERO; /* no error this time: reset the backoff */
     534              :   }
     535          105 :   if (test_mode) /* never wait in test mode, whatever was computed above */
     536          105 :     delayed_until = GNUNET_TIME_UNIT_ZERO_ABS;
     537          105 :   GNUNET_assert (NULL == task);
     538          105 :   schedule_transfers ();
     539              : }
     540              : 
     541              : 
     542              : /**
     543              :  * We got incoming transaction details from the bank. Add them
     544              :  * to the database.
     545              :  *
     546              :  * @param details array of transaction details
     547              :  * @param details_length length of the @a details array
     548              :  */
     549              : static void
     550          105 : process_reply (const struct TALER_BANK_CreditDetails *details,
     551              :                unsigned int details_length)
     552              : {
     553              :   enum GNUNET_DB_QueryStatus qs;
     554              :   bool shard_done;
     555          105 :   uint64_t lroff = latest_row_off;
     556              : 
     557          105 :   if (0 == details_length)
     558              :   {
     559              :     /* Server should have used 204, not 200! */
     560            0 :     GNUNET_break_op (0);
     561            0 :     transaction_completed ();
     562            0 :     return;
     563              :   }
     564          105 :   hh_returned_data = true;
     565              :   /* check serial IDs for range constraints */
     566          175 :   for (unsigned int i = 0; i<details_length; i++)
     567              :   {
     568          105 :     const struct TALER_BANK_CreditDetails *cd = &details[i];
     569              : 
     570          105 :     if (cd->serial_id < lroff)
     571              :     {
     572            0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     573              :                   "Serial ID %llu not monotonic (got %llu before). Failing!\n",
     574              :                   (unsigned long long) cd->serial_id,
     575              :                   (unsigned long long) lroff);
     576            0 :       db_plugin->rollback (db_plugin->cls);
     577            0 :       GNUNET_SCHEDULER_shutdown ();
     578            0 :       return;
     579              :     }
     580          105 :     if (cd->serial_id > shard_end)
     581              :     {
     582              :       /* we are *past* the current shard (likely because the serial_id of the
     583              :          shard_end happens to not exist in the DB). So commit and stop this
     584              :          iteration! */
     585           35 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     586              :                   "Serial ID %llu past shard end at %llu, ending iteration early!\n",
     587              :                   (unsigned long long) cd->serial_id,
     588              :                   (unsigned long long) shard_end);
     589           35 :       details_length = i;
     590           35 :       progress = true;
     591           35 :       lroff = cd->serial_id - 1;
     592           35 :       break;
     593              :     }
     594           70 :     lroff = cd->serial_id;
     595              :   }
     596          105 :   if (0 != details_length)
     597           70 :   {
     598           70 :     enum GNUNET_DB_QueryStatus qss[details_length];
     599           70 :     struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
     600           70 :     unsigned int j = 0;
     601              : 
     602              :     /* make compiler happy */
     603           70 :     memset (qss,
     604              :             0,
     605              :             sizeof (qss));
     606           70 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     607              :                 "Importing %u transactions\n",
     608              :                 details_length);
     609          140 :     for (unsigned int i = 0; i<details_length; i++)
     610              :     {
     611           70 :       const struct TALER_BANK_CreditDetails *cd = &details[i];
     612              : 
     613           70 :       switch (cd->type)
     614              :       {
     615           54 :       case TALER_BANK_CT_RESERVE:
     616              :         {
     617           54 :           struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[j++];
     618              : 
     619              :           /* add to batch, do later */
     620           54 :           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     621              :                       "Importing reserve transfer over %s\n",
     622              :                       TALER_amount2s (&cd->amount));
     623           54 :           res->reserve_pub = &cd->details.reserve.reserve_pub;
     624           54 :           res->balance = &cd->amount;
     625           54 :           res->execution_time = cd->execution_date;
     626           54 :           res->sender_account_details = cd->debit_account_uri;
     627           54 :           res->exchange_account_name = ai->section_name;
     628           54 :           res->wire_reference = cd->serial_id;
     629              :         }
     630           54 :         break;
     631           16 :       case TALER_BANK_CT_KYCAUTH:
     632              :         {
     633           16 :           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     634              :                       "Importing KYC auth transfer over %s\n",
     635              :                       TALER_amount2s (&cd->amount));
     636           16 :           qs = db_plugin->kycauth_in_insert (
     637           16 :             db_plugin->cls,
     638              :             &cd->details.kycauth.account_pub,
     639              :             &cd->amount,
     640              :             cd->execution_date,
     641              :             cd->debit_account_uri,
     642           16 :             ai->section_name,
     643           16 :             cd->serial_id);
     644           16 :           switch (qs)
     645              :           {
     646            0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     647            0 :             GNUNET_break (0);
     648            0 :             GNUNET_SCHEDULER_shutdown ();
     649            0 :             return;
     650            0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     651            0 :             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     652              :                         "Got DB soft error for kycauth_in_insert (%u). Rolling back.\n",
     653              :                         i);
     654            0 :             handle_soft_error ();
     655            0 :             return;
     656           16 :           default:
     657           16 :             break;
     658              :           }
     659           16 :           break;
     660              :         }
     661            0 :       case TALER_BANK_CT_WAD:
     662              :         {
     663            0 :           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     664              :                       "Importing WAD transfer over %s\n",
     665              :                       TALER_amount2s (&cd->amount));
     666            0 :           qs = db_plugin->wad_in_insert (
     667            0 :             db_plugin->cls,
     668              :             &cd->details.wad.wad_id,
     669            0 :             cd->details.wad.origin_exchange_url,
     670              :             &cd->amount,
     671              :             cd->execution_date,
     672              :             cd->debit_account_uri,
     673            0 :             ai->section_name,
     674            0 :             cd->serial_id);
     675            0 :           switch (qs)
     676              :           {
     677            0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     678            0 :             GNUNET_break (0);
     679            0 :             GNUNET_SCHEDULER_shutdown ();
     680            0 :             return;
     681            0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     682            0 :             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     683              :                         "Got DB soft error for wad_in_insert (%u). Rolling back.\n",
     684              :                         i);
     685            0 :             handle_soft_error ();
     686            0 :             return;
     687            0 :           default:
     688            0 :             break;
     689              :           }
     690              : 
     691              :         }
     692              :       }
     693              :     }
     694           70 :     if (j > 0)
     695              :     {
     696           54 :       qs = db_plugin->reserves_in_insert (db_plugin->cls,
     697              :                                           reserves,
     698              :                                           j,
     699              :                                           qss);
     700           54 :       switch (qs)
     701              :       {
     702            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     703            0 :         GNUNET_break (0);
     704            0 :         GNUNET_SCHEDULER_shutdown ();
     705            0 :         return;
     706            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     707            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     708              :                     "Got DB soft error for reserves_in_insert (%u). Rolling back.\n",
     709              :                     details_length);
     710            0 :         handle_soft_error ();
     711            0 :         return;
     712           54 :       default:
     713           54 :         break;
     714              :       }
     715              :     }
     716           70 :     j = 0;
     717          140 :     for (unsigned int i = 0; i<details_length; i++)
     718              :     {
     719           70 :       const struct TALER_BANK_CreditDetails *cd = &details[i];
     720              : 
     721           70 :       if (TALER_BANK_CT_RESERVE != cd->type)
     722           16 :         continue;
     723           54 :       switch (qss[j++])
     724              :       {
     725            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     726            0 :         GNUNET_break (0);
     727            0 :         GNUNET_SCHEDULER_shutdown ();
     728            0 :         return;
     729            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     730            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     731              :                     "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
     732              :                     i);
     733            0 :         handle_soft_error ();
     734            0 :         return;
     735            0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     736              :         /* Either wirewatch was freshly started after the system was
     737              :            shutdown and we're going over an incomplete shard again
     738              :            after being restarted, or the shard lock period was too
     739              :            short (number of workers set incorrectly?) and a 2nd
     740              :            wirewatcher has been stealing our work while we are still
     741              :            at it. */
     742            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     743              :                     "Attempted to import transaction %llu (%s) twice. "
     744              :                     "This should happen rarely (if not, ask for support).\n",
     745              :                     (unsigned long long) cd->serial_id,
     746              :                     job_name);
     747            0 :         break;
     748           54 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     749           54 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     750              :                     "Imported transaction %llu.\n",
     751              :                     (unsigned long long) cd->serial_id);
     752              :         /* normal case */
     753           54 :         progress = true;
     754           54 :         break;
     755              :       }
     756              :     }
     757              :   }
     758              : 
     759          105 :   latest_row_off = lroff;
     760          105 :   shard_done = (shard_end <= latest_row_off);
     761          105 :   if (shard_done)
     762              :   {
     763              :     /* shard is complete, mark this as well */
     764          105 :     qs = db_plugin->complete_shard (db_plugin->cls,
     765              :                                     job_name,
     766              :                                     shard_start,
     767              :                                     shard_end);
     768          105 :     switch (qs)
     769              :     {
     770            0 :     case GNUNET_DB_STATUS_HARD_ERROR:
     771            0 :       GNUNET_break (0);
     772            0 :       GNUNET_SCHEDULER_shutdown ();
     773            0 :       return;
     774            0 :     case GNUNET_DB_STATUS_SOFT_ERROR:
     775            0 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     776              :                   "Got DB soft error for complete_shard. Rolling back.\n");
     777            0 :       handle_soft_error ();
     778            0 :       return;
     779            0 :     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     780            0 :       GNUNET_break (0);
     781              :       /* Not expected, but let's just continue */
     782            0 :       break;
     783          105 :     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     784              :       /* normal case */
     785          105 :       progress = true;
     786          105 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     787              :                   "Completed shard %s (%llu,%llu] after %s\n",
     788              :                   job_name,
     789              :                   (unsigned long long) shard_start,
     790              :                   (unsigned long long) shard_end,
     791              :                   GNUNET_STRINGS_relative_time_to_string (
     792              :                     GNUNET_TIME_absolute_get_duration (shard_start_time),
     793              :                     true));
     794          105 :       break;
     795              :     }
     796          105 :     shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
     797          105 :     shard_open = false;
     798          105 :     transaction_completed ();
     799          105 :     return;
     800              :   }
     801            0 :   GNUNET_assert (NULL == task);
     802            0 :   task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
     803              :                                    NULL);
     804              : }
     805              : 
     806              : 
     807              : /**
     808              :  * Callback invoked with the bank's reply to our credit history
     809              :  * request; what happens next depends on the HTTP status of the reply.
     810              :  *
     811              :  * @param cls NULL (unused)
     812              :  * @param reply response we got from the bank
     813              :  */
     814              : static void
     815          167 : history_cb (void *cls,
     816              :             const struct TALER_BANK_CreditHistoryResponse *reply)
     817              : {
     818              :   (void) cls;
     819          167 :   GNUNET_assert (NULL == task);
     820          167 :   hh = NULL; /* forget the handle of the request this callback belongs to */
     821          167 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     822              :               "History request returned with HTTP status %u\n",
     823              :               reply->http_status);
     824          167 :   switch (reply->http_status)
     825              :   {
     826          105 :   case MHD_HTTP_OK: /* pass the returned credit details on to process_reply() */
     827          105 :     process_reply (reply->details.ok.details,
     828          105 :                    reply->details.ok.details_length);
     829          105 :     return;
     830           61 :   case MHD_HTTP_NO_CONTENT: /* no details to process, just conclude */
     831           61 :     transaction_completed ();
     832           61 :     return;
     833            1 :   case MHD_HTTP_NOT_FOUND:
     834            1 :     hh_account_404 = true; /* remember that the bank answered 404 */
     835            1 :     if (ignore_account_404)
     836              :     {
     837            0 :       transaction_completed ();
     838            0 :       return;
     839              :     }
     840            1 :     break; /* 404 not ignored: fall through to the exit_on_error check below */
     841            0 :   default:
     842            0 :     hh_error = true; /* any other status is recorded and logged as an error */
     843            0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     844              :                 "Error fetching history: %s (%u)\n",
     845              :                 TALER_ErrorCode_get_hint (reply->ec),
     846              :                 reply->http_status);
     847            0 :     break;
     848              :   }
     849            1 :   if (! exit_on_error) /* only reached for a non-ignored 404 or an unexpected status */
     850              :   {
     851            1 :     transaction_completed ();
     852            1 :     return;
     853              :   }
     854            0 :   GNUNET_SCHEDULER_shutdown (); /* exit_on_error is set: stop the scheduler */
     855              : }
     856              : 
     857              : /** Task that requests the next batch of the bank's credit history for the open shard; @a cls is unused (scheduled with NULL). */
     858              : static void
     859          167 : continue_with_shard (void *cls)
     860              : {
     861              :   unsigned int limit;
     862              : 
     863              :   (void) cls;
     864          167 :   task = NULL; /* this task is now running, so clear the pending-task handle */
     865          167 :   GNUNET_assert (shard_end > latest_row_off); /* there must be something left before the shard end */
     866          167 :   limit = GNUNET_MIN (batch_size, /* ask for at most batch_size entries ... */
     867              :                       shard_end - latest_row_off); /* ... and never more than remain up to shard_end */
     868          167 :   GNUNET_assert (NULL == hh); /* only one history request may be active at a time */
     869          167 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     870              :               "Requesting credit history starting from %llu\n",
     871              :               (unsigned long long) latest_row_off);
     872          167 :   hh_start_time = GNUNET_TIME_absolute_get (); /* remember when this request was started */
     873          167 :   hh_returned_data = false; /* reset the per-request status flags */
     874          167 :   hh_account_404 = false;
     875          167 :   hh_error = false;
     876          167 :   hh = TALER_BANK_credit_history (ctx,
     877          167 :                                   ai->auth,
     878              :                                   latest_row_off,
     879              :                                   limit,
     880              :                                   test_mode
     881          167 :                                   ? GNUNET_TIME_UNIT_ZERO /* test mode: pass a zero timeout */
     882              :                                   : longpoll_timeout, /* otherwise use the configured long-poll timeout */
     883              :                                   &history_cb, /* reply is delivered to history_cb() */
     884              :                                   NULL);
     885          167 :   if (NULL == hh)
     886              :   {
     887            0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     888              :                 "Failed to start request for account history!\n");
     889            0 :     global_ret = EXIT_FAILURE; /* becomes the process exit status returned by main() */
     890            0 :     GNUNET_SCHEDULER_shutdown ();
     891            0 :     return;
     892              :   }
     893              : }
     894              : 
     895              : 
     896              : /**
     897              :  * Reserve a shard for us to work on (or keep working on the open shard while its lock period lasts),
     898              :  * then schedule #continue_with_shard() to request the bank history for it.
     899              :  * @param cls NULL (unused)
     900              :  */
     901              : static void
     902          167 : lock_shard (void *cls)
     903              : {
     904              :   enum GNUNET_DB_QueryStatus qs;
     905              :   struct GNUNET_TIME_Relative delay;
     906          167 :   uint64_t last_shard_start = shard_start; /* previous range, used below to detect ... */
     907          167 :   uint64_t last_shard_end = shard_end; /* ... that we were handed the very same shard again */
     908              : 
     909              :   (void) cls;
     910          167 :   task = NULL; /* this task is now running, so clear the pending-task handle */
     911          167 :   if (GNUNET_SYSERR ==
     912          167 :       db_plugin->preflight (db_plugin->cls))
     913              :   {
     914            0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     915              :                 "Failed to obtain database connection!\n");
     916            0 :     global_ret = EXIT_FAILURE;
     917            0 :     GNUNET_SCHEDULER_shutdown ();
     918            0 :     return;
     919              :   }
     920          167 :   if ( (shard_open) && /* we still have an unfinished shard ... */
     921            0 :        (GNUNET_TIME_absolute_is_future (shard_end_time)) ) /* ... and its lock period has not ended */
     922              :   {
     923            0 :     progress = false;
     924            0 :     batch_start = latest_row_off; /* next batch starts where the last one stopped */
     925            0 :     task = GNUNET_SCHEDULER_add_now (&continue_with_shard, /* keep going without calling begin_shard again */
     926              :                                      NULL);
     927            0 :     return;
     928              :   }
     929          167 :   if (shard_open) /* shard_end_time has passed while the shard was still open */
     930            0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     931              :                 "Shard not completed in time, will try to re-acquire\n");
     932              :   /* How long we lock a shard depends on the number of
     933              :      workers expected, and how long we usually took to
     934              :      process a shard. */
     935          167 :   if (0 == max_workers)
     936          167 :     delay = GNUNET_TIME_UNIT_ZERO; /* no workers configured: request a zero lock duration */
     937              :   else
     938            0 :     delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( /* random duration bounded by 4x the larger of ... */
     939              :       GNUNET_CRYPTO_QUALITY_WEAK,
     940            0 :       4 * GNUNET_TIME_relative_max (
     941              :         wirewatch_idle_sleep_interval, /* ... the idle sleep interval and ... */
     942              :         GNUNET_TIME_relative_multiply (shard_delay, /* ... the last shard duration times max_workers */
     943            0 :                                        max_workers)).rel_value_us);
     944          167 :   shard_start_time = GNUNET_TIME_absolute_get (); /* used later to measure how long the shard took */
     945          167 :   qs = db_plugin->begin_shard (db_plugin->cls,
     946              :                                job_name,
     947              :                                delay,
     948              :                                shard_size,
     949              :                                &shard_start, /* shard range is returned through these two pointers */
     950              :                                &shard_end);
     951          167 :   switch (qs)
     952              :   {
     953            0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     954            0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     955              :                 "Failed to obtain starting point for monitoring from database!\n");
     956            0 :     global_ret = EXIT_FAILURE;
     957            0 :     GNUNET_SCHEDULER_shutdown ();
     958            0 :     return;
     959            0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     960              :     /* try again after a randomized back-off delay */
     961              :     {
     962              :       struct GNUNET_TIME_Relative rdelay;
     963              : 
     964              :       wirewatch_conflict_sleep_interval
     965            0 :         = GNUNET_TIME_STD_BACKOFF (wirewatch_conflict_sleep_interval);
     966            0 :       rdelay = GNUNET_TIME_randomize (wirewatch_conflict_sleep_interval);
     967            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     968              :                   "Serialization error trying to obtain shard %s, will try again in %s!\n",
     969              :                   job_name,
     970              :                   GNUNET_STRINGS_relative_time_to_string (rdelay,
     971              :                                                           true));
     972              : #if 1
     973            0 :       if (GNUNET_TIME_relative_cmp (rdelay, /* report when the back-off exceeded one second ... */
     974              :                                     >,
     975              :                                     GNUNET_TIME_UNIT_SECONDS))
     976            0 :         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     977              :                     "Delay would have been for %s\n",
     978              :                     GNUNET_TIME_relative2s (rdelay,
     979              :                                             true));
     980            0 :       rdelay = GNUNET_TIME_relative_min (rdelay, /* ... and cap the actual retry delay at one second */
     981              :                                          GNUNET_TIME_UNIT_SECONDS);
     982              : #endif
     983            0 :       delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
     984              :     }
     985            0 :     GNUNET_assert (NULL == task);
     986            0 :     schedule_transfers (); /* retry via schedule_transfers(); delayed_until was set just above */
     987            0 :     return;
     988            0 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     989            0 :     GNUNET_break (0); /* not expected; flagged, then we wait and retry */
     990            0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     991              :                 "No shard available, will try again for %s in %s!\n",
     992              :                 job_name,
     993              :                 GNUNET_STRINGS_relative_time_to_string (
     994              :                   wirewatch_idle_sleep_interval,
     995              :                   true));
     996            0 :     delayed_until = GNUNET_TIME_relative_to_absolute (
     997              :       wirewatch_idle_sleep_interval);
     998            0 :     shard_open = false; /* we hold no shard now */
     999            0 :     GNUNET_assert (NULL == task);
    1000            0 :     schedule_transfers ();
    1001            0 :     return;
    1002          167 :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    1003              :     /* continued below */
    1004          167 :     wirewatch_conflict_sleep_interval = GNUNET_TIME_UNIT_ZERO; /* success resets the conflict back-off */
    1005          167 :     break;
    1006              :   }
    1007          167 :   shard_end_time = GNUNET_TIME_relative_to_absolute (delay); /* time at which the requested lock period ends */
    1008          167 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1009              :               "Starting with shard %s at (%llu,%llu] locked for %s\n",
    1010              :               job_name,
    1011              :               (unsigned long long) shard_start,
    1012              :               (unsigned long long) shard_end,
    1013              :               GNUNET_STRINGS_relative_time_to_string (delay,
    1014              :                                                       true));
    1015          167 :   progress = false;
    1016          167 :   batch_start = shard_start;
    1017          167 :   if ( (shard_open) && /* an unfinished shard was open and ... */
    1018            0 :        (shard_start == last_shard_start) && /* ... begin_shard returned exactly the same range */
    1019            0 :        (shard_end == last_shard_end) )
    1020              :   {
    1021            0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1022              :                 "Continuing from %llu\n",
    1023              :                 (unsigned long long) latest_row_off);
    1024            0 :     GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */
    1025              :   }
    1026              :   else
    1027              :   {
    1028          167 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1029              :                 "Resetting shard start to original start point (%d)\n",
    1030              :                 shard_open ? 1 : 0);
    1031          167 :     latest_row_off = batch_start; /* new or different shard: start from its beginning */
    1032              :   }
    1033          167 :   shard_open = true;
    1034          167 :   task = GNUNET_SCHEDULER_add_now (&continue_with_shard, /* start requesting history for the shard */
    1035              :                                    NULL);
    1036              : }
    1037              : 
    1038              : 
    1039              : /**
    1040              :  * First task: stores the configuration, sets up shutdown handling and the CURL context, then calls schedule_transfers().
    1041              :  *
    1042              :  * @param cls closure, NULL
    1043              :  * @param args remaining command-line arguments (unused)
    1044              :  * @param cfgfile name of the configuration file used (for saving, can be NULL!) (unused)
    1045              :  * @param c configuration
    1046              :  */
    1047              : static void
    1048           62 : run (void *cls,
    1049              :      char *const *args,
    1050              :      const char *cfgfile,
    1051              :      const struct GNUNET_CONFIGURATION_Handle *c)
    1052              : {
    1053              :   (void) cls;
    1054              :   (void) args;
    1055              :   (void) cfgfile;
    1056              : 
    1057           62 :   cfg = c; /* keep the configuration in the file-level variable */
    1058           62 :   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, /* registered first, before any step that may trigger shutdown */
    1059              :                                  cls);
    1060           62 :   if (GNUNET_OK !=
    1061           62 :       exchange_serve_process_config ())
    1062              :   {
    1063            0 :     global_ret = EXIT_NOTCONFIGURED; /* configuration could not be processed */
    1064            0 :     GNUNET_SCHEDULER_shutdown ();
    1065            0 :     return;
    1066              :   }
    1067           62 :   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
    1068              :                           &rc); /* address of rc is handed over here; rc itself is assigned below */
    1069           62 :   if (NULL == ctx)
    1070              :   {
    1071            0 :     GNUNET_break (0);
    1072            0 :     GNUNET_SCHEDULER_shutdown ();
    1073            0 :     global_ret = EXIT_NO_RESTART;
    1074            0 :     return;
    1075              :   }
    1076           62 :   rc = GNUNET_CURL_gnunet_rc_create (ctx);
    1077           62 :   schedule_transfers (); /* setup is complete; hand over to schedule_transfers() */
    1078              : }
    1079              : 
    1080              : 
    1081              : /**
    1082              :  * The main function of taler-exchange-wirewatch: declares the command-line options and runs the program via GNUNET_PROGRAM_run().
    1083              :  *
    1084              :  * @param argc number of arguments from the command line
    1085              :  * @param argv command line arguments
    1086              :  * @return 0 ok, non-zero on error
    1087              :  */
    1088              : int
    1089           62 : main (int argc,
    1090              :       char *const *argv)
    1091              : {
    1092           62 :   struct GNUNET_GETOPT_CommandLineOption options[] = { /* each entry stores into a file-level variable */
    1093           62 :     GNUNET_GETOPT_option_string ('a',
    1094              :                                  "account",
    1095              :                                  "SECTION_NAME",
    1096              :                                  "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)",
    1097              :                                  &account_section),
    1098           62 :     GNUNET_GETOPT_option_flag ('e',
    1099              :                                "exit-on-error",
    1100              :                                "terminate wirewatch if we failed to download information from the bank",
    1101              :                                &exit_on_error),
    1102           62 :     GNUNET_GETOPT_option_relative_time ('f',
    1103              :                                         "longpoll-timeout",
    1104              :                                         "DELAY",
    1105              :                                         "what is the timeout when asking the bank about new transactions, specify with unit (e.g. --longpoll-timeout=30s)",
    1106              :                                         &longpoll_timeout),
    1107           62 :     GNUNET_GETOPT_option_flag ('I',
    1108              :                                "ignore-not-found",
    1109              :                                "continue, even if the bank account of the exchange was not found",
    1110              :                                &ignore_account_404),
    1111           62 :     GNUNET_GETOPT_option_uint ('S',
    1112              :                                "size",
    1113              :                                "SIZE",
    1114              :                                "Size to process per shard (default: 1024)",
    1115              :                                &shard_size),
    1116           62 :     GNUNET_GETOPT_option_timetravel ('T',
    1117              :                                      "timetravel"),
    1118           62 :     GNUNET_GETOPT_option_flag ('t',
    1119              :                                "test",
    1120              :                                "run in test mode and exit when idle",
    1121              :                                &test_mode),
    1122           62 :     GNUNET_GETOPT_option_uint ('w',
    1123              :                                "workers",
    1124              :                                "COUNT",
    1125              :                                "Plan work load with up to COUNT worker processes (default: 16)",
    1126              :                                &max_workers),
    1127           62 :     GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    1128              :     GNUNET_GETOPT_OPTION_END
    1129              :   };
    1130              :   enum GNUNET_GenericReturnValue ret;
    1131              : 
    1132           62 :   longpoll_timeout = LONGPOLL_TIMEOUT; /* default set before the options are handed over; -f stores into the same variable */
    1133           62 :   ret = GNUNET_PROGRAM_run (
    1134              :     TALER_EXCHANGE_project_data (),
    1135              :     argc, argv,
    1136              :     "taler-exchange-wirewatch",
    1137              :     gettext_noop (
    1138              :       "background process that watches for incoming wire transfers from customers"),
    1139              :     options,
    1140              :     &run, NULL); /* run() is the first task, called with a NULL closure */
    1141           62 :   if (GNUNET_SYSERR == ret)
    1142            0 :     return EXIT_INVALIDARGUMENT; /* GNUNET_PROGRAM_run() reported an error */
    1143           62 :   if (GNUNET_NO == ret)
    1144            0 :     return EXIT_SUCCESS; /* GNUNET_NO is mapped to success (presumably --help/--version -- TODO confirm) */
    1145           62 :   return global_ret; /* otherwise return the status recorded in global_ret */
    1146              : }
    1147              : 
    1148              : 
    1149              : /* end of taler-exchange-wirewatch.c */
        

Generated by: LCOV version 2.0-1