Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2022-2024 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file exchangedb/pg_reserves_in_insert.c
18 : * @brief Implementation of the reserves_in_insert function for Postgres
19 : * @author Christian Grothoff
20 : * @author Joseph Xu
21 : */
22 : #include "platform.h"
23 : #include "taler_error_codes.h"
24 : #include "taler_dbevents.h"
25 : #include "taler_pq_lib.h"
26 : #include "pg_reserves_in_insert.h"
27 : #include "pg_helper.h"
28 : #include "pg_start.h"
29 : #include "pg_start_read_committed.h"
30 : #include "pg_commit.h"
31 : #include "pg_preflight.h"
32 : #include "pg_rollback.h"
33 : #include "pg_event_notify.h"
34 :
35 :
36 : /**
37 : * Generate event notification for the reserve change.
38 : *
39 : * @param reserve_pub reserve to notfiy on
40 : * @return string to pass to postgres for the notification
41 : */
42 : static char *
43 58 : compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
44 : {
45 58 : struct TALER_ReserveEventP rep = {
46 58 : .header.size = htons (sizeof (rep)),
47 58 : .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
48 : .reserve_pub = *reserve_pub
49 : };
50 :
51 58 : return GNUNET_PQ_get_event_notify_channel (&rep.header);
52 : }
53 :
54 :
55 : /**
56 : * Closure for our helper_cb()
57 : */
58 : struct Context
59 : {
60 : /**
61 : * Array of reserve UUIDs to initialize.
62 : */
63 : uint64_t *reserve_uuids;
64 :
65 : /**
66 : * Array with entries set to 'true' for duplicate transactions.
67 : */
68 : bool *transaction_duplicates;
69 :
70 : /**
71 : * Array with entries set to 'true' for rows with conflicts.
72 : */
73 : bool *conflicts;
74 :
75 : /**
76 : * Set to #GNUNET_SYSERR on failures.
77 : */
78 : enum GNUNET_GenericReturnValue status;
79 :
80 : /**
81 : * Single value (no array) set to true if we need
82 : * to follow-up with an update.
83 : */
84 : bool needs_update;
85 : };
86 :
87 :
88 : /**
89 : * Helper function to be called with the results of a SELECT statement
90 : * that has returned @a num_results results.
91 : *
92 : * @param cls closure of type `struct Context *`
93 : * @param result the postgres result
94 : * @param num_results the number of results in @a result
95 : */
96 : static void
97 58 : helper_cb (void *cls,
98 : PGresult *result,
99 : unsigned int num_results)
100 : {
101 58 : struct Context *ctx = cls;
102 :
103 116 : for (unsigned int i = 0; i<num_results; i++)
104 : {
105 58 : struct GNUNET_PQ_ResultSpec rs[] = {
106 58 : GNUNET_PQ_result_spec_bool (
107 : "transaction_duplicate",
108 58 : &ctx->transaction_duplicates[i]),
109 58 : GNUNET_PQ_result_spec_allow_null (
110 : GNUNET_PQ_result_spec_uint64 ("ruuid",
111 58 : &ctx->reserve_uuids[i]),
112 58 : &ctx->conflicts[i]),
113 : GNUNET_PQ_result_spec_end
114 : };
115 :
116 58 : if (GNUNET_OK !=
117 58 : GNUNET_PQ_extract_result (result,
118 : rs,
119 : i))
120 : {
121 0 : GNUNET_break (0);
122 0 : ctx->status = GNUNET_SYSERR;
123 0 : return;
124 : }
125 58 : if (! ctx->transaction_duplicates[i])
126 58 : ctx->needs_update |= ctx->conflicts[i];
127 : }
128 : }
129 :
130 :
131 : enum GNUNET_DB_QueryStatus
132 58 : TEH_PG_reserves_in_insert (
133 : void *cls,
134 : const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
135 : unsigned int reserves_length,
136 : enum GNUNET_DB_QueryStatus *results)
137 58 : {
138 58 : struct PostgresClosure *pg = cls;
139 58 : unsigned int dups = 0;
140 :
141 58 : struct TALER_FullPaytoHashP h_full_paytos[
142 58 : GNUNET_NZL (reserves_length)];
143 58 : struct TALER_NormalizedPaytoHashP h_normalized_paytos[
144 58 : GNUNET_NZL (reserves_length)];
145 58 : char *notify_s[GNUNET_NZL (reserves_length)];
146 58 : struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
147 58 : struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
148 58 : struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
149 58 : const char *sender_account_details[GNUNET_NZL (reserves_length)];
150 58 : const char *exchange_account_names[GNUNET_NZL (reserves_length)];
151 58 : uint64_t wire_references[GNUNET_NZL (reserves_length)];
152 58 : uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
153 58 : bool transaction_duplicates[GNUNET_NZL (reserves_length)];
154 58 : bool conflicts[GNUNET_NZL (reserves_length)];
155 : struct GNUNET_TIME_Timestamp reserve_expiration
156 58 : = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
157 : struct GNUNET_TIME_Timestamp gc
158 58 : = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
159 : enum GNUNET_DB_QueryStatus qs;
160 : bool need_update;
161 :
162 116 : for (unsigned int i = 0; i<reserves_length; i++)
163 : {
164 58 : const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
165 :
166 58 : TALER_full_payto_hash (reserve->sender_account_details,
167 : &h_full_paytos[i]);
168 58 : TALER_full_payto_normalize_and_hash (reserve->sender_account_details,
169 : &h_normalized_paytos[i]);
170 58 : notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
171 58 : reserve_pubs[i] = *reserve->reserve_pub;
172 58 : balances[i] = *reserve->balance;
173 58 : execution_times[i] = reserve->execution_time;
174 58 : sender_account_details[i] = reserve->sender_account_details.full_payto;
175 58 : exchange_account_names[i] = reserve->exchange_account_name;
176 58 : wire_references[i] = reserve->wire_reference;
177 : }
178 :
179 : /* NOTE: kind-of pointless to explicitly start a transaction here... */
180 58 : if (GNUNET_OK !=
181 58 : TEH_PG_preflight (pg))
182 : {
183 0 : GNUNET_break (0);
184 0 : qs = GNUNET_DB_STATUS_HARD_ERROR;
185 0 : goto finished;
186 : }
187 58 : if (GNUNET_OK !=
188 58 : TEH_PG_start_read_committed (pg,
189 : "READ_COMMITED"))
190 : {
191 0 : GNUNET_break (0);
192 0 : qs = GNUNET_DB_STATUS_HARD_ERROR;
193 0 : goto finished;
194 : }
195 58 : PREPARE (pg,
196 : "reserves_insert_with_array",
197 : "SELECT"
198 : " transaction_duplicate"
199 : ",ruuid"
200 : " FROM exchange_do_array_reserves_insert"
201 : " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
202 : {
203 58 : struct GNUNET_PQ_QueryParam params[] = {
204 58 : GNUNET_PQ_query_param_timestamp (&gc),
205 58 : GNUNET_PQ_query_param_timestamp (&reserve_expiration),
206 58 : GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
207 : reserve_pubs,
208 : pg->conn),
209 58 : GNUNET_PQ_query_param_array_uint64 (reserves_length,
210 : wire_references,
211 : pg->conn),
212 58 : TALER_PQ_query_param_array_amount (
213 : reserves_length,
214 : balances,
215 : pg->conn),
216 58 : GNUNET_PQ_query_param_array_ptrs_string (
217 : reserves_length,
218 : (const char **) exchange_account_names,
219 : pg->conn),
220 58 : GNUNET_PQ_query_param_array_timestamp (
221 : reserves_length,
222 : execution_times,
223 : pg->conn),
224 58 : GNUNET_PQ_query_param_array_auto_from_type (
225 : reserves_length,
226 : h_full_paytos,
227 : pg->conn),
228 58 : GNUNET_PQ_query_param_array_auto_from_type (
229 : reserves_length,
230 : h_normalized_paytos,
231 : pg->conn),
232 58 : GNUNET_PQ_query_param_array_ptrs_string (
233 : reserves_length,
234 : (const char **) sender_account_details,
235 : pg->conn),
236 58 : GNUNET_PQ_query_param_array_ptrs_string (
237 : reserves_length,
238 : (const char **) notify_s,
239 : pg->conn),
240 : GNUNET_PQ_query_param_end
241 : };
242 58 : struct Context ctx = {
243 : .reserve_uuids = reserve_uuids,
244 : .transaction_duplicates = transaction_duplicates,
245 : .conflicts = conflicts,
246 : .needs_update = false,
247 : .status = GNUNET_OK
248 : };
249 :
250 58 : qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
251 : "reserves_insert_with_array",
252 : params,
253 : &helper_cb,
254 : &ctx);
255 58 : GNUNET_PQ_cleanup_query_params_closures (params);
256 58 : if ( (qs < 0) ||
257 58 : (GNUNET_OK != ctx.status) )
258 : {
259 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
260 : "Failed to insert into reserves (%d)\n",
261 : qs);
262 0 : goto finished;
263 : }
264 58 : need_update = ctx.needs_update;
265 : }
266 :
267 : {
268 : enum GNUNET_DB_QueryStatus cs;
269 :
270 58 : cs = TEH_PG_commit (pg);
271 58 : if (cs < 0)
272 : {
273 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
274 : "Failed to commit\n");
275 0 : qs = cs;
276 0 : goto finished;
277 : }
278 : }
279 :
280 116 : for (unsigned int i = 0; i<reserves_length; i++)
281 : {
282 58 : if (transaction_duplicates[i])
283 0 : dups++;
284 58 : results[i] = transaction_duplicates[i]
285 : ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
286 58 : : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
287 : }
288 :
289 58 : if (! need_update)
290 : {
291 58 : qs = reserves_length;
292 58 : goto finished;
293 : }
294 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
295 : "Reserve update needed for some reserves in the batch\n");
296 0 : PREPARE (pg,
297 : "reserves_update",
298 : "SELECT"
299 : " out_duplicate AS duplicate "
300 : "FROM exchange_do_batch_reserves_update"
301 : " ($1,$2,$3,$4,$5,$6,$7);");
302 :
303 0 : if (GNUNET_OK !=
304 0 : TEH_PG_start (pg,
305 : "reserve-insert-continued"))
306 : {
307 0 : GNUNET_break (0);
308 0 : qs = GNUNET_DB_STATUS_HARD_ERROR;
309 0 : goto finished;
310 : }
311 :
312 0 : for (unsigned int i = 0; i<reserves_length; i++)
313 : {
314 0 : if (transaction_duplicates[i])
315 0 : continue;
316 0 : if (! conflicts[i])
317 0 : continue;
318 : {
319 : bool duplicate;
320 0 : struct GNUNET_PQ_QueryParam params[] = {
321 0 : GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
322 0 : GNUNET_PQ_query_param_timestamp (&reserve_expiration),
323 0 : GNUNET_PQ_query_param_uint64 (&wire_references[i]),
324 0 : TALER_PQ_query_param_amount (pg->conn,
325 0 : &balances[i]),
326 0 : GNUNET_PQ_query_param_string (exchange_account_names[i]),
327 0 : GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]),
328 0 : GNUNET_PQ_query_param_string (notify_s[i]),
329 : GNUNET_PQ_query_param_end
330 : };
331 0 : struct GNUNET_PQ_ResultSpec rs[] = {
332 0 : GNUNET_PQ_result_spec_bool ("duplicate",
333 : &duplicate),
334 : GNUNET_PQ_result_spec_end
335 : };
336 : enum GNUNET_DB_QueryStatus qsi;
337 :
338 0 : qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
339 : "reserves_update",
340 : params,
341 : rs);
342 0 : if (qsi < 0)
343 : {
344 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
345 : "Failed to update reserves (%d)\n",
346 : qsi);
347 0 : results[i] = qsi;
348 0 : goto finished;
349 : }
350 0 : results[i] = duplicate
351 : ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
352 0 : : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
353 : }
354 : }
355 : {
356 : enum GNUNET_DB_QueryStatus cs;
357 :
358 0 : cs = TEH_PG_commit (pg);
359 0 : if (cs < 0)
360 : {
361 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
362 : "Failed to commit\n");
363 0 : qs = cs;
364 0 : goto finished;
365 : }
366 : }
367 0 : finished:
368 116 : for (unsigned int i = 0; i<reserves_length; i++)
369 58 : GNUNET_free (notify_s[i]);
370 58 : if (qs < 0)
371 0 : return qs;
372 58 : GNUNET_PQ_event_do_poll (pg->conn);
373 58 : if (0 != dups)
374 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
375 : "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
376 : dups,
377 : reserves_length);
378 58 : return qs;
379 : }
|