LCOV - code coverage report
Current view: top level - exchangedb - pg_begin_revolving_shard.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 55.6 % 108 60
Test Date: 2025-12-28 14:06:02 Functions: 100.0 % 1 1

            Line data    Source code
       1              : /*
       2              :    This file is part of TALER
       3              :    Copyright (C) 2022 Taler Systems SA
       4              : 
       5              :    TALER is free software; you can redistribute it and/or modify it under the
       6              :    terms of the GNU General Public License as published by the Free Software
       7              :    Foundation; either version 3, or (at your option) any later version.
       8              : 
       9              :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10              :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11              :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12              : 
      13              :    You should have received a copy of the GNU General Public License along with
      14              :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15              :  */
      16              : /**
      17              :  * @file exchangedb/pg_begin_revolving_shard.c
      18              :  * @brief Implementation of the begin_revolving_shard function for Postgres
      19              :  * @author Christian Grothoff
      20              :  */
      21              : #include "taler/platform.h"
      22              : #include "taler/taler_error_codes.h"
      23              : #include "taler/taler_dbevents.h"
      24              : #include "taler/taler_pq_lib.h"
      25              : #include "pg_begin_revolving_shard.h"
      26              : #include "pg_commit.h"
      27              : #include "pg_helper.h"
      28              : #include "pg_start.h"
      29              : #include "pg_rollback.h"
      30              : 
      31              : enum GNUNET_DB_QueryStatus
      32           97 : TEH_PG_begin_revolving_shard (void *cls,
      33              :                               const char *job_name,
      34              :                               uint32_t shard_size,
      35              :                               uint32_t shard_limit,
      36              :                               uint32_t *start_row,
      37              :                               uint32_t *end_row)
      38              : {
      39           97 :   struct PostgresClosure *pg = cls;
      40              : 
      41           97 :   GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX);
      42           97 :   GNUNET_assert (shard_limit > 0);
      43           97 :   GNUNET_assert (shard_size > 0);
      44           97 :   for (unsigned int retries = 0; retries<3; retries++)
      45              :   {
      46           97 :     if (GNUNET_OK !=
      47           97 :         TEH_PG_start (pg,
      48              :                       "begin_revolving_shard"))
      49              :     {
      50            0 :       GNUNET_break (0);
      51            0 :       return GNUNET_DB_STATUS_HARD_ERROR;
      52              :     }
      53              : 
      54              :     /* First, find last 'end_row' */
      55              :     {
      56              :       enum GNUNET_DB_QueryStatus qs;
      57              :       uint32_t last_end;
      58           97 :       struct GNUNET_PQ_QueryParam params[] = {
      59           97 :         GNUNET_PQ_query_param_string (job_name),
      60              :         GNUNET_PQ_query_param_end
      61              :       };
      62           97 :       struct GNUNET_PQ_ResultSpec rs[] = {
      63           97 :         GNUNET_PQ_result_spec_uint32 ("end_row",
      64              :                                       &last_end),
      65              :         GNUNET_PQ_result_spec_end
      66              :       };
      67              :       /* Used in #postgres_begin_revolving_shard() */
      68           97 :       PREPARE (pg,
      69              :                "get_last_revolving_shard",
      70              :                "SELECT"
      71              :                " end_row"
      72              :                " FROM revolving_work_shards"
      73              :                " WHERE job_name=$1"
      74              :                " ORDER BY end_row DESC"
      75              :                " LIMIT 1;");
      76           97 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
      77              :                                                      "get_last_revolving_shard",
      78              :                                                      params,
      79              :                                                      rs);
      80           97 :       switch (qs)
      81              :       {
      82            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
      83            0 :         GNUNET_break (0);
      84            0 :         TEH_PG_rollback (pg);
      85            0 :         return qs;
      86            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
      87            0 :         TEH_PG_rollback (pg);
      88            0 :         continue;
      89           90 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
      90           90 :         *start_row = 1U + last_end;
      91           90 :         break;
      92            7 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
      93            7 :         *start_row = 0; /* base-case: no shards yet */
      94            7 :         break; /* continued below */
      95              :       }
      96              :     } /* get_last_shard */
      97              : 
      98           97 :     if (*start_row < shard_limit)
      99              :     {
     100              :       /* Claim fresh shard */
     101              :       enum GNUNET_DB_QueryStatus qs;
     102              :       struct GNUNET_TIME_Absolute now;
     103            7 :       struct GNUNET_PQ_QueryParam params[] = {
     104            7 :         GNUNET_PQ_query_param_string (job_name),
     105            7 :         GNUNET_PQ_query_param_absolute_time (&now),
     106            7 :         GNUNET_PQ_query_param_uint32 (start_row),
     107            7 :         GNUNET_PQ_query_param_uint32 (end_row),
     108              :         GNUNET_PQ_query_param_end
     109              :       };
     110              : 
     111            7 :       *end_row = GNUNET_MIN (shard_limit,
     112              :                              *start_row + shard_size - 1);
     113            7 :       now = GNUNET_TIME_absolute_get ();
     114            7 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     115              :                   "Trying to claim shard %llu-%llu\n",
     116              :                   (unsigned long long) *start_row,
     117              :                   (unsigned long long) *end_row);
     118              : 
     119              :       /* Used in #postgres_claim_revolving_shard() */
     120            7 :       PREPARE (pg,
     121              :                "create_revolving_shard",
     122              :                "INSERT INTO revolving_work_shards"
     123              :                "(job_name"
     124              :                ",last_attempt"
     125              :                ",start_row"
     126              :                ",end_row"
     127              :                ",active"
     128              :                ") VALUES "
     129              :                "($1, $2, $3, $4, TRUE);");
     130            7 :       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     131              :                                                "create_revolving_shard",
     132              :                                                params);
     133            7 :       switch (qs)
     134              :       {
     135            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     136            0 :         GNUNET_break (0);
     137            0 :         TEH_PG_rollback (pg);
     138            0 :         return qs;
     139            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     140            0 :         TEH_PG_rollback (pg);
     141            0 :         continue;
     142            7 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     143              :         /* continued below (with commit) */
     144            7 :         break;
     145            0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     146              :         /* someone else got this shard already,
     147              :            try again */
     148            0 :         TEH_PG_rollback (pg);
     149            0 :         continue;
     150              :       }
     151              :     } /* end create fresh reovlving shard */
     152              :     else
     153              :     {
     154              :       /* claim oldest existing shard */
     155              :       enum GNUNET_DB_QueryStatus qs;
     156           90 :       struct GNUNET_PQ_QueryParam params[] = {
     157           90 :         GNUNET_PQ_query_param_string (job_name),
     158              :         GNUNET_PQ_query_param_end
     159              :       };
     160           90 :       struct GNUNET_PQ_ResultSpec rs[] = {
     161           90 :         GNUNET_PQ_result_spec_uint32 ("start_row",
     162              :                                       start_row),
     163           90 :         GNUNET_PQ_result_spec_uint32 ("end_row",
     164              :                                       end_row),
     165              :         GNUNET_PQ_result_spec_end
     166              :       };
     167              : 
     168           90 :       PREPARE (pg,
     169              :                "get_open_revolving_shard",
     170              :                "SELECT"
     171              :                " start_row"
     172              :                ",end_row"
     173              :                " FROM revolving_work_shards"
     174              :                " WHERE job_name=$1"
     175              :                "   AND active=FALSE"
     176              :                " ORDER BY last_attempt ASC"
     177              :                " LIMIT 1;");
     178           90 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     179              :                                                      "get_open_revolving_shard",
     180              :                                                      params,
     181              :                                                      rs);
     182           90 :       switch (qs)
     183              :       {
     184            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     185            0 :         GNUNET_break (0);
     186            0 :         TEH_PG_rollback (pg);
     187            0 :         return qs;
     188            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     189            0 :         TEH_PG_rollback (pg);
     190            0 :         continue;
     191            0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     192              :         /* no open shards available */
     193            0 :         TEH_PG_rollback (pg);
     194            0 :         return qs;
     195           90 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     196              :         {
     197              :           enum GNUNET_DB_QueryStatus qsz;
     198              :           struct GNUNET_TIME_Timestamp now;
     199           90 :           struct GNUNET_PQ_QueryParam iparams[] = {
     200           90 :             GNUNET_PQ_query_param_string (job_name),
     201           90 :             GNUNET_PQ_query_param_timestamp (&now),
     202           90 :             GNUNET_PQ_query_param_uint32 (start_row),
     203           90 :             GNUNET_PQ_query_param_uint32 (end_row),
     204              :             GNUNET_PQ_query_param_end
     205              :           };
     206              : 
     207           90 :           now = GNUNET_TIME_timestamp_get ();
     208           90 :           PREPARE (pg,
     209              :                    "reclaim_revolving_shard",
     210              :                    "UPDATE revolving_work_shards"
     211              :                    " SET last_attempt=$2"
     212              :                    "    ,active=TRUE"
     213              :                    " WHERE job_name=$1"
     214              :                    "   AND start_row=$3"
     215              :                    "   AND end_row=$4");
     216           90 :           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     217              :                                                     "reclaim_revolving_shard",
     218              :                                                     iparams);
     219           90 :           switch (qsz)
     220              :           {
     221            0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     222            0 :             GNUNET_break (0);
     223            0 :             TEH_PG_rollback (pg);
     224            0 :             return qs;
     225            0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     226            0 :             TEH_PG_rollback (pg);
     227            0 :             continue;
     228           90 :           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     229           90 :             break; /* continue with commit */
     230            0 :           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     231            0 :             GNUNET_break (0); /* logic error, should be impossible */
     232            0 :             TEH_PG_rollback (pg);
     233            0 :             return GNUNET_DB_STATUS_HARD_ERROR;
     234              :           }
     235              :         }
     236           90 :         break; /* continue with commit */
     237              :       }
     238              :     } /* end claim oldest existing shard */
     239              : 
     240              :     /* commit */
     241              :     {
     242              :       enum GNUNET_DB_QueryStatus qs;
     243              : 
     244           97 :       qs = TEH_PG_commit (pg);
     245           97 :       switch (qs)
     246              :       {
     247            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     248            0 :         GNUNET_break (0);
     249            0 :         TEH_PG_rollback (pg);
     250            0 :         return qs;
     251            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     252            0 :         TEH_PG_rollback (pg);
     253            0 :         continue;
     254           97 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     255              :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     256           97 :         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     257              :       }
     258              :     }
     259              :   } /* retry 'for' loop */
     260            0 :   return GNUNET_DB_STATUS_SOFT_ERROR;
     261              : }
        

Generated by: LCOV version 2.0-1