LCOV - code coverage report
Current view: top level - exchangedb - pg_begin_shard.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 54.1 % 111 60
Test Date: 2025-12-28 14:06:02 Functions: 100.0 % 1 1

            Line data    Source code
       1              : /*
       2              :    This file is part of TALER
       3              :    Copyright (C) 2022 Taler Systems SA
       4              : 
       5              :    TALER is free software; you can redistribute it and/or modify it under the
       6              :    terms of the GNU General Public License as published by the Free Software
       7              :    Foundation; either version 3, or (at your option) any later version.
       8              : 
       9              :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10              :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11              :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12              : 
      13              :    You should have received a copy of the GNU General Public License along with
      14              :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15              :  */
      16              : /**
      17              :  * @file exchangedb/pg_begin_shard.c
      18              :  * @brief Implementation of the begin_shard function for Postgres
      19              :  * @author Christian Grothoff
      20              :  */
      21              : #include "taler/platform.h"
      22              : #include "taler/taler_error_codes.h"
      23              : #include "taler/taler_dbevents.h"
      24              : #include "taler/taler_pq_lib.h"
      25              : #include "pg_begin_shard.h"
      26              : #include "pg_helper.h"
      27              : #include "pg_start.h"
      28              : #include "pg_rollback.h"
      29              : #include "pg_commit.h"
      30              : 
      31              : 
      32              : enum GNUNET_DB_QueryStatus
      33          281 : TEH_PG_begin_shard (void *cls,
      34              :                     const char *job_name,
      35              :                     struct GNUNET_TIME_Relative delay,
      36              :                     uint64_t shard_size,
      37              :                     uint64_t *start_row,
      38              :                     uint64_t *end_row)
      39              : {
      40          281 :   struct PostgresClosure *pg = cls;
      41              : 
      42          281 :   for (unsigned int retries = 0; retries<10; retries++)
      43              :   {
      44          281 :     if (GNUNET_OK !=
      45          281 :         TEH_PG_start (pg,
      46              :                       "begin_shard"))
      47              :     {
      48            0 :       GNUNET_break (0);
      49            0 :       return GNUNET_DB_STATUS_HARD_ERROR;
      50              :     }
      51              : 
      52              :     {
      53              :       struct GNUNET_TIME_Absolute past;
      54              :       enum GNUNET_DB_QueryStatus qs;
      55          281 :       struct GNUNET_PQ_QueryParam params[] = {
      56          281 :         GNUNET_PQ_query_param_string (job_name),
      57          281 :         GNUNET_PQ_query_param_absolute_time (&past),
      58              :         GNUNET_PQ_query_param_end
      59              :       };
      60          281 :       struct GNUNET_PQ_ResultSpec rs[] = {
      61          281 :         GNUNET_PQ_result_spec_uint64 ("start_row",
      62              :                                       start_row),
      63          281 :         GNUNET_PQ_result_spec_uint64 ("end_row",
      64              :                                       end_row),
      65              :         GNUNET_PQ_result_spec_end
      66              :       };
      67              : 
      68          281 :       past = GNUNET_TIME_absolute_get ();
      69          281 :       PREPARE (pg,
      70              :                "get_open_shard",
      71              :                "SELECT"
      72              :                " start_row"
      73              :                ",end_row"
      74              :                " FROM work_shards"
      75              :                " WHERE job_name=$1"
      76              :                "   AND completed=FALSE"
      77              :                "   AND last_attempt<$2"
      78              :                " ORDER BY last_attempt ASC"
      79              :                " LIMIT 1;");
      80          281 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
      81              :                                                      "get_open_shard",
      82              :                                                      params,
      83              :                                                      rs);
      84          281 :       switch (qs)
      85              :       {
      86            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
      87            0 :         GNUNET_break (0);
      88            0 :         TEH_PG_rollback (pg);
      89            0 :         return qs;
      90            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
      91            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
      92              :                     "Serialization error on getting open shard\n");
      93            0 :         TEH_PG_rollback (pg);
      94            0 :         continue;
      95           90 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
      96              :         {
      97              :           enum GNUNET_DB_QueryStatus qsz;
      98              :           struct GNUNET_TIME_Absolute now;
      99           90 :           struct GNUNET_PQ_QueryParam iparams[] = {
     100           90 :             GNUNET_PQ_query_param_string (job_name),
     101           90 :             GNUNET_PQ_query_param_absolute_time (&now),
     102           90 :             GNUNET_PQ_query_param_uint64 (start_row),
     103           90 :             GNUNET_PQ_query_param_uint64 (end_row),
     104              :             GNUNET_PQ_query_param_end
     105              :           };
     106              : 
     107           90 :           now = GNUNET_TIME_relative_to_absolute (delay);
     108           90 :           PREPARE (pg,
     109              :                    "reclaim_shard",
     110              :                    "UPDATE work_shards"
     111              :                    " SET last_attempt=$2"
     112              :                    " WHERE job_name=$1"
     113              :                    "   AND start_row=$3"
     114              :                    "   AND end_row=$4");
     115           90 :           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     116              :                                                     "reclaim_shard",
     117              :                                                     iparams);
     118           90 :           switch (qsz)
     119              :           {
     120            0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     121            0 :             GNUNET_break (0);
     122            0 :             TEH_PG_rollback (pg);
     123            0 :             return qsz;
     124            0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     125            0 :             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     126              :                         "Serialization error on claiming open shard\n");
     127            0 :             TEH_PG_rollback (pg);
     128            0 :             continue;
     129           90 :           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     130           90 :             goto commit;
     131            0 :           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     132            0 :             GNUNET_break (0); /* logic error, should be impossible */
     133            0 :             TEH_PG_rollback (pg);
     134            0 :             return GNUNET_DB_STATUS_HARD_ERROR;
     135              :           }
     136              :         }
     137            0 :         break; /* actually unreachable */
     138          191 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     139          191 :         break; /* continued below */
     140              :       }
     141              :     } /* get_open_shard */
     142              : 
     143              :     /* No open shard, find last 'end_row' */
     144              :     {
     145              :       enum GNUNET_DB_QueryStatus qs;
     146          191 :       struct GNUNET_PQ_QueryParam params[] = {
     147          191 :         GNUNET_PQ_query_param_string (job_name),
     148              :         GNUNET_PQ_query_param_end
     149              :       };
     150          191 :       struct GNUNET_PQ_ResultSpec rs[] = {
     151          191 :         GNUNET_PQ_result_spec_uint64 ("end_row",
     152              :                                       start_row),
     153              :         GNUNET_PQ_result_spec_end
     154              :       };
     155              : 
     156          191 :       PREPARE (pg,
     157              :                "get_last_shard",
     158              :                "SELECT"
     159              :                " end_row"
     160              :                " FROM work_shards"
     161              :                " WHERE job_name=$1"
     162              :                " ORDER BY end_row DESC"
     163              :                " LIMIT 1;");
     164          191 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     165              :                                                      "get_last_shard",
     166              :                                                      params,
     167              :                                                      rs);
     168          191 :       switch (qs)
     169              :       {
     170            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     171            0 :         GNUNET_break (0);
     172            0 :         TEH_PG_rollback (pg);
     173            0 :         return qs;
     174            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     175            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     176              :                     "Serialization error on getting last shard\n");
     177            0 :         TEH_PG_rollback (pg);
     178            0 :         continue;
     179          170 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     180          170 :         break;
     181           21 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     182           21 :         *start_row = 0; /* base-case: no shards yet */
     183           21 :         break; /* continued below */
     184              :       }
     185          191 :       *end_row = *start_row + shard_size;
     186              :     } /* get_last_shard */
     187              : 
     188              :     /* Claim fresh shard */
     189              :     {
     190              :       enum GNUNET_DB_QueryStatus qs;
     191              :       struct GNUNET_TIME_Absolute now;
     192          191 :       struct GNUNET_PQ_QueryParam params[] = {
     193          191 :         GNUNET_PQ_query_param_string (job_name),
     194          191 :         GNUNET_PQ_query_param_absolute_time (&now),
     195          191 :         GNUNET_PQ_query_param_uint64 (start_row),
     196          191 :         GNUNET_PQ_query_param_uint64 (end_row),
     197              :         GNUNET_PQ_query_param_end
     198              :       };
     199              : 
     200          191 :       now = GNUNET_TIME_relative_to_absolute (delay);
     201          191 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     202              :                   "Trying to claim shard (%llu-%llu]\n",
     203              :                   (unsigned long long) *start_row,
     204              :                   (unsigned long long) *end_row);
     205              : 
     206          191 :       PREPARE (pg,
     207              :                "claim_next_shard",
     208              :                "INSERT INTO work_shards"
     209              :                "(job_name"
     210              :                ",last_attempt"
     211              :                ",start_row"
     212              :                ",end_row"
     213              :                ") VALUES "
     214              :                "($1, $2, $3, $4);");
     215          191 :       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     216              :                                                "claim_next_shard",
     217              :                                                params);
     218          191 :       switch (qs)
     219              :       {
     220            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     221            0 :         GNUNET_break (0);
     222            0 :         TEH_PG_rollback (pg);
     223            0 :         return qs;
     224            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     225            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     226              :                     "Serialization error on claiming next shard\n");
     227            0 :         TEH_PG_rollback (pg);
     228            0 :         continue;
     229          191 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     230              :         /* continued below */
     231          191 :         break;
     232            0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     233              :         /* someone else got this shard already,
     234              :            try again */
     235            0 :         TEH_PG_rollback (pg);
     236            0 :         continue;
     237              :       }
     238              :     } /* claim_next_shard */
     239              : 
     240              :     /* commit */
     241          281 : commit:
     242              :     {
     243              :       enum GNUNET_DB_QueryStatus qs;
     244              : 
     245          281 :       qs = TEH_PG_commit (pg);
     246          281 :       switch (qs)
     247              :       {
     248            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     249            0 :         GNUNET_break (0);
     250            0 :         TEH_PG_rollback (pg);
     251            0 :         return qs;
     252            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     253            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     254              :                     "Serialization error on commit for beginning shard\n");
     255            0 :         TEH_PG_rollback (pg);
     256            0 :         continue;
     257          281 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     258              :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     259          281 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     260              :                     "Claimed new shard\n");
     261          281 :         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     262              :       }
     263              :     }
     264              :   } /* retry 'for' loop */
     265            0 :   return GNUNET_DB_STATUS_SOFT_ERROR;
     266              : }
        

Generated by: LCOV version 2.0-1