Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2022 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file exchangedb/begin_shard.c
18 : * @brief Implementation of the begin_shard function for Postgres
19 : * @author Christian Grothoff
20 : */
21 : #include "taler/taler_pq_lib.h"
22 : #include "exchange-database/begin_shard.h"
23 : #include "helper.h"
24 : #include "exchange-database/start.h"
25 : #include "exchange-database/rollback.h"
26 : #include "exchange-database/commit.h"
27 :
28 :
29 : enum GNUNET_DB_QueryStatus
30 281 : TALER_EXCHANGEDB_begin_shard (struct TALER_EXCHANGEDB_PostgresContext *pg,
31 : const char *job_name,
32 : struct GNUNET_TIME_Relative delay,
33 : uint64_t shard_size,
34 : uint64_t *start_row,
35 : uint64_t *end_row)
36 : {
37 :
38 281 : for (unsigned int retries = 0; retries<10; retries++)
39 : {
40 281 : if (GNUNET_OK !=
41 281 : TALER_EXCHANGEDB_start (pg,
42 : "begin_shard"))
43 : {
44 0 : GNUNET_break (0);
45 0 : return GNUNET_DB_STATUS_HARD_ERROR;
46 : }
47 :
48 : {
49 : struct GNUNET_TIME_Absolute past;
50 : enum GNUNET_DB_QueryStatus qs;
51 281 : struct GNUNET_PQ_QueryParam params[] = {
52 281 : GNUNET_PQ_query_param_string (job_name),
53 281 : GNUNET_PQ_query_param_absolute_time (&past),
54 : GNUNET_PQ_query_param_end
55 : };
56 281 : struct GNUNET_PQ_ResultSpec rs[] = {
57 281 : GNUNET_PQ_result_spec_uint64 ("start_row",
58 : start_row),
59 281 : GNUNET_PQ_result_spec_uint64 ("end_row",
60 : end_row),
61 : GNUNET_PQ_result_spec_end
62 : };
63 :
64 281 : past = GNUNET_TIME_absolute_get ();
65 281 : PREPARE (pg,
66 : "get_open_shard",
67 : "SELECT"
68 : " start_row"
69 : ",end_row"
70 : " FROM work_shards"
71 : " WHERE job_name=$1"
72 : " AND completed=FALSE"
73 : " AND last_attempt<$2"
74 : " ORDER BY last_attempt ASC"
75 : " LIMIT 1;");
76 281 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
77 : "get_open_shard",
78 : params,
79 : rs);
80 281 : switch (qs)
81 : {
82 0 : case GNUNET_DB_STATUS_HARD_ERROR:
83 0 : GNUNET_break (0);
84 0 : TALER_EXCHANGEDB_rollback (pg);
85 0 : return qs;
86 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
87 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
88 : "Serialization error on getting open shard\n");
89 0 : TALER_EXCHANGEDB_rollback (pg);
90 0 : continue;
91 90 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
92 : {
93 : enum GNUNET_DB_QueryStatus qsz;
94 : struct GNUNET_TIME_Absolute now;
95 90 : struct GNUNET_PQ_QueryParam iparams[] = {
96 90 : GNUNET_PQ_query_param_string (job_name),
97 90 : GNUNET_PQ_query_param_absolute_time (&now),
98 90 : GNUNET_PQ_query_param_uint64 (start_row),
99 90 : GNUNET_PQ_query_param_uint64 (end_row),
100 : GNUNET_PQ_query_param_end
101 : };
102 :
103 90 : now = GNUNET_TIME_relative_to_absolute (delay);
104 90 : PREPARE (pg,
105 : "reclaim_shard",
106 : "UPDATE work_shards"
107 : " SET last_attempt=$2"
108 : " WHERE job_name=$1"
109 : " AND start_row=$3"
110 : " AND end_row=$4");
111 90 : qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
112 : "reclaim_shard",
113 : iparams);
114 90 : switch (qsz)
115 : {
116 0 : case GNUNET_DB_STATUS_HARD_ERROR:
117 0 : GNUNET_break (0);
118 0 : TALER_EXCHANGEDB_rollback (pg);
119 0 : return qsz;
120 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
121 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
122 : "Serialization error on claiming open shard\n");
123 0 : TALER_EXCHANGEDB_rollback (pg);
124 0 : continue;
125 90 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
126 90 : goto commit;
127 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
128 0 : GNUNET_break (0); /* logic error, should be impossible */
129 0 : TALER_EXCHANGEDB_rollback (pg);
130 0 : return GNUNET_DB_STATUS_HARD_ERROR;
131 : }
132 : }
133 0 : break; /* actually unreachable */
134 191 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
135 191 : break; /* continued below */
136 : }
137 : } /* get_open_shard */
138 :
139 : /* No open shard, find last 'end_row' */
140 : {
141 : enum GNUNET_DB_QueryStatus qs;
142 191 : struct GNUNET_PQ_QueryParam params[] = {
143 191 : GNUNET_PQ_query_param_string (job_name),
144 : GNUNET_PQ_query_param_end
145 : };
146 191 : struct GNUNET_PQ_ResultSpec rs[] = {
147 191 : GNUNET_PQ_result_spec_uint64 ("end_row",
148 : start_row),
149 : GNUNET_PQ_result_spec_end
150 : };
151 :
152 191 : PREPARE (pg,
153 : "get_last_shard",
154 : "SELECT"
155 : " end_row"
156 : " FROM work_shards"
157 : " WHERE job_name=$1"
158 : " ORDER BY end_row DESC"
159 : " LIMIT 1;");
160 191 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
161 : "get_last_shard",
162 : params,
163 : rs);
164 191 : switch (qs)
165 : {
166 0 : case GNUNET_DB_STATUS_HARD_ERROR:
167 0 : GNUNET_break (0);
168 0 : TALER_EXCHANGEDB_rollback (pg);
169 0 : return qs;
170 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
171 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
172 : "Serialization error on getting last shard\n");
173 0 : TALER_EXCHANGEDB_rollback (pg);
174 0 : continue;
175 170 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
176 170 : break;
177 21 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
178 21 : *start_row = 0; /* base-case: no shards yet */
179 21 : break; /* continued below */
180 : }
181 191 : *end_row = *start_row + shard_size;
182 : } /* get_last_shard */
183 :
184 : /* Claim fresh shard */
185 : {
186 : enum GNUNET_DB_QueryStatus qs;
187 : struct GNUNET_TIME_Absolute now;
188 191 : struct GNUNET_PQ_QueryParam params[] = {
189 191 : GNUNET_PQ_query_param_string (job_name),
190 191 : GNUNET_PQ_query_param_absolute_time (&now),
191 191 : GNUNET_PQ_query_param_uint64 (start_row),
192 191 : GNUNET_PQ_query_param_uint64 (end_row),
193 : GNUNET_PQ_query_param_end
194 : };
195 :
196 191 : now = GNUNET_TIME_relative_to_absolute (delay);
197 191 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
198 : "Trying to claim shard (%llu-%llu]\n",
199 : (unsigned long long) *start_row,
200 : (unsigned long long) *end_row);
201 :
202 191 : PREPARE (pg,
203 : "claim_next_shard",
204 : "INSERT INTO work_shards"
205 : "(job_name"
206 : ",last_attempt"
207 : ",start_row"
208 : ",end_row"
209 : ") VALUES "
210 : "($1, $2, $3, $4);");
211 191 : qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
212 : "claim_next_shard",
213 : params);
214 191 : switch (qs)
215 : {
216 0 : case GNUNET_DB_STATUS_HARD_ERROR:
217 0 : GNUNET_break (0);
218 0 : TALER_EXCHANGEDB_rollback (pg);
219 0 : return qs;
220 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
221 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
222 : "Serialization error on claiming next shard\n");
223 0 : TALER_EXCHANGEDB_rollback (pg);
224 0 : continue;
225 191 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
226 : /* continued below */
227 191 : break;
228 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
229 : /* someone else got this shard already,
230 : try again */
231 0 : TALER_EXCHANGEDB_rollback (pg);
232 0 : continue;
233 : }
234 : } /* claim_next_shard */
235 :
236 : /* commit */
237 281 : commit:
238 : {
239 : enum GNUNET_DB_QueryStatus qs;
240 :
241 281 : qs = TALER_EXCHANGEDB_commit (pg);
242 281 : switch (qs)
243 : {
244 0 : case GNUNET_DB_STATUS_HARD_ERROR:
245 0 : GNUNET_break (0);
246 0 : TALER_EXCHANGEDB_rollback (pg);
247 0 : return qs;
248 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
249 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
250 : "Serialization error on commit for beginning shard\n");
251 0 : TALER_EXCHANGEDB_rollback (pg);
252 0 : continue;
253 281 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
254 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
255 281 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
256 : "Claimed new shard\n");
257 281 : return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
258 : }
259 : }
260 : } /* retry 'for' loop */
261 0 : return GNUNET_DB_STATUS_SOFT_ERROR;
262 : }
|