Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2022 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file exchangedb/pg_begin_revolving_shard.c
18 : * @brief Implementation of the begin_revolving_shard function for Postgres
19 : * @author Christian Grothoff
20 : */
21 : #include "platform.h"
22 : #include "taler_error_codes.h"
23 : #include "taler_dbevents.h"
24 : #include "taler_pq_lib.h"
25 : #include "pg_begin_revolving_shard.h"
26 : #include "pg_commit.h"
27 : #include "pg_helper.h"
28 : #include "pg_start.h"
29 : #include "pg_rollback.h"
30 :
31 : enum GNUNET_DB_QueryStatus
32 121 : TEH_PG_begin_revolving_shard (void *cls,
33 : const char *job_name,
34 : uint32_t shard_size,
35 : uint32_t shard_limit,
36 : uint32_t *start_row,
37 : uint32_t *end_row)
38 : {
39 121 : struct PostgresClosure *pg = cls;
40 :
41 121 : GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX);
42 121 : GNUNET_assert (shard_limit > 0);
43 121 : GNUNET_assert (shard_size > 0);
44 121 : for (unsigned int retries = 0; retries<3; retries++)
45 : {
46 121 : if (GNUNET_OK !=
47 121 : TEH_PG_start (pg,
48 : "begin_revolving_shard"))
49 : {
50 0 : GNUNET_break (0);
51 0 : return GNUNET_DB_STATUS_HARD_ERROR;
52 : }
53 :
54 : /* First, find last 'end_row' */
55 : {
56 : enum GNUNET_DB_QueryStatus qs;
57 : uint32_t last_end;
58 121 : struct GNUNET_PQ_QueryParam params[] = {
59 121 : GNUNET_PQ_query_param_string (job_name),
60 : GNUNET_PQ_query_param_end
61 : };
62 121 : struct GNUNET_PQ_ResultSpec rs[] = {
63 121 : GNUNET_PQ_result_spec_uint32 ("end_row",
64 : &last_end),
65 : GNUNET_PQ_result_spec_end
66 : };
67 : /* Used in #postgres_begin_revolving_shard() */
68 121 : PREPARE (pg,
69 : "get_last_revolving_shard",
70 : "SELECT"
71 : " end_row"
72 : " FROM revolving_work_shards"
73 : " WHERE job_name=$1"
74 : " ORDER BY end_row DESC"
75 : " LIMIT 1;");
76 121 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
77 : "get_last_revolving_shard",
78 : params,
79 : rs);
80 121 : switch (qs)
81 : {
82 0 : case GNUNET_DB_STATUS_HARD_ERROR:
83 0 : GNUNET_break (0);
84 0 : TEH_PG_rollback (pg);
85 0 : return qs;
86 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
87 0 : TEH_PG_rollback (pg);
88 0 : continue;
89 102 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
90 102 : *start_row = 1U + last_end;
91 102 : break;
92 19 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
93 19 : *start_row = 0; /* base-case: no shards yet */
94 19 : break; /* continued below */
95 : }
96 : } /* get_last_shard */
97 :
98 121 : if (*start_row < shard_limit)
99 : {
100 : /* Claim fresh shard */
101 : enum GNUNET_DB_QueryStatus qs;
102 : struct GNUNET_TIME_Absolute now;
103 19 : struct GNUNET_PQ_QueryParam params[] = {
104 19 : GNUNET_PQ_query_param_string (job_name),
105 19 : GNUNET_PQ_query_param_absolute_time (&now),
106 19 : GNUNET_PQ_query_param_uint32 (start_row),
107 19 : GNUNET_PQ_query_param_uint32 (end_row),
108 : GNUNET_PQ_query_param_end
109 : };
110 :
111 19 : *end_row = GNUNET_MIN (shard_limit,
112 : *start_row + shard_size - 1);
113 19 : now = GNUNET_TIME_absolute_get ();
114 19 : GNUNET_log (GNUNET_ERROR_TYPE_INFO,
115 : "Trying to claim shard %llu-%llu\n",
116 : (unsigned long long) *start_row,
117 : (unsigned long long) *end_row);
118 :
119 : /* Used in #postgres_claim_revolving_shard() */
120 19 : PREPARE (pg,
121 : "create_revolving_shard",
122 : "INSERT INTO revolving_work_shards"
123 : "(job_name"
124 : ",last_attempt"
125 : ",start_row"
126 : ",end_row"
127 : ",active"
128 : ") VALUES "
129 : "($1, $2, $3, $4, TRUE);");
130 19 : qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
131 : "create_revolving_shard",
132 : params);
133 19 : switch (qs)
134 : {
135 0 : case GNUNET_DB_STATUS_HARD_ERROR:
136 0 : GNUNET_break (0);
137 0 : TEH_PG_rollback (pg);
138 0 : return qs;
139 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
140 0 : TEH_PG_rollback (pg);
141 0 : continue;
142 19 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
143 : /* continued below (with commit) */
144 19 : break;
145 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
146 : /* someone else got this shard already,
147 : try again */
148 0 : TEH_PG_rollback (pg);
149 0 : continue;
150 : }
151 : } /* end create fresh reovlving shard */
152 : else
153 : {
154 : /* claim oldest existing shard */
155 : enum GNUNET_DB_QueryStatus qs;
156 102 : struct GNUNET_PQ_QueryParam params[] = {
157 102 : GNUNET_PQ_query_param_string (job_name),
158 : GNUNET_PQ_query_param_end
159 : };
160 102 : struct GNUNET_PQ_ResultSpec rs[] = {
161 102 : GNUNET_PQ_result_spec_uint32 ("start_row",
162 : start_row),
163 102 : GNUNET_PQ_result_spec_uint32 ("end_row",
164 : end_row),
165 : GNUNET_PQ_result_spec_end
166 : };
167 :
168 102 : PREPARE (pg,
169 : "get_open_revolving_shard",
170 : "SELECT"
171 : " start_row"
172 : ",end_row"
173 : " FROM revolving_work_shards"
174 : " WHERE job_name=$1"
175 : " AND active=FALSE"
176 : " ORDER BY last_attempt ASC"
177 : " LIMIT 1;");
178 102 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
179 : "get_open_revolving_shard",
180 : params,
181 : rs);
182 102 : switch (qs)
183 : {
184 0 : case GNUNET_DB_STATUS_HARD_ERROR:
185 0 : GNUNET_break (0);
186 0 : TEH_PG_rollback (pg);
187 0 : return qs;
188 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
189 0 : TEH_PG_rollback (pg);
190 0 : continue;
191 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
192 : /* no open shards available */
193 0 : TEH_PG_rollback (pg);
194 0 : return qs;
195 102 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
196 : {
197 : enum GNUNET_DB_QueryStatus qsz;
198 : struct GNUNET_TIME_Timestamp now;
199 102 : struct GNUNET_PQ_QueryParam iparams[] = {
200 102 : GNUNET_PQ_query_param_string (job_name),
201 102 : GNUNET_PQ_query_param_timestamp (&now),
202 102 : GNUNET_PQ_query_param_uint32 (start_row),
203 102 : GNUNET_PQ_query_param_uint32 (end_row),
204 : GNUNET_PQ_query_param_end
205 : };
206 :
207 102 : now = GNUNET_TIME_timestamp_get ();
208 102 : PREPARE (pg,
209 : "reclaim_revolving_shard",
210 : "UPDATE revolving_work_shards"
211 : " SET last_attempt=$2"
212 : " ,active=TRUE"
213 : " WHERE job_name=$1"
214 : " AND start_row=$3"
215 : " AND end_row=$4");
216 102 : qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
217 : "reclaim_revolving_shard",
218 : iparams);
219 0 : switch (qsz)
220 : {
221 0 : case GNUNET_DB_STATUS_HARD_ERROR:
222 0 : GNUNET_break (0);
223 0 : TEH_PG_rollback (pg);
224 0 : return qs;
225 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
226 0 : TEH_PG_rollback (pg);
227 0 : continue;
228 102 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
229 102 : break; /* continue with commit */
230 0 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
231 0 : GNUNET_break (0); /* logic error, should be impossible */
232 0 : TEH_PG_rollback (pg);
233 0 : return GNUNET_DB_STATUS_HARD_ERROR;
234 : }
235 : }
236 102 : break; /* continue with commit */
237 : }
238 : } /* end claim oldest existing shard */
239 :
240 : /* commit */
241 : {
242 : enum GNUNET_DB_QueryStatus qs;
243 :
244 121 : qs = TEH_PG_commit (pg);
245 121 : switch (qs)
246 : {
247 0 : case GNUNET_DB_STATUS_HARD_ERROR:
248 0 : GNUNET_break (0);
249 0 : TEH_PG_rollback (pg);
250 0 : return qs;
251 0 : case GNUNET_DB_STATUS_SOFT_ERROR:
252 0 : TEH_PG_rollback (pg);
253 0 : continue;
254 121 : case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
255 : case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
256 121 : return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
257 : }
258 : }
259 : } /* retry 'for' loop */
260 0 : return GNUNET_DB_STATUS_SOFT_ERROR;
261 : }
|