Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2022 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file exchangedb/begin_revolving_shard.c
18 : * @brief Implementation of the begin_revolving_shard function for Postgres
19 : * @author Christian Grothoff
20 : */
21 : #include "taler/taler_pq_lib.h"
22 : #include "exchange-database/begin_revolving_shard.h"
23 : #include "exchange-database/commit.h"
24 : #include "helper.h"
25 : #include "exchange-database/start.h"
26 : #include "exchange-database/rollback.h"
27 :
28 : enum GNUNET_DB_QueryStatus
29 97 : TALER_EXCHANGEDB_begin_revolving_shard (struct
30 : TALER_EXCHANGEDB_PostgresContext *pg,
31 : const char *job_name,
32 : uint32_t shard_size,
33 : uint32_t shard_limit,
34 : uint32_t *start_row,
35 : uint32_t *end_row)
36 : {
37 :
38 97 : GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX);
39 97 : GNUNET_assert (shard_limit > 0);
40 97 : GNUNET_assert (shard_size > 0);
41 97 : for (unsigned int retries = 0; retries<3; retries++)
42 : {
43 97 : if (GNUNET_OK !=
44 97 : TALER_EXCHANGEDB_start (pg,
45 : "begin_revolving_shard"))
46 : {
47 0 : GNUNET_break (0);
48 0 : return GNUNET_DB_STATUS_HARD_ERROR;
49 : }
50 :
51 : /* First, find last 'end_row' */
52 : {
53 : enum GNUNET_DB_QueryStatus qs;
54 : uint32_t last_end;
55 97 : struct GNUNET_PQ_QueryParam params[] = {
56 97 : GNUNET_PQ_query_param_string (job_name),
57 : GNUNET_PQ_query_param_end
58 : };
59 97 : struct GNUNET_PQ_ResultSpec rs[] = {
60 97 : GNUNET_PQ_result_spec_uint32 ("end_row",
61 : &last_end),
62 : GNUNET_PQ_result_spec_end
63 : };
64 : /* Used in #postgres_begin_revolving_shard() */
65 97 : PREPARE (pg,
66 : "get_last_revolving_shard",
67 : "SELECT"
68 : " end_row"
69 : " FROM revolving_work_shards"
70 : " WHERE job_name=$1"
71 : " ORDER BY end_row DESC"
72 : " LIMIT 1;");
73 97 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
74 : "get_last_revolving_shard",
75 : params,
76 : rs);
77 97 : switch (qs)
78 : {
79 0 : case GNUNET_DB_STATUS_HARD_ERROR:
80 0 : GNUNET_break (0);
81 0 : TALER_EXCHANGEDB_rollback (pg);
82 0 : return qs;
83 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
84 0 : TALER_EXCHANGEDB_rollback (pg);
85 0 : continue;
86 90 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
87 90 : *start_row = 1U + last_end;
88 90 : break;
89 7 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
90 7 : *start_row = 0; /* base-case: no shards yet */
91 7 : break; /* continued below */
92 : }
93 : } /* get_last_shard */
94 :
95 97 : if (*start_row < shard_limit)
96 : {
97 : /* Claim fresh shard */
98 : enum GNUNET_DB_QueryStatus qs;
99 : struct GNUNET_TIME_Absolute now;
100 7 : struct GNUNET_PQ_QueryParam params[] = {
101 7 : GNUNET_PQ_query_param_string (job_name),
102 7 : GNUNET_PQ_query_param_absolute_time (&now),
103 7 : GNUNET_PQ_query_param_uint32 (start_row),
104 7 : GNUNET_PQ_query_param_uint32 (end_row),
105 : GNUNET_PQ_query_param_end
106 : };
107 :
108 7 : *end_row = GNUNET_MIN (shard_limit,
109 : *start_row + shard_size - 1);
110 7 : now = GNUNET_TIME_absolute_get ();
111 7 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
112 : "Trying to claim shard %llu-%llu\n",
113 : (unsigned long long) *start_row,
114 : (unsigned long long) *end_row);
115 :
116 : /* Used in #postgres_claim_revolving_shard() */
117 7 : PREPARE (pg,
118 : "create_revolving_shard",
119 : "INSERT INTO revolving_work_shards"
120 : "(job_name"
121 : ",last_attempt"
122 : ",start_row"
123 : ",end_row"
124 : ",active"
125 : ") VALUES "
126 : "($1, $2, $3, $4, TRUE);");
127 7 : qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
128 : "create_revolving_shard",
129 : params);
130 7 : switch (qs)
131 : {
132 0 : case GNUNET_DB_STATUS_HARD_ERROR:
133 0 : GNUNET_break (0);
134 0 : TALER_EXCHANGEDB_rollback (pg);
135 0 : return qs;
136 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
137 0 : TALER_EXCHANGEDB_rollback (pg);
138 0 : continue;
139 7 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
140 : /* continued below (with commit) */
141 7 : break;
142 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
143 : /* someone else got this shard already,
144 : try again */
145 0 : TALER_EXCHANGEDB_rollback (pg);
146 0 : continue;
147 : }
148 : } /* end create fresh reovlving shard */
149 : else
150 : {
151 : /* claim oldest existing shard */
152 : enum GNUNET_DB_QueryStatus qs;
153 90 : struct GNUNET_PQ_QueryParam params[] = {
154 90 : GNUNET_PQ_query_param_string (job_name),
155 : GNUNET_PQ_query_param_end
156 : };
157 90 : struct GNUNET_PQ_ResultSpec rs[] = {
158 90 : GNUNET_PQ_result_spec_uint32 ("start_row",
159 : start_row),
160 90 : GNUNET_PQ_result_spec_uint32 ("end_row",
161 : end_row),
162 : GNUNET_PQ_result_spec_end
163 : };
164 :
165 90 : PREPARE (pg,
166 : "get_open_revolving_shard",
167 : "SELECT"
168 : " start_row"
169 : ",end_row"
170 : " FROM revolving_work_shards"
171 : " WHERE job_name=$1"
172 : " AND active=FALSE"
173 : " ORDER BY last_attempt ASC"
174 : " LIMIT 1;");
175 90 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
176 : "get_open_revolving_shard",
177 : params,
178 : rs);
179 90 : switch (qs)
180 : {
181 0 : case GNUNET_DB_STATUS_HARD_ERROR:
182 0 : GNUNET_break (0);
183 0 : TALER_EXCHANGEDB_rollback (pg);
184 0 : return qs;
185 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
186 0 : TALER_EXCHANGEDB_rollback (pg);
187 0 : continue;
188 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
189 : /* no open shards available */
190 0 : TALER_EXCHANGEDB_rollback (pg);
191 0 : return qs;
192 90 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
193 : {
194 : enum GNUNET_DB_QueryStatus qsz;
195 : struct GNUNET_TIME_Timestamp now;
196 90 : struct GNUNET_PQ_QueryParam iparams[] = {
197 90 : GNUNET_PQ_query_param_string (job_name),
198 90 : GNUNET_PQ_query_param_timestamp (&now),
199 90 : GNUNET_PQ_query_param_uint32 (start_row),
200 90 : GNUNET_PQ_query_param_uint32 (end_row),
201 : GNUNET_PQ_query_param_end
202 : };
203 :
204 90 : now = GNUNET_TIME_timestamp_get ();
205 90 : PREPARE (pg,
206 : "reclaim_revolving_shard",
207 : "UPDATE revolving_work_shards"
208 : " SET last_attempt=$2"
209 : " ,active=TRUE"
210 : " WHERE job_name=$1"
211 : " AND start_row=$3"
212 : " AND end_row=$4");
213 90 : qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
214 : "reclaim_revolving_shard",
215 : iparams);
216 90 : switch (qsz)
217 : {
218 0 : case GNUNET_DB_STATUS_HARD_ERROR:
219 0 : GNUNET_break (0);
220 0 : TALER_EXCHANGEDB_rollback (pg);
221 0 : return qs;
222 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
223 0 : TALER_EXCHANGEDB_rollback (pg);
224 0 : continue;
225 90 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
226 90 : break; /* continue with commit */
227 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
228 0 : GNUNET_break (0); /* logic error, should be impossible */
229 0 : TALER_EXCHANGEDB_rollback (pg);
230 0 : return GNUNET_DB_STATUS_HARD_ERROR;
231 : }
232 : }
233 90 : break; /* continue with commit */
234 : }
235 : } /* end claim oldest existing shard */
236 :
237 : /* commit */
238 : {
239 : enum GNUNET_DB_QueryStatus qs;
240 :
241 97 : qs = TALER_EXCHANGEDB_commit (pg);
242 97 : switch (qs)
243 : {
244 0 : case GNUNET_DB_STATUS_HARD_ERROR:
245 0 : GNUNET_break (0);
246 0 : TALER_EXCHANGEDB_rollback (pg);
247 0 : return qs;
248 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
249 0 : TALER_EXCHANGEDB_rollback (pg);
250 0 : continue;
251 97 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
252 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
253 97 : return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
254 : }
255 : }
256 : } /* retry 'for' loop */
257 0 : return GNUNET_DB_STATUS_SOFT_ERROR;
258 : }
|