LCOV - code coverage report
Current view: top level - exchangedb - pg_reserves_in_insert.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 87 140 62.1 %
Date: 2025-06-05 21:03:14 Functions: 3 3 100.0 %

          Line data    Source code
       1             : /*
       2             :    This file is part of TALER
       3             :    Copyright (C) 2022-2024 Taler Systems SA
       4             : 
       5             :    TALER is free software; you can redistribute it and/or modify it under the
       6             :    terms of the GNU General Public License as published by the Free Software
       7             :    Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12             : 
      13             :    You should have received a copy of the GNU General Public License along with
      14             :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             :  */
      16             : /**
      17             :  * @file exchangedb/pg_reserves_in_insert.c
      18             :  * @brief Implementation of the reserves_in_insert function for Postgres
      19             :  * @author Christian Grothoff
      20             :  * @author Joseph Xu
      21             :  */
      22             : #include "platform.h"
      23             : #include "taler_error_codes.h"
      24             : #include "taler_dbevents.h"
      25             : #include "taler_pq_lib.h"
      26             : #include "pg_reserves_in_insert.h"
      27             : #include "pg_helper.h"
      28             : #include "pg_start.h"
      29             : #include "pg_start_read_committed.h"
      30             : #include "pg_commit.h"
      31             : #include "pg_preflight.h"
      32             : #include "pg_rollback.h"
      33             : #include "pg_event_notify.h"
      34             : 
      35             : 
      36             : /**
      37             :  * Generate event notification for the reserve change.
      38             :  *
      39             :  * @param reserve_pub reserve to notfiy on
      40             :  * @return string to pass to postgres for the notification
      41             :  */
      42             : static char *
      43          58 : compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
      44             : {
      45          58 :   struct TALER_ReserveEventP rep = {
      46          58 :     .header.size = htons (sizeof (rep)),
      47          58 :     .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
      48             :     .reserve_pub = *reserve_pub
      49             :   };
      50             : 
      51          58 :   return GNUNET_PQ_get_event_notify_channel (&rep.header);
      52             : }
      53             : 
      54             : 
      55             : /**
      56             :  * Closure for our helper_cb()
      57             :  */
      58             : struct Context
      59             : {
      60             :   /**
      61             :    * Array of reserve UUIDs to initialize.
      62             :    */
      63             :   uint64_t *reserve_uuids;
      64             : 
      65             :   /**
      66             :    * Array with entries set to 'true' for duplicate transactions.
      67             :    */
      68             :   bool *transaction_duplicates;
      69             : 
      70             :   /**
      71             :    * Array with entries set to 'true' for rows with conflicts.
      72             :    */
      73             :   bool *conflicts;
      74             : 
      75             :   /**
      76             :    * Set to #GNUNET_SYSERR on failures.
      77             :    */
      78             :   enum GNUNET_GenericReturnValue status;
      79             : 
      80             :   /**
      81             :    * Single value (no array) set to true if we need
      82             :    * to follow-up with an update.
      83             :    */
      84             :   bool needs_update;
      85             : };
      86             : 
      87             : 
      88             : /**
      89             :  * Helper function to be called with the results of a SELECT statement
      90             :  * that has returned @a num_results results.
      91             :  *
      92             :  * @param cls closure of type `struct Context *`
      93             :  * @param result the postgres result
      94             :  * @param num_results the number of results in @a result
      95             :  */
      96             : static void
      97          58 : helper_cb (void *cls,
      98             :            PGresult *result,
      99             :            unsigned int num_results)
     100             : {
     101          58 :   struct Context *ctx = cls;
     102             : 
     103         116 :   for (unsigned int i = 0; i<num_results; i++)
     104             :   {
     105          58 :     struct GNUNET_PQ_ResultSpec rs[] = {
     106          58 :       GNUNET_PQ_result_spec_bool (
     107             :         "transaction_duplicate",
     108          58 :         &ctx->transaction_duplicates[i]),
     109          58 :       GNUNET_PQ_result_spec_allow_null (
     110             :         GNUNET_PQ_result_spec_uint64 ("ruuid",
     111          58 :                                       &ctx->reserve_uuids[i]),
     112          58 :         &ctx->conflicts[i]),
     113             :       GNUNET_PQ_result_spec_end
     114             :     };
     115             : 
     116          58 :     if (GNUNET_OK !=
     117          58 :         GNUNET_PQ_extract_result (result,
     118             :                                   rs,
     119             :                                   i))
     120             :     {
     121           0 :       GNUNET_break (0);
     122           0 :       ctx->status = GNUNET_SYSERR;
     123           0 :       return;
     124             :     }
     125          58 :     if (! ctx->transaction_duplicates[i])
     126          58 :       ctx->needs_update |= ctx->conflicts[i];
     127             :   }
     128             : }
     129             : 
     130             : 
     131             : enum GNUNET_DB_QueryStatus
     132          58 : TEH_PG_reserves_in_insert (
     133             :   void *cls,
     134             :   const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
     135             :   unsigned int reserves_length,
     136             :   enum GNUNET_DB_QueryStatus *results)
     137          58 : {
     138          58 :   struct PostgresClosure *pg = cls;
     139          58 :   unsigned int dups = 0;
     140             : 
     141          58 :   struct TALER_FullPaytoHashP h_full_paytos[
     142          58 :     GNUNET_NZL (reserves_length)];
     143          58 :   struct TALER_NormalizedPaytoHashP h_normalized_paytos[
     144          58 :     GNUNET_NZL (reserves_length)];
     145          58 :   char *notify_s[GNUNET_NZL (reserves_length)];
     146          58 :   struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
     147          58 :   struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
     148          58 :   struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
     149          58 :   const char *sender_account_details[GNUNET_NZL (reserves_length)];
     150          58 :   const char *exchange_account_names[GNUNET_NZL (reserves_length)];
     151          58 :   uint64_t wire_references[GNUNET_NZL (reserves_length)];
     152          58 :   uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
     153          58 :   bool transaction_duplicates[GNUNET_NZL (reserves_length)];
     154          58 :   bool conflicts[GNUNET_NZL (reserves_length)];
     155             :   struct GNUNET_TIME_Timestamp reserve_expiration
     156          58 :     = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
     157             :   struct GNUNET_TIME_Timestamp gc
     158          58 :     = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
     159             :   enum GNUNET_DB_QueryStatus qs;
     160             :   bool need_update;
     161             : 
     162         116 :   for (unsigned int i = 0; i<reserves_length; i++)
     163             :   {
     164          58 :     const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
     165             : 
     166          58 :     TALER_full_payto_hash (reserve->sender_account_details,
     167             :                            &h_full_paytos[i]);
     168          58 :     TALER_full_payto_normalize_and_hash (reserve->sender_account_details,
     169             :                                          &h_normalized_paytos[i]);
     170          58 :     notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
     171          58 :     reserve_pubs[i] = *reserve->reserve_pub;
     172          58 :     balances[i] = *reserve->balance;
     173          58 :     execution_times[i] = reserve->execution_time;
     174          58 :     sender_account_details[i] = reserve->sender_account_details.full_payto;
     175          58 :     exchange_account_names[i] = reserve->exchange_account_name;
     176          58 :     wire_references[i] = reserve->wire_reference;
     177             :   }
     178             : 
     179             :   /* NOTE: kind-of pointless to explicitly start a transaction here... */
     180          58 :   if (GNUNET_OK !=
     181          58 :       TEH_PG_preflight (pg))
     182             :   {
     183           0 :     GNUNET_break (0);
     184           0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     185           0 :     goto finished;
     186             :   }
     187          58 :   if (GNUNET_OK !=
     188          58 :       TEH_PG_start_read_committed (pg,
     189             :                                    "READ_COMMITED"))
     190             :   {
     191           0 :     GNUNET_break (0);
     192           0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     193           0 :     goto finished;
     194             :   }
     195          58 :   PREPARE (pg,
     196             :            "reserves_insert_with_array",
     197             :            "SELECT"
     198             :            " transaction_duplicate"
     199             :            ",ruuid"
     200             :            " FROM exchange_do_array_reserves_insert"
     201             :            " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
     202             :   {
     203          58 :     struct GNUNET_PQ_QueryParam params[] = {
     204          58 :       GNUNET_PQ_query_param_timestamp (&gc),
     205          58 :       GNUNET_PQ_query_param_timestamp (&reserve_expiration),
     206          58 :       GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
     207             :                                                   reserve_pubs,
     208             :                                                   pg->conn),
     209          58 :       GNUNET_PQ_query_param_array_uint64 (reserves_length,
     210             :                                           wire_references,
     211             :                                           pg->conn),
     212          58 :       TALER_PQ_query_param_array_amount (
     213             :         reserves_length,
     214             :         balances,
     215             :         pg->conn),
     216          58 :       GNUNET_PQ_query_param_array_ptrs_string (
     217             :         reserves_length,
     218             :         (const char **) exchange_account_names,
     219             :         pg->conn),
     220          58 :       GNUNET_PQ_query_param_array_timestamp (
     221             :         reserves_length,
     222             :         execution_times,
     223             :         pg->conn),
     224          58 :       GNUNET_PQ_query_param_array_auto_from_type (
     225             :         reserves_length,
     226             :         h_full_paytos,
     227             :         pg->conn),
     228          58 :       GNUNET_PQ_query_param_array_auto_from_type (
     229             :         reserves_length,
     230             :         h_normalized_paytos,
     231             :         pg->conn),
     232          58 :       GNUNET_PQ_query_param_array_ptrs_string (
     233             :         reserves_length,
     234             :         (const char **) sender_account_details,
     235             :         pg->conn),
     236          58 :       GNUNET_PQ_query_param_array_ptrs_string (
     237             :         reserves_length,
     238             :         (const char **) notify_s,
     239             :         pg->conn),
     240             :       GNUNET_PQ_query_param_end
     241             :     };
     242          58 :     struct Context ctx = {
     243             :       .reserve_uuids = reserve_uuids,
     244             :       .transaction_duplicates = transaction_duplicates,
     245             :       .conflicts = conflicts,
     246             :       .needs_update = false,
     247             :       .status = GNUNET_OK
     248             :     };
     249             : 
     250          58 :     qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
     251             :                                                "reserves_insert_with_array",
     252             :                                                params,
     253             :                                                &helper_cb,
     254             :                                                &ctx);
     255          58 :     GNUNET_PQ_cleanup_query_params_closures (params);
     256          58 :     if ( (qs < 0) ||
     257          58 :          (GNUNET_OK != ctx.status) )
     258             :     {
     259           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     260             :                   "Failed to insert into reserves (%d)\n",
     261             :                   qs);
     262           0 :       goto finished;
     263             :     }
     264          58 :     need_update = ctx.needs_update;
     265             :   }
     266             : 
     267             :   {
     268             :     enum GNUNET_DB_QueryStatus cs;
     269             : 
     270          58 :     cs = TEH_PG_commit (pg);
     271          58 :     if (cs < 0)
     272             :     {
     273           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     274             :                   "Failed to commit\n");
     275           0 :       qs = cs;
     276           0 :       goto finished;
     277             :     }
     278             :   }
     279             : 
     280         116 :   for (unsigned int i = 0; i<reserves_length; i++)
     281             :   {
     282          58 :     if (transaction_duplicates[i])
     283           0 :       dups++;
     284          58 :     results[i] = transaction_duplicates[i]
     285             :       ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
     286          58 :       : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     287             :   }
     288             : 
     289          58 :   if (! need_update)
     290             :   {
     291          58 :     qs = reserves_length;
     292          58 :     goto finished;
     293             :   }
     294           0 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     295             :               "Reserve update needed for some reserves in the batch\n");
     296           0 :   PREPARE (pg,
     297             :            "reserves_update",
     298             :            "SELECT"
     299             :            " out_duplicate AS duplicate "
     300             :            "FROM exchange_do_batch_reserves_update"
     301             :            " ($1,$2,$3,$4,$5,$6,$7);");
     302             : 
     303           0 :   if (GNUNET_OK !=
     304           0 :       TEH_PG_start (pg,
     305             :                     "reserve-insert-continued"))
     306             :   {
     307           0 :     GNUNET_break (0);
     308           0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     309           0 :     goto finished;
     310             :   }
     311             : 
     312           0 :   for (unsigned int i = 0; i<reserves_length; i++)
     313             :   {
     314           0 :     if (transaction_duplicates[i])
     315           0 :       continue;
     316           0 :     if (! conflicts[i])
     317           0 :       continue;
     318             :     {
     319             :       bool duplicate;
     320           0 :       struct GNUNET_PQ_QueryParam params[] = {
     321           0 :         GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
     322           0 :         GNUNET_PQ_query_param_timestamp (&reserve_expiration),
     323           0 :         GNUNET_PQ_query_param_uint64 (&wire_references[i]),
     324           0 :         TALER_PQ_query_param_amount (pg->conn,
     325           0 :                                      &balances[i]),
     326           0 :         GNUNET_PQ_query_param_string (exchange_account_names[i]),
     327           0 :         GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]),
     328           0 :         GNUNET_PQ_query_param_string (notify_s[i]),
     329             :         GNUNET_PQ_query_param_end
     330             :       };
     331           0 :       struct GNUNET_PQ_ResultSpec rs[] = {
     332           0 :         GNUNET_PQ_result_spec_bool ("duplicate",
     333             :                                     &duplicate),
     334             :         GNUNET_PQ_result_spec_end
     335             :       };
     336             :       enum GNUNET_DB_QueryStatus qsi;
     337             : 
     338           0 :       qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     339             :                                                       "reserves_update",
     340             :                                                       params,
     341             :                                                       rs);
     342           0 :       if (qsi < 0)
     343             :       {
     344           0 :         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     345             :                     "Failed to update reserves (%d)\n",
     346             :                     qsi);
     347           0 :         results[i] = qsi;
     348           0 :         goto finished;
     349             :       }
     350           0 :       results[i] = duplicate
     351             :           ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
     352           0 :           : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     353             :     }
     354             :   }
     355             :   {
     356             :     enum GNUNET_DB_QueryStatus cs;
     357             : 
     358           0 :     cs = TEH_PG_commit (pg);
     359           0 :     if (cs < 0)
     360             :     {
     361           0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     362             :                   "Failed to commit\n");
     363           0 :       qs = cs;
     364           0 :       goto finished;
     365             :     }
     366             :   }
     367           0 : finished:
     368         116 :   for (unsigned int i = 0; i<reserves_length; i++)
     369          58 :     GNUNET_free (notify_s[i]);
     370          58 :   if (qs < 0)
     371           0 :     return qs;
     372          58 :   GNUNET_PQ_event_do_poll (pg->conn);
     373          58 :   if (0 != dups)
     374           0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     375             :                 "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
     376             :                 dups,
     377             :                 reserves_length);
     378          58 :   return qs;
     379             : }

Generated by: LCOV version 1.16