LCOV - code coverage report
Current view: top level - exchange - taler-exchange-wirewatch.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 227 387 58.7 %
Date: 2025-07-03 11:36:01 Functions: 11 12 91.7 %

          Line data    Source code
       1             : /*
       2             :   This file is part of TALER
       3             :   Copyright (C) 2016--2023 Taler Systems SA
       4             : 
       5             :   TALER is free software; you can redistribute it and/or modify it under the
       6             :   terms of the GNU Affero General Public License as published by the Free Software
       7             :   Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
      12             : 
      13             :   You should have received a copy of the GNU Affero General Public License along with
      14             :   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             : */
      16             : /**
      17             :  * @file taler-exchange-wirewatch.c
      18             :  * @brief Process that watches for wire transfers to the exchange's bank account
      19             :  * @author Christian Grothoff
      20             :  */
      21             : #include "taler/platform.h"
      22             : #include <gnunet/gnunet_util_lib.h>
      23             : #include <jansson.h>
      24             : #include <pthread.h>
      25             : #include <microhttpd.h>
      26             : #include "taler/taler_exchangedb_lib.h"
      27             : #include "taler/taler_exchangedb_plugin.h"
      28             : #include "taler/taler_json_lib.h"
      29             : #include "taler/taler_bank_service.h"
      30             : 
      31             : 
      32             : /**
      33             :  * How long to wait for an HTTP reply if there
      34             :  * are no transactions pending at the server?
      35             :  */
      36             : #define LONGPOLL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
      37             : 
      38             : /**
      39             :  * What is the maximum batch size we use for credit history
      40             :  * requests with the bank.  See `batch_size` below.
      41             :  */
      42             : #define MAXIMUM_BATCH_SIZE 1024
      43             : 
      44             : /**
      45             :  * Information about our account.
      46             :  */
      47             : static const struct TALER_EXCHANGEDB_AccountInfo *ai;
      48             : 
      49             : /**
      50             :  * Active request for history.
      51             :  */
      52             : static struct TALER_BANK_CreditHistoryHandle *hh;
      53             : 
      54             : /**
      55             :  * Set to true if the request for history did actually
      56             :  * return transaction items.
      57             :  */
      58             : static bool hh_returned_data;
      59             : 
      60             : /**
      61             :  * Set to true if the request for history did not
      62             :  * succeed because the account was unknown.
      63             :  */
      64             : static bool hh_account_404;
      65             : 
      66             : /**
      67             :  * When did we start the last @e hh request?
      68             :  */
      69             : static struct GNUNET_TIME_Absolute hh_start_time;
      70             : 
      71             : /**
      72             :  * Until when is processing this wire plugin delayed?
      73             :  */
      74             : static struct GNUNET_TIME_Absolute delayed_until;
      75             : 
      76             : /**
      77             :  * Encoded offset in the wire transfer list from where
      78             :  * to start the next query with the bank.
      79             :  */
      80             : static uint64_t batch_start;
      81             : 
      82             : /**
      83             :  * Latest row offset seen in this transaction, becomes
      84             :  * the new #batch_start upon commit.
      85             :  */
      86             : static uint64_t latest_row_off;
      87             : 
      88             : /**
      89             :  * Offset where our current shard begins (inclusive).
      90             :  */
      91             : static uint64_t shard_start;
      92             : 
      93             : /**
      94             :  * Offset where our current shard ends (exclusive).
      95             :  */
      96             : static uint64_t shard_end;
      97             : 
      98             : /**
      99             :  * When did we start with the shard?
     100             :  */
     101             : static struct GNUNET_TIME_Absolute shard_start_time;
     102             : 
     103             : /**
     104             :  * For how long did we lock the shard?
     105             :  */
     106             : static struct GNUNET_TIME_Absolute shard_end_time;
     107             : 
     108             : /**
     109             :  * How long did we take to finish the last shard
     110             :  * for this account?
     111             :  */
     112             : static struct GNUNET_TIME_Relative shard_delay;
     113             : 
     114             : /**
     115             :  * How long do we long-poll the bank for new credit
     116             :  * transactions (also enforced client-side)?
     117             :  */
     118             : static struct GNUNET_TIME_Relative longpoll_timeout;
     119             : 
     120             : /**
     121             :  * How long do we wait on 404.
     122             :  */
     123             : static struct GNUNET_TIME_Relative h404_backoff;
     124             : 
     125             : /**
     126             :  * Name of our job in the shard table.
     127             :  */
     128             : static char *job_name;
     129             : 
     130             : /**
     131             :  * How many transactions do we retrieve per batch?
     132             :  */
     133             : static unsigned int batch_size;
     134             : 
     135             : /**
     136             :  * Batch size at which we last hit a serialization issue; @e batch_size grows back at a pace scaled by the distance to it.
     137             :  */
     138             : static unsigned int batch_thresh;
     139             : 
     140             : /**
     141             :  * Did work remain in the transaction queue? Set to true
     142             :  * if we did some work and thus there might be more.
     143             :  */
     144             : static bool progress;
     145             : 
     146             : /**
     147             :  * Did we start a transaction yet?
     148             :  */
     149             : static bool started_transaction;
     150             : 
     151             : /**
     152             :  * Is this shard still open for processing.
     153             :  */
     154             : static bool shard_open;
     155             : 
     156             : /**
     157             :  * Handle to the context for interacting with the bank.
     158             :  */
     159             : static struct GNUNET_CURL_Context *ctx;
     160             : 
     161             : /**
     162             :  * Scheduler context for running the @e ctx.
     163             :  */
     164             : static struct GNUNET_CURL_RescheduleContext *rc;
     165             : 
     166             : /**
     167             :  * The exchange's configuration (global)
     168             :  */
     169             : static const struct GNUNET_CONFIGURATION_Handle *cfg;
     170             : 
     171             : /**
     172             :  * Our DB plugin.
     173             :  */
     174             : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
     175             : 
     176             : /**
     177             :  * How long should we sleep when idle before trying to find more work?
     178             :  * Also used for how long we wait to grab a shard before trying it again.
     179             :  * The value should be set to a bit above the average time it takes to
     180             :  * process a shard.
     181             :  */
     182             : static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
     183             : 
     184             : /**
     185             :  * How long do we sleep on serialization conflicts?
     186             :  */
     187             : static struct GNUNET_TIME_Relative wirewatch_conflict_sleep_interval;
     188             : 
     189             : /**
     190             :  * Modulus to apply to group shards.  The shard size must ultimately be a
     191             :  * multiple of the batch size. Thus, if this is not a multiple of the
     192             :  * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
     193             :  */
     194             : static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
     195             : 
     196             : /**
     197             :  * How many workers should we plan our scheduling with?
     198             :  */
     199             : static unsigned int max_workers = 16;
     200             : 
     201             : /**
     202             :  * -e command-line option: exit on errors talking to the bank?
     203             :  */
     204             : static int exit_on_error;
     205             : 
     206             : /**
     207             :  * Value to return from main(). 0 on success, non-zero on
     208             :  * serious errors.
     209             :  */
     210             : static int global_ret;
     211             : 
     212             : /**
     213             :  * Are we run in testing mode and should only do one pass?
     214             :  */
     215             : static int test_mode;
     216             : 
     217             : /**
     218             :  * Should we ignore if the bank does not know our bank
     219             :  * account?
     220             :  */
     221             : static int ignore_account_404;
     222             : 
     223             : /**
     224             :  * Current task waiting for execution, if any.
     225             :  */
     226             : static struct GNUNET_SCHEDULER_Task *task;
     227             : 
     228             : /**
     229             :  * Name of the configuration section with the account we should watch.
     230             :  */
     231             : static char *account_section;
     232             : 
     233             : /**
     234             :  * We're being aborted with CTRL-C (or SIGTERM). Shut down: cancel the pending
     235             :  * history request, roll back an open transaction, abort our open shard and release the global resources.
     236             :  * @param cls closure, unused
     237             :  */
     238             : static void
     239          64 : shutdown_task (void *cls)
     240             : {
     241             :   enum GNUNET_DB_QueryStatus qs;
     242             :   (void) cls;
     243             : 
     244          64 :   if (NULL != hh)
     245             :   {
     246           2 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     247             :                 "History request cancelled on shutdown\n");
     248           2 :     TALER_BANK_credit_history_cancel (hh);
     249           2 :     hh = NULL;
     250             :   }
     251          64 :   if (started_transaction) /* do not leave a DB transaction open */
     252             :   {
     253           0 :     db_plugin->rollback (db_plugin->cls);
     254           0 :     started_transaction = false;
     255             :   }
     256          64 :   if (shard_open)
     257             :   {
     258          64 :     qs = db_plugin->abort_shard (db_plugin->cls,
     259             :                                  job_name,
     260             :                                  shard_start,
     261             :                                  shard_end);
     262          64 :     if (qs <= 0) /* failure is only logged, shutdown continues */
     263           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     264             :                   "Failed to abort work shard on shutdown\n");
     265             :   }
     266          64 :   GNUNET_free (job_name);
     267          64 :   if (NULL != ctx)
     268             :   {
     269          64 :     GNUNET_CURL_fini (ctx);
     270          64 :     ctx = NULL;
     271             :   }
     272          64 :   if (NULL != rc)
     273             :   {
     274          64 :     GNUNET_CURL_gnunet_rc_destroy (rc);
     275          64 :     rc = NULL;
     276             :   }
     277          64 :   if (NULL != task)
     278             :   {
     279           0 :     GNUNET_SCHEDULER_cancel (task);
     280           0 :     task = NULL;
     281             :   }
     282          64 :   TALER_EXCHANGEDB_plugin_unload (db_plugin);
     283          64 :   db_plugin = NULL;
     284          64 :   TALER_EXCHANGEDB_unload_accounts ();
     285          64 :   cfg = NULL;
     286          64 : }
     287             : 
     288             : 
     289             : /**
     290             :  * Function called with information about a wire account.  Selects the
     291             :  * account as the one to watch (#ai) if it is credit-enabled and matches #account_section (if set); a second match is an error.
     292             :  *
     293             :  * @param cls closure, NULL
     294             :  * @param in_ai account information
     295             :  */
     296             : static void
     297         124 : add_account_cb (void *cls,
     298             :                 const struct TALER_EXCHANGEDB_AccountInfo *in_ai)
     299             : {
     300             :   (void) cls;
     301         124 :   if (! in_ai->credit_enabled)
     302           0 :     return; /* not enabled for credit, skip */
     303         124 :   if ( (NULL != account_section) &&
     304         122 :        (0 != strcasecmp (in_ai->section_name,
     305             :                          account_section)) )
     306          60 :     return; /* not the section selected via #account_section, skip */
     307          64 :   if (NULL != ai)
     308             :   {
     309           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     310             :                 "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n",
     311             :                 ai->section_name,
     312             :                 in_ai->section_name);
     313           0 :     GNUNET_SCHEDULER_shutdown ();
     314           0 :     global_ret = EXIT_INVALIDARGUMENT;
     315           0 :     return;
     316             :   }
     317          64 :   ai = in_ai;
     318          64 :   GNUNET_asprintf (&job_name, /* name of our job in the shard table */
     319             :                    "wirewatch-%s",
     320          64 :                    ai->section_name);
     321          64 :   batch_size = MAXIMUM_BATCH_SIZE;
     322          64 :   if (0 != shard_size % batch_size) /* shard size must be a multiple of the batch size */
     323          62 :     batch_size = shard_size;
     324             : }
     325             : 
     326             : 
     327             : /**
     328             :  * Parse the configuration into the corresponding global variables: read
     329             :  * WIREWATCH_IDLE_SLEEP_INTERVAL, load the DB plugin and the credit accounts, and select the account to watch.
     330             :  *
     331             :  * @return #GNUNET_OK on success, #GNUNET_SYSERR on any failure
     332             :  */
     333             : static enum GNUNET_GenericReturnValue
     334          64 : exchange_serve_process_config (void)
     335             : {
     336          64 :   if (GNUNET_OK !=
     337          64 :       GNUNET_CONFIGURATION_get_value_time (cfg,
     338             :                                            "exchange",
     339             :                                            "WIREWATCH_IDLE_SLEEP_INTERVAL",
     340             :                                            &wirewatch_idle_sleep_interval))
     341             :   {
     342           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     343             :                                "exchange",
     344             :                                "WIREWATCH_IDLE_SLEEP_INTERVAL");
     345           0 :     return GNUNET_SYSERR;
     346             :   }
     347          64 :   if (NULL ==
     348          64 :       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg,
     349             :                                                  false)))
     350             :   {
     351           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     352             :                 "Failed to initialize DB subsystem\n");
     353           0 :     return GNUNET_SYSERR;
     354             :   }
     355          64 :   if (GNUNET_OK !=
     356          64 :       TALER_EXCHANGEDB_load_accounts (cfg,
     357             :                                       TALER_EXCHANGEDB_ALO_CREDIT
     358             :                                       | TALER_EXCHANGEDB_ALO_AUTHDATA))
     359             :   {
     360           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     361             :                 "No wire accounts configured for credit!\n");
     362           0 :     return GNUNET_SYSERR;
     363             :   }
     364          64 :   TALER_EXCHANGEDB_find_accounts (&add_account_cb, /* sets #ai, #job_name and #batch_size */
     365             :                                   NULL);
     366          64 :   if (NULL == ai) /* add_account_cb() did not select any account */
     367             :   {
     368           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     369             :                 "No accounts enabled for credit!\n");
     370           0 :     GNUNET_SCHEDULER_shutdown ();
     371           0 :     return GNUNET_SYSERR;
     372             :   }
     373          64 :   return GNUNET_OK;
     374             : }
     375             : 
     376             : 
     377             : /**
     378             :  * Lock a shard and then begin to query for incoming wire transfers.
     379             :  * Scheduled by schedule_transfers() to run at #delayed_until.
     380             :  * @param cls NULL
     381             :  */
     382             : static void
     383             : lock_shard (void *cls);
     384             : 
     385             : 
     386             : /**
     387             :  * Continue with the credit history of the shard.
     388             :  * Scheduled immediately by handle_soft_error() after a rollback.
     389             :  * @param cls NULL
     390             :  */
     391             : static void
     392             : continue_with_shard (void *cls);
     393             : 
     394             : 
     395             : /**
     396             :  * We encountered a serialization error.  Rollback the transaction, halve the
     397             :  * batch size (if it is above 1) and try again from #batch_start.
     398             :  */
     399             : static void
     400           0 : handle_soft_error (void)
     401             : {
     402           0 :   db_plugin->rollback (db_plugin->cls);
     403           0 :   started_transaction = false;
     404           0 :   if (1 < batch_size)
     405             :   {
     406           0 :     batch_thresh = batch_size; /* remember the size that failed; transaction_completed() uses it to pace regrowth */
     407           0 :     batch_size /= 2;
     408           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     409             :                 "Reduced batch size to %llu due to serialization issue\n",
     410             :                 (unsigned long long) batch_size);
     411             :   }
     412             :   /* Reset to beginning of transaction, and go again
     413             :      from there: rows seen in the rolled-back transaction are forgotten. */
     414           0 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     415             :               "Encountered soft error, resetting start point to batch start\n");
     416           0 :   latest_row_off = batch_start;
     417           0 :   GNUNET_assert (NULL == task); /* at most one task may be pending */
     418           0 :   task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
     419             :                                    NULL);
     420           0 : }
     421             : 
     422             : 
     423             : /**
     424             :  * Schedule the #lock_shard() operation to run at #delayed_until, logging whether we retry our open shard or try to lock the next one.
     425             :  */
     426             : static void
     427         169 : schedule_transfers (void)
     428             : {
     429         169 :   if (shard_open)
     430           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     431             :                 "Will retry my shard (%llu,%llu] of %s in %s\n",
     432             :                 (unsigned long long) shard_start,
     433             :                 (unsigned long long) shard_end,
     434             :                 job_name,
     435             :                 GNUNET_STRINGS_relative_time_to_string (
     436             :                   GNUNET_TIME_absolute_get_remaining (delayed_until),
     437             :                   true));
     438             :   else
     439         169 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     440             :                 "Will try to lock next shard of %s in %s\n",
     441             :                 job_name,
     442             :                 GNUNET_STRINGS_relative_time_to_string (
     443             :                   GNUNET_TIME_absolute_get_remaining (delayed_until),
     444             :                   true));
     445         169 :   GNUNET_assert (NULL == task); /* at most one task may be pending */
     446         169 :   task = GNUNET_SCHEDULER_add_at (delayed_until,
     447             :                                   &lock_shard,
     448             :                                   NULL);
     449         169 : }
     450             : 
     451             : 
     452             : /**
     453             :  * We are done with the work that is possible right now (and the transaction
     454             :  * was committed, if there was one to commit). Possibly grow the batch size, compute #delayed_until and schedule the next lock_shard() run; in test mode, shut down instead if no progress was made.
     455             :  */
     456             : static void
     457         167 : transaction_completed (void)
     458             : {
     459         167 :   if ( (batch_start + batch_size ==
     460          61 :         latest_row_off) &&
     461          61 :        (batch_size < MAXIMUM_BATCH_SIZE) )
     462             :   {
     463             :     /* The current batch size worked without serialization
     464             :        issues, and we are allowed to grow. Do so slowly. */
     465             :     int delta;
     466             : 
     467          61 :     delta = ((int) batch_thresh - (int) batch_size) / 4;
     468          61 :     if (delta < 0) /* use the absolute distance to #batch_thresh */
     469           0 :       delta = -delta;
     470          61 :     batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
     471             :                              batch_size + delta + 1);
     472          61 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     473             :                 "Increasing batch size to %llu\n",
     474             :                 (unsigned long long) batch_size);
     475             :   }
     476             : 
     477         167 :   if ( (! progress) && test_mode)
     478             :   {
     479             :     /* Transaction list was drained and we are in
     480             :        test mode. So we are done. */
     481          62 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     482             :                 "Transaction list drained and in test mode. Exiting\n");
     483          62 :     GNUNET_SCHEDULER_shutdown ();
     484          62 :     return;
     485             :   }
     486         105 :   if (! (hh_returned_data || hh_account_404) )
     487             :   {
     488             :     /* Enforce long-polling delay even if the server ignored it
     489             :        and returned earlier */
     490             :     struct GNUNET_TIME_Relative latency;
     491             :     struct GNUNET_TIME_Relative left;
     492             : 
     493           0 :     latency = GNUNET_TIME_absolute_get_duration (hh_start_time);
     494           0 :     left = GNUNET_TIME_relative_subtract (longpoll_timeout,
     495             :                                           latency);
     496           0 :     if (! (test_mode ||
     497           0 :            GNUNET_TIME_relative_is_zero (left)) )
     498           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     499             :                   "Server did not respect long-polling, enforcing client-side by sleeping for %s\n",
     500             :                   GNUNET_TIME_relative2s (left,
     501             :                                           true));
     502           0 :     delayed_until = GNUNET_TIME_relative_to_absolute (left);
     503             :   }
     504         105 :   if (hh_account_404)
     505             :   {
     506           0 :     h404_backoff = GNUNET_TIME_STD_BACKOFF (h404_backoff); /* back off further on each consecutive 404 */
     507           0 :     delayed_until = GNUNET_TIME_relative_to_absolute (
     508             :       h404_backoff);
     509             :   }
     510             :   else
     511             :   {
     512         105 :     h404_backoff = GNUNET_TIME_UNIT_ZERO; /* reset the backoff once the account is no longer reported unknown */
     513             :   }
     514         105 :   if (test_mode) /* never wait in test mode */
     515         105 :     delayed_until = GNUNET_TIME_UNIT_ZERO_ABS;
     516         105 :   GNUNET_assert (NULL == task);
     517         105 :   schedule_transfers ();
     518             : }
     519             : 
     520             : 
     521             : /**
     522             :  * We got incoming transaction details from the bank. Add them
     523             :  * to the database.
     524             :  *
     525             :  * @param details array of transaction details
     526             :  * @param details_length length of the @a details array
     527             :  */
     528             : static void
     529         109 : process_reply (const struct TALER_BANK_CreditDetails *details,
     530             :                unsigned int details_length)
     531             : {
     532             :   enum GNUNET_DB_QueryStatus qs;
     533             :   bool shard_done;
     534         109 :   uint64_t lroff = latest_row_off;
     535             : 
     536         109 :   if (0 == details_length)
     537             :   {
     538             :     /* Server should have used 204, not 200! */
     539           0 :     GNUNET_break_op (0);
     540           0 :     transaction_completed ();
     541           0 :     return;
     542             :   }
     543         109 :   hh_returned_data = true;
     544             :   /* check serial IDs for range constraints */
     545         183 :   for (unsigned int i = 0; i<details_length; i++)
     546             :   {
     547         109 :     const struct TALER_BANK_CreditDetails *cd = &details[i];
     548             : 
     549         109 :     if (cd->serial_id < lroff)
     550             :     {
     551           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     552             :                   "Serial ID %llu not monotonic (got %llu before). Failing!\n",
     553             :                   (unsigned long long) cd->serial_id,
     554             :                   (unsigned long long) lroff);
     555           0 :       db_plugin->rollback (db_plugin->cls);
     556           0 :       GNUNET_SCHEDULER_shutdown ();
     557           0 :       return;
     558             :     }
     559         109 :     if (cd->serial_id > shard_end)
     560             :     {
     561             :       /* we are *past* the current shard (likely because the serial_id of the
     562             :          shard_end happens to not exist in the DB). So commit and stop this
     563             :          iteration! */
     564          35 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     565             :                   "Serial ID %llu past shard end at %llu, ending iteration early!\n",
     566             :                   (unsigned long long) cd->serial_id,
     567             :                   (unsigned long long) shard_end);
     568          35 :       details_length = i;
     569          35 :       progress = true;
     570          35 :       lroff = cd->serial_id - 1;
     571          35 :       break;
     572             :     }
     573          74 :     lroff = cd->serial_id;
     574             :   }
     575         109 :   if (0 != details_length)
     576          74 :   {
     577          74 :     enum GNUNET_DB_QueryStatus qss[details_length];
     578          74 :     struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
     579          74 :     unsigned int j = 0;
     580             : 
     581             :     /* make compiler happy */
     582          74 :     memset (qss,
     583             :             0,
     584             :             sizeof (qss));
     585          74 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     586             :                 "Importing %u transactions\n",
     587             :                 details_length);
     588         148 :     for (unsigned int i = 0; i<details_length; i++)
     589             :     {
     590          74 :       const struct TALER_BANK_CreditDetails *cd = &details[i];
     591             : 
     592          74 :       switch (cd->type)
     593             :       {
     594          58 :       case TALER_BANK_CT_RESERVE:
     595             :         {
     596          58 :           struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[j++];
     597             : 
     598             :           /* add to batch, do later */
     599          58 :           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     600             :                       "Importing reserve transfer over %s\n",
     601             :                       TALER_amount2s (&cd->amount));
     602          58 :           res->reserve_pub = &cd->details.reserve.reserve_pub;
     603          58 :           res->balance = &cd->amount;
     604          58 :           res->execution_time = cd->execution_date;
     605          58 :           res->sender_account_details = cd->debit_account_uri;
     606          58 :           res->exchange_account_name = ai->section_name;
     607          58 :           res->wire_reference = cd->serial_id;
     608             :         }
     609          58 :         break;
     610          16 :       case TALER_BANK_CT_KYCAUTH:
     611             :         {
     612          16 :           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     613             :                       "Importing KYC auth transfer over %s\n",
     614             :                       TALER_amount2s (&cd->amount));
     615          16 :           qs = db_plugin->kycauth_in_insert (
     616          16 :             db_plugin->cls,
     617             :             &cd->details.kycauth.account_pub,
     618             :             &cd->amount,
     619             :             cd->execution_date,
     620             :             cd->debit_account_uri,
     621          16 :             ai->section_name,
     622          16 :             cd->serial_id);
     623             :           switch (qs)
     624             :           {
     625           0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     626           0 :             GNUNET_break (0);
     627           0 :             GNUNET_SCHEDULER_shutdown ();
     628           0 :             return;
     629           0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     630           0 :             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     631             :                         "Got DB soft error for kycauth_in_insert (%u). Rolling back.\n",
     632             :                         i);
     633           0 :             handle_soft_error ();
     634           0 :             return;
     635          16 :           default:
     636          16 :             break;
     637             :           }
     638          16 :           break;
     639             :         }
     640           0 :       case TALER_BANK_CT_WAD:
     641             :         {
     642           0 :           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     643             :                       "Importing WAD transfer over %s\n",
     644             :                       TALER_amount2s (&cd->amount));
     645           0 :           qs = db_plugin->wad_in_insert (
     646           0 :             db_plugin->cls,
     647             :             &cd->details.wad.wad_id,
     648           0 :             cd->details.wad.origin_exchange_url,
     649             :             &cd->amount,
     650             :             cd->execution_date,
     651             :             cd->debit_account_uri,
     652           0 :             ai->section_name,
     653           0 :             cd->serial_id);
     654             :           switch (qs)
     655             :           {
     656           0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     657           0 :             GNUNET_break (0);
     658           0 :             GNUNET_SCHEDULER_shutdown ();
     659           0 :             return;
     660           0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     661           0 :             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     662             :                         "Got DB soft error for wad_in_insert (%u). Rolling back.\n",
     663             :                         i);
     664           0 :             handle_soft_error ();
     665           0 :             return;
     666           0 :           default:
     667           0 :             break;
     668             :           }
     669             : 
     670             :         }
     671             :       }
     672             :     }
     673          74 :     if (j > 0)
     674             :     {
     675          58 :       qs = db_plugin->reserves_in_insert (db_plugin->cls,
     676             :                                           reserves,
     677             :                                           j,
     678             :                                           qss);
     679          58 :       switch (qs)
     680             :       {
     681           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     682           0 :         GNUNET_break (0);
     683           0 :         GNUNET_SCHEDULER_shutdown ();
     684           0 :         return;
     685           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     686           0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     687             :                     "Got DB soft error for reserves_in_insert (%u). Rolling back.\n",
     688             :                     details_length);
     689           0 :         handle_soft_error ();
     690           0 :         return;
     691          58 :       default:
     692          58 :         break;
     693             :       }
     694             :     }
     695          74 :     j = 0;
     696         148 :     for (unsigned int i = 0; i<details_length; i++)
     697             :     {
     698          74 :       const struct TALER_BANK_CreditDetails *cd = &details[i];
     699             : 
     700          74 :       if (TALER_BANK_CT_RESERVE != cd->type)
     701          16 :         continue;
     702          58 :       switch (qss[j++])
     703             :       {
     704           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     705           0 :         GNUNET_break (0);
     706           0 :         GNUNET_SCHEDULER_shutdown ();
     707           0 :         return;
     708           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     709           0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     710             :                     "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
     711             :                     i);
     712           0 :         handle_soft_error ();
     713           0 :         return;
     714           0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     715             :         /* Either wirewatch was freshly started after the system was
     716             :            shutdown and we're going over an incomplete shard again
     717             :            after being restarted, or the shard lock period was too
     718             :            short (number of workers set incorrectly?) and a 2nd
     719             :            wirewatcher has been stealing our work while we are still
     720             :            at it. */
     721           0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     722             :                     "Attempted to import transaction %llu (%s) twice. "
     723             :                     "This should happen rarely (if not, ask for support).\n",
     724             :                     (unsigned long long) cd->serial_id,
     725             :                     job_name);
     726           0 :         break;
     727          58 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     728          58 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     729             :                     "Imported transaction %llu.\n",
     730             :                     (unsigned long long) cd->serial_id);
     731             :         /* normal case */
     732          58 :         progress = true;
     733          58 :         break;
     734             :       }
     735             :     }
     736             :   }
     737             : 
     738         109 :   latest_row_off = lroff;
     739         109 :   shard_done = (shard_end <= latest_row_off);
     740         109 :   if (shard_done)
     741             :   {
     742             :     /* shard is complete, mark this as well */
     743         105 :     qs = db_plugin->complete_shard (db_plugin->cls,
     744             :                                     job_name,
     745             :                                     shard_start,
     746             :                                     shard_end);
     747         105 :     switch (qs)
     748             :     {
     749           0 :     case GNUNET_DB_STATUS_HARD_ERROR:
     750           0 :       GNUNET_break (0);
     751           0 :       GNUNET_SCHEDULER_shutdown ();
     752           0 :       return;
     753           0 :     case GNUNET_DB_STATUS_SOFT_ERROR:
     754           0 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     755             :                   "Got DB soft error for complete_shard. Rolling back.\n");
     756           0 :       handle_soft_error ();
     757           0 :       return;
     758           0 :     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     759           0 :       GNUNET_break (0);
     760             :       /* Not expected, but let's just continue */
     761           0 :       break;
     762         105 :     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     763             :       /* normal case */
     764         105 :       progress = true;
     765         105 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     766             :                   "Completed shard %s (%llu,%llu] after %s\n",
     767             :                   job_name,
     768             :                   (unsigned long long) shard_start,
     769             :                   (unsigned long long) shard_end,
     770             :                   GNUNET_STRINGS_relative_time_to_string (
     771             :                     GNUNET_TIME_absolute_get_duration (shard_start_time),
     772             :                     true));
     773         105 :       break;
     774             :     }
     775         105 :     shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
     776         105 :     shard_open = false;
     777         105 :     transaction_completed ();
     778         105 :     return;
     779             :   }
     780           4 :   GNUNET_assert (NULL == task);
     781           4 :   task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
     782             :                                    NULL);
     783             : }
     784             : 
     785             : 
     786             : /**
     787             :  * Callbacks of this type are used to serve the result of asking
     788             :  * the bank for the transaction history.
     789             :  *
     790             :  * @param cls NULL
     791             :  * @param reply response we got from the bank
     792             :  */
     793             : static void
     794         171 : history_cb (void *cls,
     795             :             const struct TALER_BANK_CreditHistoryResponse *reply)
     796             : {
     797             :   (void) cls;
     798         171 :   GNUNET_assert (NULL == task);
     799         171 :   hh = NULL;
     800         171 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     801             :               "History request returned with HTTP status %u\n",
     802             :               reply->http_status);
     803         171 :   switch (reply->http_status)
     804             :   {
     805         109 :   case MHD_HTTP_OK:
     806         109 :     process_reply (reply->details.ok.details,
     807         109 :                    reply->details.ok.details_length);
     808         109 :     return;
     809          61 :   case MHD_HTTP_NO_CONTENT:
     810          61 :     transaction_completed ();
     811          61 :     return;
     812           1 :   case MHD_HTTP_NOT_FOUND:
     813           1 :     hh_account_404 = true;
     814           1 :     if (ignore_account_404)
     815             :     {
     816           0 :       transaction_completed ();
     817           0 :       return;
     818             :     }
     819           1 :     break;
     820           0 :   default:
     821           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     822             :                 "Error fetching history: %s (%u)\n",
     823             :                 TALER_ErrorCode_get_hint (reply->ec),
     824             :                 reply->http_status);
     825           0 :     break;
     826             :   }
     827           1 :   if (! exit_on_error)
     828             :   {
     829           1 :     transaction_completed ();
     830           1 :     return;
     831             :   }
     832           0 :   GNUNET_SCHEDULER_shutdown ();
     833             : }
     834             : 
     835             : 
     836             : static void
     837         173 : continue_with_shard (void *cls)
     838             : {
     839             :   unsigned int limit;
     840             : 
     841             :   (void) cls;
     842         173 :   task = NULL;
     843         173 :   GNUNET_assert (shard_end > latest_row_off);
     844         173 :   limit = GNUNET_MIN (batch_size,
     845             :                       shard_end - latest_row_off);
     846         173 :   GNUNET_assert (NULL == hh);
     847         173 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     848             :               "Requesting credit history starting from %llu\n",
     849             :               (unsigned long long) latest_row_off);
     850         173 :   hh_start_time = GNUNET_TIME_absolute_get ();
     851         173 :   hh_returned_data = false;
     852         173 :   hh_account_404 = false;
     853         173 :   hh = TALER_BANK_credit_history (ctx,
     854         173 :                                   ai->auth,
     855             :                                   latest_row_off,
     856             :                                   limit,
     857             :                                   test_mode
     858         173 :                                   ? GNUNET_TIME_UNIT_ZERO
     859             :                                   : longpoll_timeout,
     860             :                                   &history_cb,
     861             :                                   NULL);
     862         173 :   if (NULL == hh)
     863             :   {
     864           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     865             :                 "Failed to start request for account history!\n");
     866           0 :     global_ret = EXIT_FAILURE;
     867           0 :     GNUNET_SCHEDULER_shutdown ();
     868           0 :     return;
     869             :   }
     870             : }
     871             : 
     872             : 
     873             : /**
     874             :  * Reserve a shard for us to work on.
     875             :  *
     876             :  * @param cls NULL
     877             :  */
     878             : static void
     879         169 : lock_shard (void *cls)
     880             : {
     881             :   enum GNUNET_DB_QueryStatus qs;
     882             :   struct GNUNET_TIME_Relative delay;
     883         169 :   uint64_t last_shard_start = shard_start;
     884         169 :   uint64_t last_shard_end = shard_end;
     885             : 
     886             :   (void) cls;
     887         169 :   task = NULL;
     888         169 :   if (GNUNET_SYSERR ==
     889         169 :       db_plugin->preflight (db_plugin->cls))
     890             :   {
     891           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     892             :                 "Failed to obtain database connection!\n");
     893           0 :     global_ret = EXIT_FAILURE;
     894           0 :     GNUNET_SCHEDULER_shutdown ();
     895           0 :     return;
     896             :   }
     897         169 :   if ( (shard_open) &&
     898           0 :        (GNUNET_TIME_absolute_is_future (shard_end_time)) )
     899             :   {
     900           0 :     progress = false;
     901           0 :     batch_start = latest_row_off;
     902           0 :     task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
     903             :                                      NULL);
     904           0 :     return;
     905             :   }
     906         169 :   if (shard_open)
     907           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     908             :                 "Shard not completed in time, will try to re-acquire\n");
     909             :   /* How long we lock a shard depends on the number of
     910             :      workers expected, and how long we usually took to
     911             :      process a shard. */
     912         169 :   if (0 == max_workers)
     913         167 :     delay = GNUNET_TIME_UNIT_ZERO;
     914             :   else
     915           2 :     delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
     916             :       GNUNET_CRYPTO_QUALITY_WEAK,
     917           2 :       4 * GNUNET_TIME_relative_max (
     918             :         wirewatch_idle_sleep_interval,
     919             :         GNUNET_TIME_relative_multiply (shard_delay,
     920           2 :                                        max_workers)).rel_value_us);
     921         169 :   shard_start_time = GNUNET_TIME_absolute_get ();
     922         169 :   qs = db_plugin->begin_shard (db_plugin->cls,
     923             :                                job_name,
     924             :                                delay,
     925             :                                shard_size,
     926             :                                &shard_start,
     927             :                                &shard_end);
     928         169 :   switch (qs)
     929             :   {
     930           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     931           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     932             :                 "Failed to obtain starting point for monitoring from database!\n");
     933           0 :     global_ret = EXIT_FAILURE;
     934           0 :     GNUNET_SCHEDULER_shutdown ();
     935           0 :     return;
     936           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     937             :     /* try again */
     938             :     {
     939             :       struct GNUNET_TIME_Relative rdelay;
     940             : 
     941             :       wirewatch_conflict_sleep_interval
     942           0 :         = GNUNET_TIME_STD_BACKOFF (wirewatch_conflict_sleep_interval);
     943           0 :       rdelay = GNUNET_TIME_randomize (wirewatch_conflict_sleep_interval);
     944           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     945             :                   "Serialization error tying to obtain shard %s, will try again in %s!\n",
     946             :                   job_name,
     947             :                   GNUNET_STRINGS_relative_time_to_string (rdelay,
     948             :                                                           true));
     949             : #if 1
     950           0 :       if (GNUNET_TIME_relative_cmp (rdelay,
     951             :                                     >,
     952             :                                     GNUNET_TIME_UNIT_SECONDS))
     953           0 :         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     954             :                     "Delay would have been for %s\n",
     955             :                     GNUNET_TIME_relative2s (rdelay,
     956             :                                             true));
     957           0 :       rdelay = GNUNET_TIME_relative_min (rdelay,
     958             :                                          GNUNET_TIME_UNIT_SECONDS);
     959             : #endif
     960           0 :       delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
     961             :     }
     962           0 :     GNUNET_assert (NULL == task);
     963           0 :     schedule_transfers ();
     964           0 :     return;
     965           0 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     966           0 :     GNUNET_break (0);
     967           0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     968             :                 "No shard available, will try again for %s in %s!\n",
     969             :                 job_name,
     970             :                 GNUNET_STRINGS_relative_time_to_string (
     971             :                   wirewatch_idle_sleep_interval,
     972             :                   true));
     973           0 :     delayed_until = GNUNET_TIME_relative_to_absolute (
     974             :       wirewatch_idle_sleep_interval);
     975           0 :     shard_open = false;
     976           0 :     GNUNET_assert (NULL == task);
     977           0 :     schedule_transfers ();
     978           0 :     return;
     979         169 :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     980             :     /* continued below */
     981         169 :     wirewatch_conflict_sleep_interval = GNUNET_TIME_UNIT_ZERO;
     982         169 :     break;
     983             :   }
     984         169 :   shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
     985         169 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     986             :               "Starting with shard %s at (%llu,%llu] locked for %s\n",
     987             :               job_name,
     988             :               (unsigned long long) shard_start,
     989             :               (unsigned long long) shard_end,
     990             :               GNUNET_STRINGS_relative_time_to_string (delay,
     991             :                                                       true));
     992         169 :   progress = false;
     993         169 :   batch_start = shard_start;
     994         169 :   if ( (shard_open) &&
     995           0 :        (shard_start == last_shard_start) &&
     996           0 :        (shard_end == last_shard_end) )
     997             :   {
     998           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     999             :                 "Continuing from %llu\n",
    1000             :                 (unsigned long long) latest_row_off);
    1001           0 :     GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */
    1002             :   }
    1003             :   else
    1004             :   {
    1005         169 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    1006             :                 "Resetting shard start to original start point (%d)\n",
    1007             :                 shard_open ? 1 : 0);
    1008         169 :     latest_row_off = batch_start;
    1009             :   }
    1010         169 :   shard_open = true;
    1011         169 :   task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
    1012             :                                    NULL);
    1013             : }
    1014             : 
    1015             : 
    1016             : /**
    1017             :  * First task.
    1018             :  *
    1019             :  * @param cls closure, NULL
    1020             :  * @param args remaining command-line arguments
    1021             :  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
    1022             :  * @param c configuration
    1023             :  */
    1024             : static void
    1025          64 : run (void *cls,
    1026             :      char *const *args,
    1027             :      const char *cfgfile,
    1028             :      const struct GNUNET_CONFIGURATION_Handle *c)
    1029             : {
    1030             :   (void) cls;
    1031             :   (void) args;
    1032             :   (void) cfgfile;
    1033             : 
    1034          64 :   cfg = c;
    1035          64 :   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
    1036             :                                  cls);
    1037          64 :   if (GNUNET_OK !=
    1038          64 :       exchange_serve_process_config ())
    1039             :   {
    1040           0 :     global_ret = EXIT_NOTCONFIGURED;
    1041           0 :     GNUNET_SCHEDULER_shutdown ();
    1042           0 :     return;
    1043             :   }
    1044          64 :   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
    1045             :                           &rc);
    1046          64 :   if (NULL == ctx)
    1047             :   {
    1048           0 :     GNUNET_break (0);
    1049           0 :     GNUNET_SCHEDULER_shutdown ();
    1050           0 :     global_ret = EXIT_NO_RESTART;
    1051           0 :     return;
    1052             :   }
    1053          64 :   rc = GNUNET_CURL_gnunet_rc_create (ctx);
    1054          64 :   schedule_transfers ();
    1055             : }
    1056             : 
    1057             : 
    1058             : /**
    1059             :  * The main function of taler-exchange-wirewatch
    1060             :  *
    1061             :  * @param argc number of arguments from the command line
    1062             :  * @param argv command line arguments
    1063             :  * @return 0 ok, non-zero on error
    1064             :  */
    1065             : int
    1066          64 : main (int argc,
    1067             :       char *const *argv)
    1068             : {
    1069          64 :   struct GNUNET_GETOPT_CommandLineOption options[] = {
    1070          64 :     GNUNET_GETOPT_option_string ('a',
    1071             :                                  "account",
    1072             :                                  "SECTION_NAME",
    1073             :                                  "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)",
    1074             :                                  &account_section),
    1075          64 :     GNUNET_GETOPT_option_flag ('e',
    1076             :                                "exit-on-error",
    1077             :                                "terminate wirewatch if we failed to download information from the bank",
    1078             :                                &exit_on_error),
    1079          64 :     GNUNET_GETOPT_option_relative_time ('f',
    1080             :                                         "longpoll-timeout",
    1081             :                                         "DELAY",
    1082             :                                         "what is the timeout when asking the bank about new transactions, specify with unit (e.g. --longpoll-timeout=30s)",
    1083             :                                         &longpoll_timeout),
    1084          64 :     GNUNET_GETOPT_option_flag ('I',
    1085             :                                "ignore-not-found",
    1086             :                                "continue, even if the bank account of the exchange was not found",
    1087             :                                &ignore_account_404),
    1088          64 :     GNUNET_GETOPT_option_uint ('S',
    1089             :                                "size",
    1090             :                                "SIZE",
    1091             :                                "Size to process per shard (default: 1024)",
    1092             :                                &shard_size),
    1093          64 :     GNUNET_GETOPT_option_timetravel ('T',
    1094             :                                      "timetravel"),
    1095          64 :     GNUNET_GETOPT_option_flag ('t',
    1096             :                                "test",
    1097             :                                "run in test mode and exit when idle",
    1098             :                                &test_mode),
    1099          64 :     GNUNET_GETOPT_option_uint ('w',
    1100             :                                "workers",
    1101             :                                "COUNT",
    1102             :                                "Plan work load with up to COUNT worker processes (default: 16)",
    1103             :                                &max_workers),
    1104          64 :     GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    1105             :     GNUNET_GETOPT_OPTION_END
    1106             :   };
    1107             :   enum GNUNET_GenericReturnValue ret;
    1108             : 
    1109          64 :   longpoll_timeout = LONGPOLL_TIMEOUT;
    1110          64 :   ret = GNUNET_PROGRAM_run (
    1111             :     TALER_EXCHANGE_project_data (),
    1112             :     argc, argv,
    1113             :     "taler-exchange-wirewatch",
    1114             :     gettext_noop (
    1115             :       "background process that watches for incoming wire transfers from customers"),
    1116             :     options,
    1117             :     &run, NULL);
    1118          64 :   if (GNUNET_SYSERR == ret)
    1119           0 :     return EXIT_INVALIDARGUMENT;
    1120          64 :   if (GNUNET_NO == ret)
    1121           0 :     return EXIT_SUCCESS;
    1122          64 :   return global_ret;
    1123             : }
    1124             : 
    1125             : 
    1126             : /* end of taler-exchange-wirewatch.c */

Generated by: LCOV version 1.16