LCOV - code coverage report
Current view: top level - exchangedb - pg_reserves_in_insert.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 62.1 % 140 87
Test Date: 2025-12-28 14:06:02 Functions: 100.0 % 3 3

            Line data    Source code
       1              : /*
       2              :    This file is part of TALER
       3              :    Copyright (C) 2022-2024 Taler Systems SA
       4              : 
       5              :    TALER is free software; you can redistribute it and/or modify it under the
       6              :    terms of the GNU General Public License as published by the Free Software
       7              :    Foundation; either version 3, or (at your option) any later version.
       8              : 
       9              :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10              :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11              :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12              : 
      13              :    You should have received a copy of the GNU General Public License along with
      14              :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15              :  */
      16              : /**
      17              :  * @file exchangedb/pg_reserves_in_insert.c
      18              :  * @brief Implementation of the reserves_in_insert function for Postgres
      19              :  * @author Christian Grothoff
      20              :  * @author Joseph Xu
      21              :  */
      22              : #include "taler/platform.h"
      23              : #include "taler/taler_error_codes.h"
      24              : #include "taler/taler_dbevents.h"
      25              : #include "taler/taler_pq_lib.h"
      26              : #include "pg_reserves_in_insert.h"
      27              : #include "pg_helper.h"
      28              : #include "pg_start.h"
      29              : #include "pg_start_read_committed.h"
      30              : #include "pg_commit.h"
      31              : #include "pg_preflight.h"
      32              : #include "pg_rollback.h"
      33              : #include "pg_event_notify.h"
      34              : 
      35              : 
      36              : /**
      37              :  * Generate event notification for the reserve change.
      38              :  *
      39              :  * @param reserve_pub reserve to notfiy on
      40              :  * @return string to pass to postgres for the notification
      41              :  */
      42              : static char *
      43           54 : compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
      44              : {
      45           54 :   struct TALER_ReserveEventP rep = {
      46           54 :     .header.size = htons (sizeof (rep)),
      47           54 :     .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
      48              :     .reserve_pub = *reserve_pub
      49              :   };
      50              : 
      51           54 :   return GNUNET_PQ_get_event_notify_channel (&rep.header);
      52              : }
      53              : 
      54              : 
      55              : /**
      56              :  * Closure for our helper_cb()
      57              :  */
      58              : struct Context
      59              : {
      60              :   /**
      61              :    * Array of reserve UUIDs to initialize.
      62              :    */
      63              :   uint64_t *reserve_uuids;
      64              : 
      65              :   /**
      66              :    * Array with entries set to 'true' for duplicate transactions.
      67              :    */
      68              :   bool *transaction_duplicates;
      69              : 
      70              :   /**
      71              :    * Array with entries set to 'true' for rows with conflicts.
      72              :    */
      73              :   bool *conflicts;
      74              : 
      75              :   /**
      76              :    * Set to #GNUNET_SYSERR on failures.
      77              :    */
      78              :   enum GNUNET_GenericReturnValue status;
      79              : 
      80              :   /**
      81              :    * Single value (no array) set to true if we need
      82              :    * to follow-up with an update.
      83              :    */
      84              :   bool needs_update;
      85              : };
      86              : 
      87              : 
      88              : /**
      89              :  * Helper function to be called with the results of a SELECT statement
      90              :  * that has returned @a num_results results.
      91              :  *
      92              :  * @param cls closure of type `struct Context *`
      93              :  * @param result the postgres result
      94              :  * @param num_results the number of results in @a result
      95              :  */
      96              : static void
      97           54 : helper_cb (void *cls,
      98              :            PGresult *result,
      99              :            unsigned int num_results)
     100              : {
     101           54 :   struct Context *ctx = cls;
     102              : 
     103          108 :   for (unsigned int i = 0; i<num_results; i++)
     104              :   {
     105           54 :     struct GNUNET_PQ_ResultSpec rs[] = {
     106           54 :       GNUNET_PQ_result_spec_bool (
     107              :         "transaction_duplicate",
     108           54 :         &ctx->transaction_duplicates[i]),
     109           54 :       GNUNET_PQ_result_spec_allow_null (
     110              :         GNUNET_PQ_result_spec_uint64 ("ruuid",
     111           54 :                                       &ctx->reserve_uuids[i]),
     112           54 :         &ctx->conflicts[i]),
     113              :       GNUNET_PQ_result_spec_end
     114              :     };
     115              : 
     116           54 :     if (GNUNET_OK !=
     117           54 :         GNUNET_PQ_extract_result (result,
     118              :                                   rs,
     119              :                                   i))
     120              :     {
     121            0 :       GNUNET_break (0);
     122            0 :       ctx->status = GNUNET_SYSERR;
     123            0 :       return;
     124              :     }
     125           54 :     if (! ctx->transaction_duplicates[i])
     126           54 :       ctx->needs_update |= ctx->conflicts[i];
     127              :   }
     128              : }
     129              : 
     130              : 
     131              : enum GNUNET_DB_QueryStatus
     132           54 : TEH_PG_reserves_in_insert (
     133              :   void *cls,
     134              :   const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
     135              :   unsigned int reserves_length,
     136              :   enum GNUNET_DB_QueryStatus *results)
     137           54 : {
     138           54 :   struct PostgresClosure *pg = cls;
     139           54 :   unsigned int dups = 0;
     140              : 
     141           54 :   struct TALER_FullPaytoHashP h_full_paytos[
     142           54 :     GNUNET_NZL (reserves_length)];
     143           54 :   struct TALER_NormalizedPaytoHashP h_normalized_paytos[
     144           54 :     GNUNET_NZL (reserves_length)];
     145           54 :   char *notify_s[GNUNET_NZL (reserves_length)];
     146           54 :   struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
     147           54 :   struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
     148           54 :   struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
     149           54 :   const char *sender_account_details[GNUNET_NZL (reserves_length)];
     150           54 :   const char *exchange_account_names[GNUNET_NZL (reserves_length)];
     151           54 :   uint64_t wire_references[GNUNET_NZL (reserves_length)];
     152           54 :   uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
     153           54 :   bool transaction_duplicates[GNUNET_NZL (reserves_length)];
     154           54 :   bool conflicts[GNUNET_NZL (reserves_length)];
     155              :   struct GNUNET_TIME_Timestamp reserve_expiration
     156           54 :     = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
     157              :   struct GNUNET_TIME_Timestamp gc
     158           54 :     = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
     159              :   enum GNUNET_DB_QueryStatus qs;
     160              :   bool need_update;
     161              : 
     162          108 :   for (unsigned int i = 0; i<reserves_length; i++)
     163              :   {
     164           54 :     const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
     165              : 
     166           54 :     TALER_full_payto_hash (reserve->sender_account_details,
     167              :                            &h_full_paytos[i]);
     168           54 :     TALER_full_payto_normalize_and_hash (reserve->sender_account_details,
     169              :                                          &h_normalized_paytos[i]);
     170           54 :     notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
     171           54 :     reserve_pubs[i] = *reserve->reserve_pub;
     172           54 :     balances[i] = *reserve->balance;
     173           54 :     execution_times[i] = reserve->execution_time;
     174           54 :     sender_account_details[i] = reserve->sender_account_details.full_payto;
     175           54 :     exchange_account_names[i] = reserve->exchange_account_name;
     176           54 :     wire_references[i] = reserve->wire_reference;
     177              :   }
     178              : 
     179              :   /* NOTE: kind-of pointless to explicitly start a transaction here... */
     180           54 :   if (GNUNET_OK !=
     181           54 :       TEH_PG_preflight (pg))
     182              :   {
     183            0 :     GNUNET_break (0);
     184            0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     185            0 :     goto finished;
     186              :   }
     187           54 :   if (GNUNET_OK !=
     188           54 :       TEH_PG_start_read_committed (pg,
     189              :                                    "READ_COMMITED"))
     190              :   {
     191            0 :     GNUNET_break (0);
     192            0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     193            0 :     goto finished;
     194              :   }
     195           54 :   PREPARE (pg,
     196              :            "reserves_insert_with_array",
     197              :            "SELECT"
     198              :            " transaction_duplicate"
     199              :            ",ruuid"
     200              :            " FROM exchange_do_array_reserves_insert"
     201              :            " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
     202              :   {
     203           54 :     struct GNUNET_PQ_QueryParam params[] = {
     204           54 :       GNUNET_PQ_query_param_timestamp (&gc),
     205           54 :       GNUNET_PQ_query_param_timestamp (&reserve_expiration),
     206           54 :       GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
     207              :                                                   reserve_pubs,
     208              :                                                   pg->conn),
     209           54 :       GNUNET_PQ_query_param_array_uint64 (reserves_length,
     210              :                                           wire_references,
     211              :                                           pg->conn),
     212           54 :       TALER_PQ_query_param_array_amount (
     213              :         reserves_length,
     214              :         balances,
     215              :         pg->conn),
     216           54 :       GNUNET_PQ_query_param_array_ptrs_string (
     217              :         reserves_length,
     218              :         (const char **) exchange_account_names,
     219              :         pg->conn),
     220           54 :       GNUNET_PQ_query_param_array_timestamp (
     221              :         reserves_length,
     222              :         execution_times,
     223              :         pg->conn),
     224           54 :       GNUNET_PQ_query_param_array_auto_from_type (
     225              :         reserves_length,
     226              :         h_full_paytos,
     227              :         pg->conn),
     228           54 :       GNUNET_PQ_query_param_array_auto_from_type (
     229              :         reserves_length,
     230              :         h_normalized_paytos,
     231              :         pg->conn),
     232           54 :       GNUNET_PQ_query_param_array_ptrs_string (
     233              :         reserves_length,
     234              :         (const char **) sender_account_details,
     235              :         pg->conn),
     236           54 :       GNUNET_PQ_query_param_array_ptrs_string (
     237              :         reserves_length,
     238              :         (const char **) notify_s,
     239              :         pg->conn),
     240              :       GNUNET_PQ_query_param_end
     241              :     };
     242           54 :     struct Context ctx = {
     243              :       .reserve_uuids = reserve_uuids,
     244              :       .transaction_duplicates = transaction_duplicates,
     245              :       .conflicts = conflicts,
     246              :       .needs_update = false,
     247              :       .status = GNUNET_OK
     248              :     };
     249              : 
     250           54 :     qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
     251              :                                                "reserves_insert_with_array",
     252              :                                                params,
     253              :                                                &helper_cb,
     254              :                                                &ctx);
     255           54 :     GNUNET_PQ_cleanup_query_params_closures (params);
     256           54 :     if ( (qs < 0) ||
     257           54 :          (GNUNET_OK != ctx.status) )
     258              :     {
     259            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     260              :                   "Failed to insert into reserves (%d)\n",
     261              :                   qs);
     262            0 :       goto finished;
     263              :     }
     264           54 :     need_update = ctx.needs_update;
     265              :   }
     266              : 
     267              :   {
     268              :     enum GNUNET_DB_QueryStatus cs;
     269              : 
     270           54 :     cs = TEH_PG_commit (pg);
     271           54 :     if (cs < 0)
     272              :     {
     273            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     274              :                   "Failed to commit\n");
     275            0 :       qs = cs;
     276            0 :       goto finished;
     277              :     }
     278              :   }
     279              : 
     280          108 :   for (unsigned int i = 0; i<reserves_length; i++)
     281              :   {
     282           54 :     if (transaction_duplicates[i])
     283            0 :       dups++;
     284           54 :     results[i] = transaction_duplicates[i]
     285              :       ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
     286           54 :       : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     287              :   }
     288              : 
     289           54 :   if (! need_update)
     290              :   {
     291           54 :     qs = reserves_length;
     292           54 :     goto finished;
     293              :   }
     294            0 :   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     295              :               "Reserve update needed for some reserves in the batch\n");
     296            0 :   PREPARE (pg,
     297              :            "reserves_update",
     298              :            "SELECT"
     299              :            " out_duplicate AS duplicate "
     300              :            "FROM exchange_do_batch_reserves_update"
     301              :            " ($1,$2,$3,$4,$5,$6,$7);");
     302              : 
     303            0 :   if (GNUNET_OK !=
     304            0 :       TEH_PG_start (pg,
     305              :                     "reserve-insert-continued"))
     306              :   {
     307            0 :     GNUNET_break (0);
     308            0 :     qs = GNUNET_DB_STATUS_HARD_ERROR;
     309            0 :     goto finished;
     310              :   }
     311              : 
     312            0 :   for (unsigned int i = 0; i<reserves_length; i++)
     313              :   {
     314            0 :     if (transaction_duplicates[i])
     315            0 :       continue;
     316            0 :     if (! conflicts[i])
     317            0 :       continue;
     318              :     {
     319              :       bool duplicate;
     320            0 :       struct GNUNET_PQ_QueryParam params[] = {
     321            0 :         GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
     322            0 :         GNUNET_PQ_query_param_timestamp (&reserve_expiration),
     323            0 :         GNUNET_PQ_query_param_uint64 (&wire_references[i]),
     324            0 :         TALER_PQ_query_param_amount (pg->conn,
     325            0 :                                      &balances[i]),
     326            0 :         GNUNET_PQ_query_param_string (exchange_account_names[i]),
     327            0 :         GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]),
     328            0 :         GNUNET_PQ_query_param_string (notify_s[i]),
     329              :         GNUNET_PQ_query_param_end
     330              :       };
     331            0 :       struct GNUNET_PQ_ResultSpec rs[] = {
     332            0 :         GNUNET_PQ_result_spec_bool ("duplicate",
     333              :                                     &duplicate),
     334              :         GNUNET_PQ_result_spec_end
     335              :       };
     336              :       enum GNUNET_DB_QueryStatus qsi;
     337              : 
     338            0 :       qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     339              :                                                       "reserves_update",
     340              :                                                       params,
     341              :                                                       rs);
     342            0 :       if (qsi < 0)
     343              :       {
     344            0 :         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     345              :                     "Failed to update reserves (%d)\n",
     346              :                     qsi);
     347            0 :         results[i] = qsi;
     348            0 :         goto finished;
     349              :       }
     350            0 :       results[i] = duplicate
     351              :           ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
     352            0 :           : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     353              :     }
     354              :   }
     355              :   {
     356              :     enum GNUNET_DB_QueryStatus cs;
     357              : 
     358            0 :     cs = TEH_PG_commit (pg);
     359            0 :     if (cs < 0)
     360              :     {
     361            0 :       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     362              :                   "Failed to commit\n");
     363            0 :       qs = cs;
     364            0 :       goto finished;
     365              :     }
     366              :   }
     367            0 : finished:
     368          108 :   for (unsigned int i = 0; i<reserves_length; i++)
     369           54 :     GNUNET_free (notify_s[i]);
     370           54 :   if (qs < 0)
     371            0 :     return qs;
     372           54 :   GNUNET_PQ_event_do_poll (pg->conn);
     373           54 :   if (0 != dups)
     374            0 :     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
     375              :                 "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
     376              :                 dups,
     377              :                 reserves_length);
     378           54 :   return qs;
     379              : }
        

Generated by: LCOV version 2.0-1