Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2022-2024 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file exchangedb/reserves_in_insert.c
18 : * @brief Implementation of the reserves_in_insert function for Postgres
19 : * @author Christian Grothoff
20 : * @author Joseph Xu
21 : */
22 : #include "taler/taler_pq_lib.h"
23 : #include "exchange-database/reserves_in_insert.h"
24 : #include "helper.h"
25 : #include "exchange-database/start.h"
26 : #include "exchange-database/start_read_committed.h"
27 : #include "exchange-database/commit.h"
28 : #include "exchange-database/preflight.h"
29 : #include "exchange-database/rollback.h"
30 :
31 :
32 : /**
33 : * Generate event notification for the reserve change.
34 : *
35 : * @param reserve_pub reserve to notfiy on
36 : * @return string to pass to postgres for the notification
37 : */
38 : static char *
39 54 : compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
40 : {
41 54 : struct TALER_EXCHANGEDB_ReserveEventP rep = {
42 54 : .header.size = htons (sizeof (rep)),
43 54 : .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
44 : .reserve_pub = *reserve_pub
45 : };
46 :
47 54 : return GNUNET_PQ_get_event_notify_channel (&rep.header);
48 : }
49 :
50 :
51 : /**
52 : * Closure for our helper_cb()
53 : */
54 : struct Context
55 : {
56 : /**
57 : * Array of reserve UUIDs to initialize.
58 : */
59 : uint64_t *reserve_uuids;
60 :
61 : /**
62 : * Array with entries set to 'true' for duplicate transactions.
63 : */
64 : bool *transaction_duplicates;
65 :
66 : /**
67 : * Array with entries set to 'true' for rows with conflicts.
68 : */
69 : bool *conflicts;
70 :
71 : /**
72 : * Set to #GNUNET_SYSERR on failures.
73 : */
74 : enum GNUNET_GenericReturnValue status;
75 :
76 : /**
77 : * Single value (no array) set to true if we need
78 : * to follow-up with an update.
79 : */
80 : bool needs_update;
81 : };
82 :
83 :
84 : /**
85 : * Helper function to be called with the results of a SELECT statement
86 : * that has returned @a num_results results.
87 : *
88 : * @param cls closure of type `struct Context *`
89 : * @param result the postgres result
90 : * @param num_results the number of results in @a result
91 : */
92 : static void
93 54 : helper_cb (void *cls,
94 : PGresult *result,
95 : unsigned int num_results)
96 : {
97 54 : struct Context *ctx = cls;
98 :
99 108 : for (unsigned int i = 0; i<num_results; i++)
100 : {
101 54 : struct GNUNET_PQ_ResultSpec rs[] = {
102 54 : GNUNET_PQ_result_spec_bool (
103 : "transaction_duplicate",
104 54 : &ctx->transaction_duplicates[i]),
105 54 : GNUNET_PQ_result_spec_allow_null (
106 : GNUNET_PQ_result_spec_uint64 ("ruuid",
107 54 : &ctx->reserve_uuids[i]),
108 54 : &ctx->conflicts[i]),
109 : GNUNET_PQ_result_spec_end
110 : };
111 :
112 54 : if (GNUNET_OK !=
113 54 : GNUNET_PQ_extract_result (result,
114 : rs,
115 : i))
116 : {
117 0 : GNUNET_break (0);
118 0 : ctx->status = GNUNET_SYSERR;
119 0 : return;
120 : }
121 54 : if (! ctx->transaction_duplicates[i])
122 54 : ctx->needs_update |= ctx->conflicts[i];
123 : }
124 : }
125 :
126 :
127 : enum GNUNET_DB_QueryStatus
128 54 : TALER_EXCHANGEDB_reserves_in_insert (
129 : struct TALER_EXCHANGEDB_PostgresContext *pg,
130 : const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
131 : unsigned int reserves_length,
132 : enum GNUNET_DB_QueryStatus *results)
133 54 : {
134 54 : unsigned int dups = 0;
135 :
136 54 : struct TALER_FullPaytoHashP h_full_paytos[
137 54 : GNUNET_NZL (reserves_length)];
138 54 : struct TALER_NormalizedPaytoHashP h_normalized_paytos[
139 54 : GNUNET_NZL (reserves_length)];
140 54 : char *notify_s[GNUNET_NZL (reserves_length)];
141 54 : struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
142 54 : struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
143 54 : struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
144 54 : const char *sender_account_details[GNUNET_NZL (reserves_length)];
145 54 : const char *exchange_account_names[GNUNET_NZL (reserves_length)];
146 54 : uint64_t wire_references[GNUNET_NZL (reserves_length)];
147 54 : uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
148 54 : bool transaction_duplicates[GNUNET_NZL (reserves_length)];
149 54 : bool conflicts[GNUNET_NZL (reserves_length)];
150 : struct GNUNET_TIME_Timestamp reserve_expiration
151 54 : = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
152 : struct GNUNET_TIME_Timestamp gc
153 54 : = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
154 : enum GNUNET_DB_QueryStatus qs;
155 : bool need_update;
156 :
157 108 : for (unsigned int i = 0; i<reserves_length; i++)
158 : {
159 54 : const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
160 :
161 54 : TALER_full_payto_hash (reserve->sender_account_details,
162 : &h_full_paytos[i]);
163 54 : TALER_full_payto_normalize_and_hash (reserve->sender_account_details,
164 : &h_normalized_paytos[i]);
165 54 : notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
166 54 : reserve_pubs[i] = *reserve->reserve_pub;
167 54 : balances[i] = *reserve->balance;
168 54 : execution_times[i] = reserve->execution_time;
169 54 : sender_account_details[i] = reserve->sender_account_details.full_payto;
170 54 : exchange_account_names[i] = reserve->exchange_account_name;
171 54 : wire_references[i] = reserve->wire_reference;
172 : }
173 :
174 : /* NOTE: kind-of pointless to explicitly start a transaction here... */
175 54 : if (GNUNET_OK !=
176 54 : TALER_EXCHANGEDB_preflight (pg))
177 : {
178 0 : GNUNET_break (0);
179 0 : qs = GNUNET_DB_STATUS_HARD_ERROR;
180 0 : goto finished;
181 : }
182 54 : if (GNUNET_OK !=
183 54 : TALER_TALER_EXCHANGEDB_start_read_committed (pg,
184 : "READ_COMMITED"))
185 : {
186 0 : GNUNET_break (0);
187 0 : qs = GNUNET_DB_STATUS_HARD_ERROR;
188 0 : goto finished;
189 : }
190 54 : PREPARE (pg,
191 : "reserves_insert_with_array",
192 : "SELECT"
193 : " transaction_duplicate"
194 : ",ruuid"
195 : " FROM exchange_do_array_reserves_insert"
196 : " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
197 : {
198 54 : struct GNUNET_PQ_QueryParam params[] = {
199 54 : GNUNET_PQ_query_param_timestamp (&gc),
200 54 : GNUNET_PQ_query_param_timestamp (&reserve_expiration),
201 54 : GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
202 : reserve_pubs,
203 : pg->conn),
204 54 : GNUNET_PQ_query_param_array_uint64 (reserves_length,
205 : wire_references,
206 : pg->conn),
207 54 : TALER_PQ_query_param_array_amount (
208 : reserves_length,
209 : balances,
210 : pg->conn),
211 54 : GNUNET_PQ_query_param_array_ptrs_string (
212 : reserves_length,
213 : (const char **) exchange_account_names,
214 : pg->conn),
215 54 : GNUNET_PQ_query_param_array_timestamp (
216 : reserves_length,
217 : execution_times,
218 : pg->conn),
219 54 : GNUNET_PQ_query_param_array_auto_from_type (
220 : reserves_length,
221 : h_full_paytos,
222 : pg->conn),
223 54 : GNUNET_PQ_query_param_array_auto_from_type (
224 : reserves_length,
225 : h_normalized_paytos,
226 : pg->conn),
227 54 : GNUNET_PQ_query_param_array_ptrs_string (
228 : reserves_length,
229 : (const char **) sender_account_details,
230 : pg->conn),
231 54 : GNUNET_PQ_query_param_array_ptrs_string (
232 : reserves_length,
233 : (const char **) notify_s,
234 : pg->conn),
235 : GNUNET_PQ_query_param_end
236 : };
237 54 : struct Context ctx = {
238 : .reserve_uuids = reserve_uuids,
239 : .transaction_duplicates = transaction_duplicates,
240 : .conflicts = conflicts,
241 : .needs_update = false,
242 : .status = GNUNET_OK
243 : };
244 :
245 54 : qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
246 : "reserves_insert_with_array",
247 : params,
248 : &helper_cb,
249 : &ctx);
250 54 : GNUNET_PQ_cleanup_query_params_closures (params);
251 54 : if ( (qs < 0) ||
252 54 : (GNUNET_OK != ctx.status) )
253 : {
254 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
255 : "Failed to insert into reserves (%d)\n",
256 : qs);
257 0 : goto finished;
258 : }
259 54 : need_update = ctx.needs_update;
260 : }
261 :
262 : {
263 : enum GNUNET_DB_QueryStatus cs;
264 :
265 54 : cs = TALER_EXCHANGEDB_commit (pg);
266 54 : if (cs < 0)
267 : {
268 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
269 : "Failed to commit\n");
270 0 : qs = cs;
271 0 : goto finished;
272 : }
273 : }
274 :
275 108 : for (unsigned int i = 0; i<reserves_length; i++)
276 : {
277 54 : if (transaction_duplicates[i])
278 0 : dups++;
279 54 : results[i] = transaction_duplicates[i]
280 : ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
281 54 : : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
282 : }
283 :
284 54 : if (! need_update)
285 : {
286 54 : qs = reserves_length;
287 54 : goto finished;
288 : }
289 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
290 : "Reserve update needed for some reserves in the batch\n");
291 0 : PREPARE (pg,
292 : "reserves_update",
293 : "SELECT"
294 : " out_duplicate AS duplicate "
295 : "FROM exchange_do_batch_reserves_update"
296 : " ($1,$2,$3,$4,$5,$6,$7);");
297 :
298 0 : if (GNUNET_OK !=
299 0 : TALER_EXCHANGEDB_start (pg,
300 : "reserve-insert-continued"))
301 : {
302 0 : GNUNET_break (0);
303 0 : qs = GNUNET_DB_STATUS_HARD_ERROR;
304 0 : goto finished;
305 : }
306 :
307 0 : for (unsigned int i = 0; i<reserves_length; i++)
308 : {
309 0 : if (transaction_duplicates[i])
310 0 : continue;
311 0 : if (! conflicts[i])
312 0 : continue;
313 : {
314 : bool duplicate;
315 0 : struct GNUNET_PQ_QueryParam params[] = {
316 0 : GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
317 0 : GNUNET_PQ_query_param_timestamp (&reserve_expiration),
318 0 : GNUNET_PQ_query_param_uint64 (&wire_references[i]),
319 0 : TALER_PQ_query_param_amount (pg->conn,
320 0 : &balances[i]),
321 0 : GNUNET_PQ_query_param_string (exchange_account_names[i]),
322 0 : GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]),
323 0 : GNUNET_PQ_query_param_string (notify_s[i]),
324 : GNUNET_PQ_query_param_end
325 : };
326 0 : struct GNUNET_PQ_ResultSpec rs[] = {
327 0 : GNUNET_PQ_result_spec_bool ("duplicate",
328 : &duplicate),
329 : GNUNET_PQ_result_spec_end
330 : };
331 : enum GNUNET_DB_QueryStatus qsi;
332 :
333 0 : qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
334 : "reserves_update",
335 : params,
336 : rs);
337 0 : if (qsi < 0)
338 : {
339 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
340 : "Failed to update reserves (%d)\n",
341 : qsi);
342 0 : results[i] = qsi;
343 0 : goto finished;
344 : }
345 0 : results[i] = duplicate
346 : ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
347 0 : : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
348 : }
349 : }
350 : {
351 : enum GNUNET_DB_QueryStatus cs;
352 :
353 0 : cs = TALER_EXCHANGEDB_commit (pg);
354 0 : if (cs < 0)
355 : {
356 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
357 : "Failed to commit\n");
358 0 : qs = cs;
359 0 : goto finished;
360 : }
361 : }
362 0 : finished:
363 108 : for (unsigned int i = 0; i<reserves_length; i++)
364 54 : GNUNET_free (notify_s[i]);
365 54 : if (qs < 0)
366 0 : return qs;
367 54 : GNUNET_PQ_event_do_poll (pg->conn);
368 54 : if (0 != dups)
369 0 : GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
370 : "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
371 : dups,
372 : reserves_length);
373 54 : return qs;
374 : }
|