LCOV - code coverage report
Current view: top level - exchangedb - reserves_in_insert.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 61.9 % 139 86
Test Date: 2026-04-14 15:39:31 Functions: 100.0 % 3 3

            Line data    Source code
       1              : /*
       2              :    This file is part of TALER
       3              :    Copyright (C) 2022-2024 Taler Systems SA
       4              : 
       5              :    TALER is free software; you can redistribute it and/or modify it under the
       6              :    terms of the GNU General Public License as published by the Free Software
       7              :    Foundation; either version 3, or (at your option) any later version.
       8              : 
       9              :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10              :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11              :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12              : 
      13              :    You should have received a copy of the GNU General Public License along with
      14              :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15              :  */
      16              : /**
      17              :  * @file exchangedb/reserves_in_insert.c
      18              :  * @brief Implementation of the reserves_in_insert function for Postgres
      19              :  * @author Christian Grothoff
      20              :  * @author Joseph Xu
      21              :  */
      22              : #include "taler/taler_pq_lib.h"
      23              : #include "exchange-database/reserves_in_insert.h"
      24              : #include "helper.h"
      25              : #include "exchange-database/start.h"
      26              : #include "exchange-database/start_read_committed.h"
      27              : #include "exchange-database/commit.h"
      28              : #include "exchange-database/preflight.h"
      29              : #include "exchange-database/rollback.h"
      30              : 
      31              : 
      32              : /**
      33              :  * Generate event notification for the reserve change.
      34              :  *
      35              :  * @param reserve_pub reserve to notfiy on
      36              :  * @return string to pass to postgres for the notification
      37              :  */
      38              : static char *
      39           54 : compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
      40              : {
      41           54 :   struct TALER_EXCHANGEDB_ReserveEventP rep = {
      42           54 :     .header.size = htons (sizeof (rep)),
      43           54 :     .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
      44              :     .reserve_pub = *reserve_pub
      45              :   };
      46              : 
      47           54 :   return GNUNET_PQ_get_event_notify_channel (&rep.header);
      48              : }
      49              : 
      50              : 
      51              : /**
      52              :  * Closure for our helper_cb()
      53              :  */
      54              : struct Context
      55              : {
      56              :   /**
      57              :    * Array of reserve UUIDs to initialize.
      58              :    */
      59              :   uint64_t *reserve_uuids;
      60              : 
      61              :   /**
      62              :    * Array with entries set to 'true' for duplicate transactions.
      63              :    */
      64              :   bool *transaction_duplicates;
      65              : 
      66              :   /**
      67              :    * Array with entries set to 'true' for rows with conflicts.
      68              :    */
      69              :   bool *conflicts;
      70              : 
      71              :   /**
      72              :    * Set to #GNUNET_SYSERR on failures.
      73              :    */
      74              :   enum GNUNET_GenericReturnValue status;
      75              : 
      76              :   /**
      77              :    * Single value (no array) set to true if we need
      78              :    * to follow-up with an update.
      79              :    */
      80              :   bool needs_update;
      81              : };
      82              : 
      83              : 
      84              : /**
      85              :  * Helper function to be called with the results of a SELECT statement
      86              :  * that has returned @a num_results results.
      87              :  *
      88              :  * @param cls closure of type `struct Context *`
      89              :  * @param result the postgres result
      90              :  * @param num_results the number of results in @a result
      91              :  */
      92              : static void
      93           54 : helper_cb (void *cls,
      94              :            PGresult *result,
      95              :            unsigned int num_results)
      96              : {
      97           54 :   struct Context *ctx = cls;
      98              : 
      99          108 :   for (unsigned int i = 0; i<num_results; i++)
     100              :   {
     101           54 :     struct GNUNET_PQ_ResultSpec rs[] = {
     102           54 :       GNUNET_PQ_result_spec_bool (
     103              :         "transaction_duplicate",
     104           54 :         &ctx->transaction_duplicates[i]),
     105           54 :       GNUNET_PQ_result_spec_allow_null (
     106              :         GNUNET_PQ_result_spec_uint64 ("ruuid",
     107           54 :                                       &ctx->reserve_uuids[i]),
     108           54 :         &ctx->conflicts[i]),
     109              :       GNUNET_PQ_result_spec_end
     110              :     };
     111              : 
     112           54 :     if (GNUNET_OK !=
     113           54 :         GNUNET_PQ_extract_result (result,
     114              :                                   rs,
     115              :                                   i))
     116              :     {
     117            0 :       GNUNET_break (0);
     118            0 :       ctx->status = GNUNET_SYSERR;
     119            0 :       return;
     120              :     }
     121           54 :     if (! ctx->transaction_duplicates[i])
     122           54 :       ctx->needs_update |= ctx->conflicts[i];
     123              :   }
     124              : }
     125              : 
     126              : 
     127              : enum GNUNET_DB_QueryStatus
     128           54 : TALER_EXCHANGEDB_reserves_in_insert (
     129              :   struct TALER_EXCHANGEDB_PostgresContext *pg,
     130              :   const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
     131              :   unsigned int reserves_length,
     132              :   enum GNUNET_DB_QueryStatus *results)
     133           54 : {
     134           54 :   unsigned int dups = 0;
     135              : 
     136           54 :   struct TALER_FullPaytoHashP h_full_paytos[
     137           54 :     GNUNET_NZL (reserves_length)];
     138           54 :   struct TALER_NormalizedPaytoHashP h_normalized_paytos[
     139           54 :     GNUNET_NZL (reserves_length)];
     140           54 :   char *notify_s[GNUNET_NZL (reserves_length)];
     141           54 :   struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
     142           54 :   struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
     143           54 :   struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
     144           54 :   const char *sender_account_details[GNUNET_NZL (reserves_length)];
     145           54 :   const char *exchange_account_names[GNUNET_NZL (reserves_length)];
     146           54 :   uint64_t wire_references[GNUNET_NZL (reserves_length)];
     147           54 :   uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
     148           54 :   bool transaction_duplicates[GNUNET_NZL (reserves_length)];
     149           54 :   bool conflicts[GNUNET_NZL (reserves_length)];
     150              :   struct GNUNET_TIME_Timestamp reserve_expiration
     151           54 :     = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
     152              :   struct GNUNET_TIME_Timestamp gc
     153           54 :     = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
     154              :   enum GNUNET_DB_QueryStatus qs;
     155              :   bool need_update;
     156              : 
     157          108 :   for (unsigned int i = 0; i<reserves_length; i++)
     158              :   {
     159           54 :     const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
     160              : 
     161           54 :     TALER_full_payto_hash (reserve->sender_account_details,
     162              :                            &h_full_paytos[i]);
     163           54 :     TALER_full_payto_normalize_and_hash (reserve->sender_account_details,
     164              :                                          &h_normalized_paytos[i]);
     165           54 :     notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
     166           54 :     reserve_pubs[i] = *reserve->reserve_pub;
     167           54 :     balances[i] = *reserve->balance;
     168           54 :     execution_times[i] = reserve->execution_time;
     169           54 :     sender_account_details[i] = reserve->sender_account_details.full_payto;
     170           54 :     exchange_account_names[i] = reserve->exchange_account_name;
     171           54 :     wire_references[i] = reserve->wire_reference;
     172              :   }
     173              : 
     174              :   /* NOTE: kind-of pointless to explicitly start a transaction here... */
     175           54 :   if (GNUNET_OK !=
     176           54 :       TALER_EXCHANGEDB_preflight (pg))
     177              :   {
     178            0 :     GNUNET_break (0);
     179            0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     180            0 :     goto finished;
     181              :   }
     182           54 :   if (GNUNET_OK !=
     183           54 :       TALER_TALER_EXCHANGEDB_start_read_committed (pg,
     184              :                                                    "READ_COMMITED"))
     185              :   {
     186            0 :     GNUNET_break (0);
     187            0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     188            0 :     goto finished;
     189              :   }
     190           54 :   PREPARE (pg,
     191              :            "reserves_insert_with_array",
     192              :            "SELECT"
     193              :            " transaction_duplicate"
     194              :            ",ruuid"
     195              :            " FROM exchange_do_array_reserves_insert"
     196              :            " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
     197              :   {
     198           54 :     struct GNUNET_PQ_QueryParam params[] = {
     199           54 :       GNUNET_PQ_query_param_timestamp (&gc),
     200           54 :       GNUNET_PQ_query_param_timestamp (&reserve_expiration),
     201           54 :       GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
     202              :                                                   reserve_pubs,
     203              :                                                   pg->conn),
     204           54 :       GNUNET_PQ_query_param_array_uint64 (reserves_length,
     205              :                                           wire_references,
     206              :                                           pg->conn),
     207           54 :       TALER_PQ_query_param_array_amount (
     208              :         reserves_length,
     209              :         balances,
     210              :         pg->conn),
     211           54 :       GNUNET_PQ_query_param_array_ptrs_string (
     212              :         reserves_length,
     213              :         (const char **) exchange_account_names,
     214              :         pg->conn),
     215           54 :       GNUNET_PQ_query_param_array_timestamp (
     216              :         reserves_length,
     217              :         execution_times,
     218              :         pg->conn),
     219           54 :       GNUNET_PQ_query_param_array_auto_from_type (
     220              :         reserves_length,
     221              :         h_full_paytos,
     222              :         pg->conn),
     223           54 :       GNUNET_PQ_query_param_array_auto_from_type (
     224              :         reserves_length,
     225              :         h_normalized_paytos,
     226              :         pg->conn),
     227           54 :       GNUNET_PQ_query_param_array_ptrs_string (
     228              :         reserves_length,
     229              :         (const char **) sender_account_details,
     230              :         pg->conn),
     231           54 :       GNUNET_PQ_query_param_array_ptrs_string (
     232              :         reserves_length,
     233              :         (const char **) notify_s,
     234              :         pg->conn),
     235              :       GNUNET_PQ_query_param_end
     236              :     };
     237           54 :     struct Context ctx = {
     238              :       .reserve_uuids = reserve_uuids,
     239              :       .transaction_duplicates = transaction_duplicates,
     240              :       .conflicts = conflicts,
     241              :       .needs_update = false,
     242              :       .status = GNUNET_OK
     243              :     };
     244              : 
     245           54 :     qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
     246              :                                                "reserves_insert_with_array",
     247              :                                                params,
     248              :                                                &helper_cb,
     249              :                                                &ctx);
     250           54 :     GNUNET_PQ_cleanup_query_params_closures (params);
     251           54 :     if ( (qs < 0) ||
     252           54 :          (GNUNET_OK != ctx.status) )
     253              :     {
     254            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     255              :                   "Failed to insert into reserves (%d)\n",
     256              :                   qs);
     257            0 :       goto finished;
     258              :     }
     259           54 :     need_update = ctx.needs_update;
     260              :   }
     261              : 
     262              :   {
     263              :     enum GNUNET_DB_QueryStatus cs;
     264              : 
     265           54 :     cs = TALER_EXCHANGEDB_commit (pg);
     266           54 :     if (cs < 0)
     267              :     {
     268            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     269              :                   "Failed to commit\n");
     270            0 :       qs = cs;
     271            0 :       goto finished;
     272              :     }
     273              :   }
     274              : 
     275          108 :   for (unsigned int i = 0; i<reserves_length; i++)
     276              :   {
     277           54 :     if (transaction_duplicates[i])
     278            0 :       dups++;
     279           54 :     results[i] = transaction_duplicates[i]
     280              :       ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
     281           54 :       : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     282              :   }
     283              : 
     284           54 :   if (! need_update)
     285              :   {
     286           54 :     qs = reserves_length;
     287           54 :     goto finished;
     288              :   }
     289            0 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     290              :               "Reserve update needed for some reserves in the batch\n");
     291            0 :   PREPARE (pg,
     292              :            "reserves_update",
     293              :            "SELECT"
     294              :            " out_duplicate AS duplicate "
     295              :            "FROM exchange_do_batch_reserves_update"
     296              :            " ($1,$2,$3,$4,$5,$6,$7);");
     297              : 
     298            0 :   if (GNUNET_OK !=
     299            0 :       TALER_EXCHANGEDB_start (pg,
     300              :                               "reserve-insert-continued"))
     301              :   {
     302            0 :     GNUNET_break (0);
     303            0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     304            0 :     goto finished;
     305              :   }
     306              : 
     307            0 :   for (unsigned int i = 0; i<reserves_length; i++)
     308              :   {
     309            0 :     if (transaction_duplicates[i])
     310            0 :       continue;
     311            0 :     if (! conflicts[i])
     312            0 :       continue;
     313              :     {
     314              :       bool duplicate;
     315            0 :       struct GNUNET_PQ_QueryParam params[] = {
     316            0 :         GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
     317            0 :         GNUNET_PQ_query_param_timestamp (&reserve_expiration),
     318            0 :         GNUNET_PQ_query_param_uint64 (&wire_references[i]),
     319            0 :         TALER_PQ_query_param_amount (pg->conn,
     320            0 :                                      &balances[i]),
     321            0 :         GNUNET_PQ_query_param_string (exchange_account_names[i]),
     322            0 :         GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]),
     323            0 :         GNUNET_PQ_query_param_string (notify_s[i]),
     324              :         GNUNET_PQ_query_param_end
     325              :       };
     326            0 :       struct GNUNET_PQ_ResultSpec rs[] = {
     327            0 :         GNUNET_PQ_result_spec_bool ("duplicate",
     328              :                                     &duplicate),
     329              :         GNUNET_PQ_result_spec_end
     330              :       };
     331              :       enum GNUNET_DB_QueryStatus qsi;
     332              : 
     333            0 :       qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     334              :                                                       "reserves_update",
     335              :                                                       params,
     336              :                                                       rs);
     337            0 :       if (qsi < 0)
     338              :       {
     339            0 :         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     340              :                     "Failed to update reserves (%d)\n",
     341              :                     qsi);
     342            0 :         results[i] = qsi;
     343            0 :         goto finished;
     344              :       }
     345            0 :       results[i] = duplicate
     346              :           ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
     347            0 :           : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     348              :     }
     349              :   }
     350              :   {
     351              :     enum GNUNET_DB_QueryStatus cs;
     352              : 
     353            0 :     cs = TALER_EXCHANGEDB_commit (pg);
     354            0 :     if (cs < 0)
     355              :     {
     356            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     357              :                   "Failed to commit\n");
     358            0 :       qs = cs;
     359            0 :       goto finished;
     360              :     }
     361              :   }
     362            0 : finished:
     363          108 :   for (unsigned int i = 0; i<reserves_length; i++)
     364           54 :     GNUNET_free (notify_s[i]);
     365           54 :   if (qs < 0)
     366            0 :     return qs;
     367           54 :   GNUNET_PQ_event_do_poll (pg->conn);
     368           54 :   if (0 != dups)
     369            0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     370              :                 "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
     371              :                 dups,
     372              :                 reserves_length);
     373           54 :   return qs;
     374              : }
        

Generated by: LCOV version 2.0-1