Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2022 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file exchangedb/pg_begin_shard.c
18 : * @brief Implementation of the begin_shard function for Postgres
19 : * @author Christian Grothoff
20 : */
21 : #include "platform.h"
22 : #include "taler_error_codes.h"
23 : #include "taler_dbevents.h"
24 : #include "taler_pq_lib.h"
25 : #include "pg_begin_shard.h"
26 : #include "pg_helper.h"
27 : #include "pg_start.h"
28 : #include "pg_rollback.h"
29 : #include "pg_commit.h"
30 :
31 :
32 : enum GNUNET_DB_QueryStatus
33 296 : TEH_PG_begin_shard (void *cls,
34 : const char *job_name,
35 : struct GNUNET_TIME_Relative delay,
36 : uint64_t shard_size,
37 : uint64_t *start_row,
38 : uint64_t *end_row)
39 : {
40 296 : struct PostgresClosure *pg = cls;
41 :
42 296 : for (unsigned int retries = 0; retries<10; retries++)
43 : {
44 296 : if (GNUNET_OK !=
45 296 : TEH_PG_start (pg,
46 : "begin_shard"))
47 : {
48 0 : GNUNET_break (0);
49 0 : return GNUNET_DB_STATUS_HARD_ERROR;
50 : }
51 :
52 : {
53 : struct GNUNET_TIME_Absolute past;
54 : enum GNUNET_DB_QueryStatus qs;
55 296 : struct GNUNET_PQ_QueryParam params[] = {
56 296 : GNUNET_PQ_query_param_string (job_name),
57 296 : GNUNET_PQ_query_param_absolute_time (&past),
58 : GNUNET_PQ_query_param_end
59 : };
60 296 : struct GNUNET_PQ_ResultSpec rs[] = {
61 296 : GNUNET_PQ_result_spec_uint64 ("start_row",
62 : start_row),
63 296 : GNUNET_PQ_result_spec_uint64 ("end_row",
64 : end_row),
65 : GNUNET_PQ_result_spec_end
66 : };
67 :
68 296 : past = GNUNET_TIME_absolute_get ();
69 296 : PREPARE (pg,
70 : "get_open_shard",
71 : "SELECT"
72 : " start_row"
73 : ",end_row"
74 : " FROM work_shards"
75 : " WHERE job_name=$1"
76 : " AND completed=FALSE"
77 : " AND last_attempt<$2"
78 : " ORDER BY last_attempt ASC"
79 : " LIMIT 1;");
80 296 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
81 : "get_open_shard",
82 : params,
83 : rs);
84 296 : switch (qs)
85 : {
86 0 : case GNUNET_DB_STATUS_HARD_ERROR:
87 0 : GNUNET_break (0);
88 0 : TEH_PG_rollback (pg);
89 0 : return qs;
90 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
91 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
92 : "Serialization error on getting open shard\n");
93 0 : TEH_PG_rollback (pg);
94 0 : continue;
95 91 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
96 : {
97 : enum GNUNET_DB_QueryStatus qsz;
98 : struct GNUNET_TIME_Absolute now;
99 91 : struct GNUNET_PQ_QueryParam iparams[] = {
100 91 : GNUNET_PQ_query_param_string (job_name),
101 91 : GNUNET_PQ_query_param_absolute_time (&now),
102 91 : GNUNET_PQ_query_param_uint64 (start_row),
103 91 : GNUNET_PQ_query_param_uint64 (end_row),
104 : GNUNET_PQ_query_param_end
105 : };
106 :
107 91 : now = GNUNET_TIME_relative_to_absolute (delay);
108 91 : PREPARE (pg,
109 : "reclaim_shard",
110 : "UPDATE work_shards"
111 : " SET last_attempt=$2"
112 : " WHERE job_name=$1"
113 : " AND start_row=$3"
114 : " AND end_row=$4");
115 91 : qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
116 : "reclaim_shard",
117 : iparams);
118 0 : switch (qsz)
119 : {
120 0 : case GNUNET_DB_STATUS_HARD_ERROR:
121 0 : GNUNET_break (0);
122 0 : TEH_PG_rollback (pg);
123 0 : return qsz;
124 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
125 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
126 : "Serialization error on claiming open shard\n");
127 0 : TEH_PG_rollback (pg);
128 0 : continue;
129 91 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
130 91 : goto commit;
131 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
132 0 : GNUNET_break (0); /* logic error, should be impossible */
133 0 : TEH_PG_rollback (pg);
134 0 : return GNUNET_DB_STATUS_HARD_ERROR;
135 : }
136 : }
137 0 : break; /* actually unreachable */
138 205 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
139 205 : break; /* continued below */
140 : }
141 : } /* get_open_shard */
142 :
143 : /* No open shard, find last 'end_row' */
144 : {
145 : enum GNUNET_DB_QueryStatus qs;
146 205 : struct GNUNET_PQ_QueryParam params[] = {
147 205 : GNUNET_PQ_query_param_string (job_name),
148 : GNUNET_PQ_query_param_end
149 : };
150 205 : struct GNUNET_PQ_ResultSpec rs[] = {
151 205 : GNUNET_PQ_result_spec_uint64 ("end_row",
152 : start_row),
153 : GNUNET_PQ_result_spec_end
154 : };
155 :
156 205 : PREPARE (pg,
157 : "get_last_shard",
158 : "SELECT"
159 : " end_row"
160 : " FROM work_shards"
161 : " WHERE job_name=$1"
162 : " ORDER BY end_row DESC"
163 : " LIMIT 1;");
164 205 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
165 : "get_last_shard",
166 : params,
167 : rs);
168 205 : switch (qs)
169 : {
170 0 : case GNUNET_DB_STATUS_HARD_ERROR:
171 0 : GNUNET_break (0);
172 0 : TEH_PG_rollback (pg);
173 0 : return qs;
174 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
175 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
176 : "Serialization error on getting last shard\n");
177 0 : TEH_PG_rollback (pg);
178 0 : continue;
179 170 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
180 170 : break;
181 35 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
182 35 : *start_row = 0; /* base-case: no shards yet */
183 35 : break; /* continued below */
184 : }
185 205 : *end_row = *start_row + shard_size;
186 : } /* get_last_shard */
187 :
188 : /* Claim fresh shard */
189 : {
190 : enum GNUNET_DB_QueryStatus qs;
191 : struct GNUNET_TIME_Absolute now;
192 205 : struct GNUNET_PQ_QueryParam params[] = {
193 205 : GNUNET_PQ_query_param_string (job_name),
194 205 : GNUNET_PQ_query_param_absolute_time (&now),
195 205 : GNUNET_PQ_query_param_uint64 (start_row),
196 205 : GNUNET_PQ_query_param_uint64 (end_row),
197 : GNUNET_PQ_query_param_end
198 : };
199 :
200 205 : now = GNUNET_TIME_relative_to_absolute (delay);
201 205 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
202 : "Trying to claim shard (%llu-%llu]\n",
203 : (unsigned long long) *start_row,
204 : (unsigned long long) *end_row);
205 :
206 205 : PREPARE (pg,
207 : "claim_next_shard",
208 : "INSERT INTO work_shards"
209 : "(job_name"
210 : ",last_attempt"
211 : ",start_row"
212 : ",end_row"
213 : ") VALUES "
214 : "($1, $2, $3, $4);");
215 205 : qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
216 : "claim_next_shard",
217 : params);
218 205 : switch (qs)
219 : {
220 0 : case GNUNET_DB_STATUS_HARD_ERROR:
221 0 : GNUNET_break (0);
222 0 : TEH_PG_rollback (pg);
223 0 : return qs;
224 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
225 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
226 : "Serialization error on claiming next shard\n");
227 0 : TEH_PG_rollback (pg);
228 0 : continue;
229 205 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
230 : /* continued below */
231 205 : break;
232 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
233 : /* someone else got this shard already,
234 : try again */
235 0 : TEH_PG_rollback (pg);
236 0 : continue;
237 : }
238 : } /* claim_next_shard */
239 :
240 : /* commit */
241 296 : commit:
242 : {
243 : enum GNUNET_DB_QueryStatus qs;
244 :
245 296 : qs = TEH_PG_commit (pg);
246 296 : switch (qs)
247 : {
248 0 : case GNUNET_DB_STATUS_HARD_ERROR:
249 0 : GNUNET_break (0);
250 0 : TEH_PG_rollback (pg);
251 0 : return qs;
252 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
253 0 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
254 : "Serialization error on commit for beginning shard\n");
255 0 : TEH_PG_rollback (pg);
256 0 : continue;
257 296 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
258 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
259 296 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
260 : "Claimed new shard\n");
261 296 : return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
262 : }
263 : }
264 : } /* retry 'for' loop */
265 0 : return GNUNET_DB_STATUS_SOFT_ERROR;
266 : }
|