LCOV - code coverage report
Current view: top level - exchange - taler-exchange-wirewatch.c (source / functions) Hit Total Coverage
Test: GNU Taler exchange coverage report Lines: 159 247 64.4 %
Date: 2021-08-30 06:43:37 Functions: 8 9 88.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   This file is part of TALER
       3             :   Copyright (C) 2016--2021 Taler Systems SA
       4             : 
       5             :   TALER is free software; you can redistribute it and/or modify it under the
       6             :   terms of the GNU Affero General Public License as published by the Free Software
       7             :   Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
      12             : 
      13             :   You should have received a copy of the GNU Affero General Public License along with
      14             :   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             : */
      16             : 
      17             : /**
      18             :  * @file taler-exchange-wirewatch.c
      19             :  * @brief Process that watches for wire transfers to the exchange's bank account
      20             :  * @author Christian Grothoff
      21             :  */
      22             : #include "platform.h"
      23             : #include <gnunet/gnunet_util_lib.h>
      24             : #include <jansson.h>
      25             : #include <pthread.h>
      26             : #include <microhttpd.h>
      27             : #include "taler_exchangedb_lib.h"
      28             : #include "taler_exchangedb_plugin.h"
      29             : #include "taler_json_lib.h"
      30             : #include "taler_bank_service.h"
      31             : 
      32             : 
      33             : /**
      34             :  * How long to wait for an HTTP reply if there
      35             :  * are no transactions pending at the server?
      36             :  */
      37             : #define LONGPOLL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
      38             : 
      39             : /**
      40             :  * What is the maximum batch size we use for credit history
      41             :  * requests with the bank.  See `batch_size` below.
      42             :  */
      43             : #define MAXIMUM_BATCH_SIZE 1024
      44             : 
      45             : /**
      46             :  * Information we keep for each supported account.
      47             :  */
      48             : struct WireAccount
      49             : {
      50             :   /**
      51             :    * Accounts are kept in a DLL.
      52             :    */
      53             :   struct WireAccount *next;
      54             : 
      55             :   /**
      56             :    * Plugins are kept in a DLL.
      57             :    */
      58             :   struct WireAccount *prev;
      59             : 
      60             :   /**
      61             :    * Information about this account.
      62             :    */
      63             :   const struct TALER_EXCHANGEDB_AccountInfo *ai;
      64             : 
      65             :   /**
      66             :    * Active request for history.
      67             :    */
      68             :   struct TALER_BANK_CreditHistoryHandle *hh;
      69             : 
      70             :   /**
      71             :    * Until when is processing this wire plugin delayed?
      72             :    */
      73             :   struct GNUNET_TIME_Absolute delayed_until;
      74             : 
      75             :   /**
      76             :    * Encoded offset in the wire transfer list from where
      77             :    * to start the next query with the bank.
      78             :    */
      79             :   uint64_t batch_start;
      80             : 
      81             :   /**
      82             :    * Latest row offset seen in this transaction, becomes
      83             :    * the new #batch_start upon commit.
      84             :    */
      85             :   uint64_t latest_row_off;
      86             : 
      87             :   /**
      88             :    * Offset where our current shard begins (inclusive).
      89             :    */
      90             :   uint64_t shard_start;
      91             : 
      92             :   /**
      93             :    * Offset where our current shard ends (exclusive).
      94             :    */
      95             :   uint64_t shard_end;
      96             : 
      97             :   /**
      98             :    * When did we start with the shard?
      99             :    */
     100             :   struct GNUNET_TIME_Absolute shard_start_time;
     101             : 
     102             :   /**
     103             :    * Name of our job in the shard table.
     104             :    */
     105             :   char *job_name;
     106             : 
     107             :   /**
     108             :    * How many transactions do we retrieve per batch?
     109             :    */
     110             :   unsigned int batch_size;
     111             : 
     112             :   /**
     113             :    * How much do we incremnt @e batch_size on success?
     114             :    */
     115             :   unsigned int batch_thresh;
     116             : 
     117             :   /**
     118             :    * How many transactions did we see in the current batch?
     119             :    */
     120             :   unsigned int current_batch_size;
     121             : 
     122             :   /**
     123             :    * Should we delay the next request to the wire plugin a bit?  Set to
     124             :    * false if we actually did some work.
     125             :    */
     126             :   bool delay;
     127             : 
     128             : };
     129             : 
     130             : 
     131             : /**
     132             :  * Head of list of loaded wire plugins.
     133             :  */
     134             : static struct WireAccount *wa_head;
     135             : 
     136             : /**
     137             :  * Tail of list of loaded wire plugins.
     138             :  */
     139             : static struct WireAccount *wa_tail;
     140             : 
     141             : /**
     142             :  * Wire account we are currently processing.  This would go away
     143             :  * if we ever start processing all accounts in parallel.
     144             :  */
     145             : static struct WireAccount *wa_pos;
     146             : 
     147             : /**
     148             :  * Handle to the context for interacting with the bank.
     149             :  */
     150             : static struct GNUNET_CURL_Context *ctx;
     151             : 
     152             : /**
     153             :  * Scheduler context for running the @e ctx.
     154             :  */
     155             : static struct GNUNET_CURL_RescheduleContext *rc;
     156             : 
     157             : /**
     158             :  * The exchange's configuration (global)
     159             :  */
     160             : static const struct GNUNET_CONFIGURATION_Handle *cfg;
     161             : 
     162             : /**
     163             :  * Our DB plugin.
     164             :  */
     165             : static struct TALER_EXCHANGEDB_Plugin *db_plugin;
     166             : 
     167             : /**
     168             :  * How long should we sleep when idle before trying to find more work?
     169             :  * Also used for how long we wait to grab a shard before trying it again.
     170             :  * The value should be set to a bit above the average time it takes to
     171             :  * process a shard.
     172             :  */
     173             : static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
     174             : 
     175             : /**
     176             :  * How long did we take to finish the last shard?
     177             :  */
     178             : static struct GNUNET_TIME_Relative shard_delay;
     179             : 
     180             : /**
     181             :  * Modulus to apply to group shards.  The shard size must ultimately be a
     182             :  * multiple of the batch size. Thus, if this is not a multiple of the
     183             :  * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
     184             :  */
     185             : static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
     186             : 
     187             : /**
     188             :  * How many workers should we plan our scheduling with?
     189             :  */
     190             : static unsigned int max_workers = 16;
     191             : 
     192             : 
     193             : /**
     194             :  * Value to return from main(). 0 on success, non-zero on
     195             :  * on serious errors.
     196             :  */
     197             : static int global_ret;
     198             : 
     199             : /**
     200             :  * Are we run in testing mode and should only do one pass?
     201             :  */
     202             : static int test_mode;
     203             : 
     204             : /**
     205             :  * Current task waiting for execution, if any.
     206             :  */
     207             : static struct GNUNET_SCHEDULER_Task *task;
     208             : 
     209             : 
     210             : /**
     211             :  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
     212             :  *
     213             :  * @param cls closure
     214             :  */
     215             : static void
     216          29 : shutdown_task (void *cls)
     217             : {
     218             :   (void) cls;
     219             :   {
     220             :     struct WireAccount *wa;
     221             : 
     222          58 :     while (NULL != (wa = wa_head))
     223             :     {
     224          29 :       if (NULL != wa->hh)
     225             :       {
     226           0 :         TALER_BANK_credit_history_cancel (wa->hh);
     227           0 :         wa->hh = NULL;
     228             :       }
     229          29 :       GNUNET_CONTAINER_DLL_remove (wa_head,
     230             :                                    wa_tail,
     231             :                                    wa);
     232          29 :       GNUNET_free (wa->job_name);
     233          29 :       GNUNET_free (wa);
     234             :     }
     235             :   }
     236          29 :   wa_pos = NULL;
     237             : 
     238          29 :   if (NULL != ctx)
     239             :   {
     240          29 :     GNUNET_CURL_fini (ctx);
     241          29 :     ctx = NULL;
     242             :   }
     243          29 :   if (NULL != rc)
     244             :   {
     245          29 :     GNUNET_CURL_gnunet_rc_destroy (rc);
     246          29 :     rc = NULL;
     247             :   }
     248          29 :   if (NULL != task)
     249             :   {
     250           2 :     GNUNET_SCHEDULER_cancel (task);
     251           2 :     task = NULL;
     252             :   }
     253          29 :   TALER_EXCHANGEDB_plugin_unload (db_plugin);
     254          29 :   db_plugin = NULL;
     255          29 :   TALER_EXCHANGEDB_unload_accounts ();
     256          29 :   cfg = NULL;
     257          29 : }
     258             : 
     259             : 
     260             : /**
     261             :  * Function called with information about a wire account.  Adds the
     262             :  * account to our list (if it is enabled and we can load the plugin).
     263             :  *
     264             :  * @param cls closure, NULL
     265             :  * @param ai account information
     266             :  */
     267             : static void
     268          29 : add_account_cb (void *cls,
     269             :                 const struct TALER_EXCHANGEDB_AccountInfo *ai)
     270             : {
     271             :   struct WireAccount *wa;
     272             : 
     273             :   (void) cls;
     274          29 :   if (! ai->credit_enabled)
     275           0 :     return; /* not enabled for us, skip */
     276          29 :   wa = GNUNET_new (struct WireAccount);
     277          29 :   wa->ai = ai;
     278          29 :   GNUNET_asprintf (&wa->job_name,
     279             :                    "wirewatch-%s",
     280             :                    ai->section_name);
     281          29 :   wa->batch_size = MAXIMUM_BATCH_SIZE;
     282          29 :   if (0 != shard_size % wa->batch_size)
     283          27 :     wa->batch_size = shard_size;
     284          29 :   GNUNET_CONTAINER_DLL_insert (wa_head,
     285             :                                wa_tail,
     286             :                                wa);
     287             : }
     288             : 
     289             : 
     290             : /**
     291             :  * Parse configuration parameters for the exchange server into the
     292             :  * corresponding global variables.
     293             :  *
     294             :  * @return #GNUNET_OK on success
     295             :  */
     296             : static int
     297          29 : exchange_serve_process_config (void)
     298             : {
     299          29 :   if (GNUNET_OK !=
     300          29 :       GNUNET_CONFIGURATION_get_value_time (cfg,
     301             :                                            "exchange",
     302             :                                            "WIREWATCH_IDLE_SLEEP_INTERVAL",
     303             :                                            &wirewatch_idle_sleep_interval))
     304             :   {
     305           0 :     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
     306             :                                "exchange",
     307             :                                "WIREWATCH_IDLE_SLEEP_INTERVAL");
     308           0 :     return GNUNET_SYSERR;
     309             :   }
     310          29 :   if (NULL ==
     311          29 :       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
     312             :   {
     313           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     314             :                 "Failed to initialize DB subsystem\n");
     315           0 :     return GNUNET_SYSERR;
     316             :   }
     317          29 :   if (GNUNET_OK !=
     318          29 :       TALER_EXCHANGEDB_load_accounts (cfg,
     319             :                                       TALER_EXCHANGEDB_ALO_CREDIT
     320             :                                       | TALER_EXCHANGEDB_ALO_AUTHDATA))
     321             :   {
     322           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     323             :                 "No wire accounts configured for credit!\n");
     324           0 :     TALER_EXCHANGEDB_plugin_unload (db_plugin);
     325           0 :     db_plugin = NULL;
     326           0 :     return GNUNET_SYSERR;
     327             :   }
     328          29 :   TALER_EXCHANGEDB_find_accounts (&add_account_cb,
     329             :                                   NULL);
     330          29 :   GNUNET_assert (NULL != wa_head);
     331          29 :   return GNUNET_OK;
     332             : }
     333             : 
     334             : 
     335             : /**
     336             :  * Query for incoming wire transfers.
     337             :  *
     338             :  * @param cls NULL
     339             :  */
     340             : static void
     341             : find_transfers (void *cls);
     342             : 
     343             : 
     344             : /**
     345             :  * We encountered a serialization error.
     346             :  * Rollback the transaction and try again
     347             :  *
     348             :  * @param wa account we are transacting on
     349             :  */
     350             : static void
     351           0 : handle_soft_error (struct WireAccount *wa)
     352             : {
     353           0 :   db_plugin->rollback (db_plugin->cls);
     354           0 :   if (1 < wa->batch_size)
     355             :   {
     356           0 :     wa->batch_thresh = wa->batch_size;
     357           0 :     wa->batch_size /= 2;
     358           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     359             :                 "Reduced batch size to %llu due to serialization issue\n",
     360             :                 (unsigned long long) wa->batch_size);
     361             :   }
     362           0 :   GNUNET_assert (NULL == task);
     363           0 :   task = GNUNET_SCHEDULER_add_now (&find_transfers,
     364             :                                    NULL);
     365           0 : }
     366             : 
     367             : 
     368             : /**
     369             :  * We are finished with the current transaction, try
     370             :  * to commit and then schedule the next iteration.
     371             :  *
     372             :  * @param wa wire account to commit for
     373             :  */
     374             : static void
     375          95 : do_commit (struct WireAccount *wa)
     376             : {
     377             :   enum GNUNET_DB_QueryStatus qs;
     378             : 
     379          95 :   if (wa->shard_end <= wa->latest_row_off)
     380             :   {
     381             :     /* shard is complete, mark this as well */
     382          47 :     qs = db_plugin->complete_shard (db_plugin->cls,
     383          47 :                                     wa->job_name,
     384             :                                     wa->shard_start,
     385             :                                     wa->shard_end);
     386          47 :     switch (qs)
     387             :     {
     388           0 :     case GNUNET_DB_STATUS_HARD_ERROR:
     389           0 :       GNUNET_break (0);
     390           0 :       db_plugin->rollback (db_plugin->cls);
     391           0 :       GNUNET_SCHEDULER_shutdown ();
     392           0 :       return;
     393           0 :     case GNUNET_DB_STATUS_SOFT_ERROR:
     394           0 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     395             :                   "Got DB soft error for complete_shard. Rolling back.\n");
     396           0 :       handle_soft_error (wa);
     397           0 :       return;
     398           0 :     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     399             :       /* already existed, ok, let's just continue */
     400           0 :       break;
     401          47 :     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     402             :       /* normal case */
     403          47 :       shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
     404             : 
     405          47 :       break;
     406             :     }
     407          48 :   }
     408          95 :   qs = db_plugin->commit (db_plugin->cls);
     409          95 :   switch (qs)
     410             :   {
     411           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     412           0 :     GNUNET_break (0);
     413           0 :     GNUNET_SCHEDULER_shutdown ();
     414           0 :     return;
     415           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     416             :     /* reduce transaction size to reduce rollback probability */
     417           0 :     handle_soft_error (wa);
     418           0 :     return;
     419          95 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     420             :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     421             :     /* normal case */
     422          95 :     break;
     423             :   }
     424             :   /* transaction success, update #last_row_off */
     425          95 :   wa->batch_start = wa->latest_row_off;
     426          95 :   if (wa->batch_size < MAXIMUM_BATCH_SIZE)
     427             :   {
     428             :     int delta;
     429             : 
     430          74 :     delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
     431          74 :     if (delta < 0)
     432          15 :       delta = -delta;
     433          74 :     wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
     434             :                                  wa->batch_size + delta + 1);
     435          74 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     436             :                 "Increasing batch size to %llu\n",
     437             :                 (unsigned long long) wa->batch_size);
     438             :   }
     439          95 :   if ( (wa->delay) &&
     440          27 :        (test_mode) &&
     441          27 :        (NULL == wa->next) )
     442             :   {
     443          27 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     444             :                 "Shutdown due to test mode!\n");
     445          27 :     GNUNET_SCHEDULER_shutdown ();
     446          27 :     return;
     447             :   }
     448          68 :   if (wa->delay)
     449             :   {
     450             :     wa->delayed_until
     451          21 :       = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
     452          21 :     wa_pos = wa_pos->next;
     453          21 :     if (NULL == wa_pos)
     454          21 :       wa_pos = wa_head;
     455          21 :     GNUNET_assert (NULL != wa_pos);
     456             :   }
     457          68 :   GNUNET_assert (NULL == task);
     458          68 :   task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
     459             :                                   &find_transfers,
     460             :                                   NULL);
     461             : }
     462             : 
     463             : 
     464             : /**
     465             :  * Callbacks of this type are used to serve the result of asking
     466             :  * the bank for the transaction history.
     467             :  *
     468             :  * @param cls closure with the `struct WioreAccount *` we are processing
     469             :  * @param http_status HTTP status code from the server
     470             :  * @param ec taler error code
     471             :  * @param serial_id identification of the position at which we are querying
     472             :  * @param details details about the wire transfer
     473             :  * @param json raw JSON response
     474             :  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
     475             :  */
     476             : static int
     477         121 : history_cb (void *cls,
     478             :             unsigned int http_status,
     479             :             enum TALER_ErrorCode ec,
     480             :             uint64_t serial_id,
     481             :             const struct TALER_BANK_CreditDetails *details,
     482             :             const json_t *json)
     483             : {
     484         121 :   struct WireAccount *wa = cls;
     485             :   enum GNUNET_DB_QueryStatus qs;
     486             : 
     487             :   (void) json;
     488         121 :   if (NULL == details)
     489             :   {
     490          74 :     wa->hh = NULL;
     491          74 :     if (TALER_EC_NONE != ec)
     492             :     {
     493          18 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     494             :                   "Error fetching history: ec=%u, http_status=%u\n",
     495             :                   (unsigned int) ec,
     496             :                   http_status);
     497             :     }
     498          74 :     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     499             :                 "End of list. Committing progress!\n");
     500          74 :     do_commit (wa);
     501          74 :     return GNUNET_OK; /* will be ignored anyway */
     502             :   }
     503          47 :   if (serial_id < wa->latest_row_off)
     504             :   {
     505             :     /* we are done with the current shard, commit and stop this iteration! */
     506           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     507             :                 "Serial ID %llu not monotonic (got %llu before). Failing!\n",
     508             :                 (unsigned long long) serial_id,
     509             :                 (unsigned long long) wa->latest_row_off);
     510           0 :     db_plugin->rollback (db_plugin->cls);
     511           0 :     GNUNET_SCHEDULER_shutdown ();
     512           0 :     wa->hh = NULL;
     513           0 :     return GNUNET_SYSERR;
     514             :   }
     515          47 :   if (serial_id > wa->shard_end)
     516             :   {
     517             :     /* we are done with the current shard, commit and stop this iteration! */
     518          21 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     519             :                 "Serial ID %llu past shard end at %llu, ending iteration early!\n",
     520             :                 (unsigned long long) serial_id,
     521             :                 (unsigned long long) wa->shard_end);
     522          21 :     wa->latest_row_off = serial_id - 1;
     523          21 :     wa->delay = false;
     524          21 :     do_commit (wa);
     525          21 :     wa->hh = NULL;
     526          21 :     return GNUNET_SYSERR;
     527             :   }
     528          26 :   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     529             :               "Adding wire transfer over %s with (hashed) subject `%s'\n",
     530             :               TALER_amount2s (&details->amount),
     531             :               TALER_B2S (&details->reserve_pub));
     532             :   /* FIXME-PERFORMANCE: Consider using Postgres multi-valued insert here,
     533             :      for up to 15x speed-up according to
     534             :      https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006
     535             :      (Note: this may require changing both the
     536             :      plugin API as well as modifying how this function is called.) */
     537          26 :   qs = db_plugin->reserves_in_insert (db_plugin->cls,
     538             :                                       &details->reserve_pub,
     539             :                                       &details->amount,
     540             :                                       details->execution_date,
     541             :                                       details->debit_account_uri,
     542          26 :                                       wa->ai->section_name,
     543             :                                       serial_id);
     544          26 :   switch (qs)
     545             :   {
     546           0 :   case GNUNET_DB_STATUS_HARD_ERROR:
     547           0 :     GNUNET_break (0);
     548           0 :     db_plugin->rollback (db_plugin->cls);
     549           0 :     GNUNET_SCHEDULER_shutdown ();
     550           0 :     wa->hh = NULL;
     551           0 :     return GNUNET_SYSERR;
     552           0 :   case GNUNET_DB_STATUS_SOFT_ERROR:
     553           0 :     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     554             :                 "Got DB soft error for reserves_in_insert. Rolling back.\n");
     555           0 :     handle_soft_error (wa);
     556           0 :     wa->hh = NULL;
     557           0 :     return GNUNET_SYSERR;
     558           0 :   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     559             :     /* already existed, ok, let's just continue */
     560           0 :     break;
     561          26 :   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     562             :     /* normal case */
     563          26 :     break;
     564             :   }
     565          26 :   wa->delay = false;
     566          26 :   wa->latest_row_off = serial_id;
     567          26 :   return GNUNET_OK;
     568             : }
     569             : 
     570             : 
     571             : /**
     572             :  * Query for incoming wire transfers.
     573             :  *
     574             :  * @param cls NULL
     575             :  */
     576             : static void
     577          95 : find_transfers (void *cls)
     578             : {
     579             :   enum GNUNET_DB_QueryStatus qs;
     580             :   unsigned int limit;
     581             : 
     582             :   (void) cls;
     583          95 :   task = NULL;
     584          95 :   if (GNUNET_SYSERR ==
     585          95 :       db_plugin->preflight (db_plugin->cls))
     586             :   {
     587           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     588             :                 "Failed to obtain database connection!\n");
     589           0 :     global_ret = EXIT_FAILURE;
     590           0 :     GNUNET_SCHEDULER_shutdown ();
     591           0 :     return;
     592             :   }
     593          95 :   wa_pos->delay = true;
     594          95 :   wa_pos->current_batch_size = 0; /* reset counter */
     595          95 :   if (wa_pos->shard_end <= wa_pos->batch_start)
     596             :   {
     597             :     uint64_t start;
     598             :     uint64_t end;
     599             :     struct GNUNET_TIME_Relative delay;
     600             :     /* advance to next shard */
     601             : 
     602          76 :     if (0 == max_workers)
     603          74 :       delay = GNUNET_TIME_UNIT_ZERO;
     604             :     else
     605           2 :       delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
     606             :         GNUNET_CRYPTO_QUALITY_WEAK,
     607           2 :         4 * GNUNET_TIME_relative_max (
     608             :           wirewatch_idle_sleep_interval,
     609             :           GNUNET_TIME_relative_multiply (shard_delay,
     610           2 :                                          max_workers)).rel_value_us);
     611          76 :     qs = db_plugin->begin_shard (db_plugin->cls,
     612          76 :                                  wa_pos->job_name,
     613             :                                  delay,
     614             :                                  shard_size,
     615             :                                  &start,
     616             :                                  &end);
     617          76 :     switch (qs)
     618             :     {
     619           0 :     case GNUNET_DB_STATUS_HARD_ERROR:
     620           0 :       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     621             :                   "Failed to obtain starting point for montoring from database!\n");
     622           0 :       global_ret = EXIT_FAILURE;
     623           0 :       GNUNET_SCHEDULER_shutdown ();
     624           0 :       return;
     625           0 :     case GNUNET_DB_STATUS_SOFT_ERROR:
     626             :       /* try again */
     627           0 :       task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
     628             :                                            &find_transfers,
     629             :                                            NULL);
     630           0 :       return;
     631           0 :     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     632           0 :       GNUNET_break (0);
     633           0 :       task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
     634             :                                            &find_transfers,
     635             :                                            NULL);
     636           0 :       return;
     637          76 :     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     638          76 :       wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();
     639          76 :       wa_pos->shard_start = start;
     640          76 :       wa_pos->shard_end = end;
     641          76 :       wa_pos->batch_start = start;
     642          76 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     643             :                   "Starting with shard at %llu\n",
     644             :                   (unsigned long long) start);
     645          76 :       break;
     646             :     }
     647          76 :   }
     648          95 :   if (GNUNET_OK !=
     649          95 :       db_plugin->start_read_committed (db_plugin->cls,
     650             :                                        "wirewatch check for incoming wire transfers"))
     651             :   {
     652           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     653             :                 "Failed to start database transaction!\n");
     654           0 :     global_ret = EXIT_FAILURE;
     655           0 :     GNUNET_SCHEDULER_shutdown ();
     656           0 :     return;
     657             :   }
     658             : 
     659          95 :   limit = GNUNET_MIN (wa_pos->batch_size,
     660             :                       wa_pos->shard_end - wa_pos->batch_start);
     661          95 :   GNUNET_assert (NULL == wa_pos->hh);
     662          95 :   wa_pos->latest_row_off = wa_pos->batch_start;
     663         285 :   wa_pos->hh = TALER_BANK_credit_history (ctx,
     664          95 :                                           wa_pos->ai->auth,
     665          95 :                                           wa_pos->batch_start,
     666             :                                           limit,
     667             :                                           test_mode
     668          95 :                                           ? GNUNET_TIME_UNIT_ZERO
     669          21 :                                           : LONGPOLL_TIMEOUT,
     670             :                                           &history_cb,
     671             :                                           wa_pos);
     672          95 :   if (NULL == wa_pos->hh)
     673             :   {
     674           0 :     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
     675             :                 "Failed to start request for account history!\n");
     676           0 :     db_plugin->rollback (db_plugin->cls);
     677           0 :     global_ret = EXIT_FAILURE;
     678           0 :     GNUNET_SCHEDULER_shutdown ();
     679           0 :     return;
     680             :   }
     681             : }
     682             : 
     683             : 
     684             : /**
     685             :  * First task.
     686             :  *
     687             :  * @param cls closure, NULL
     688             :  * @param args remaining command-line arguments
     689             :  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
     690             :  * @param c configuration
     691             :  */
     692             : static void
     693          29 : run (void *cls,
     694             :      char *const *args,
     695             :      const char *cfgfile,
     696             :      const struct GNUNET_CONFIGURATION_Handle *c)
     697             : {
     698             :   (void) cls;
     699             :   (void) args;
     700             :   (void) cfgfile;
     701             : 
     702          29 :   cfg = c;
     703          29 :   if (GNUNET_OK !=
     704          29 :       exchange_serve_process_config ())
     705             :   {
     706           0 :     global_ret = EXIT_NOTCONFIGURED;
     707           0 :     return;
     708             :   }
     709          29 :   wa_pos = wa_head;
     710          29 :   GNUNET_assert (NULL != wa_pos);
     711          29 :   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
     712             :                                  cls);
     713          29 :   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
     714             :                           &rc);
     715          29 :   rc = GNUNET_CURL_gnunet_rc_create (ctx);
     716          29 :   if (NULL == ctx)
     717             :   {
     718           0 :     GNUNET_break (0);
     719           0 :     return;
     720             :   }
     721             : 
     722          29 :   task = GNUNET_SCHEDULER_add_now (&find_transfers,
     723             :                                    NULL);
     724             : }
     725             : 
     726             : 
     727             : /**
     728             :  * The main function of taler-exchange-wirewatch
     729             :  *
     730             :  * @param argc number of arguments from the command line
     731             :  * @param argv command line arguments
     732             :  * @return 0 ok, non-zero on error
     733             :  */
     734             : int
     735          29 : main (int argc,
     736             :       char *const *argv)
     737             : {
     738          29 :   struct GNUNET_GETOPT_CommandLineOption options[] = {
     739          29 :     GNUNET_GETOPT_option_uint ('S',
     740             :                                "size",
     741             :                                "SIZE",
     742             :                                "Size to process per shard (default: 1024)",
     743             :                                &shard_size),
     744          29 :     GNUNET_GETOPT_option_timetravel ('T',
     745             :                                      "timetravel"),
     746          29 :     GNUNET_GETOPT_option_flag ('t',
     747             :                                "test",
     748             :                                "run in test mode and exit when idle",
     749             :                                &test_mode),
     750          29 :     GNUNET_GETOPT_option_uint ('w',
     751             :                                "workers",
     752             :                                "COUNT",
     753             :                                "Plan work load with up to COUNT worker processes (default: 16)",
     754             :                                &max_workers),
     755             :     GNUNET_GETOPT_OPTION_END
     756             :   };
     757             :   enum GNUNET_GenericReturnValue ret;
     758             : 
     759          29 :   if (GNUNET_OK !=
     760          29 :       GNUNET_STRINGS_get_utf8_args (argc, argv,
     761             :                                     &argc, &argv))
     762           0 :     return EXIT_INVALIDARGUMENT;
     763          29 :   TALER_OS_init ();
     764          29 :   ret = GNUNET_PROGRAM_run (
     765             :     argc, argv,
     766             :     "taler-exchange-wirewatch",
     767             :     gettext_noop (
     768             :       "background process that watches for incoming wire transfers from customers"),
     769             :     options,
     770             :     &run, NULL);
     771          29 :   GNUNET_free_nz ((void *) argv);
     772          29 :   if (GNUNET_SYSERR == ret)
     773           0 :     return EXIT_INVALIDARGUMENT;
     774          29 :   if (GNUNET_NO == ret)
     775           0 :     return EXIT_SUCCESS;
     776          29 :   return global_ret;
     777             : }
     778             : 
     779             : 
     780             : /* end of taler-exchange-wirewatch.c */

Generated by: LCOV version 1.14