LCOV - code coverage report
Current view: top level - exchangedb - begin_revolving_shard.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 55.1 % 107 59
Test Date: 2026-04-14 15:39:31 Functions: 100.0 % 1 1

            Line data    Source code
       1              : /*
       2              :    This file is part of TALER
       3              :    Copyright (C) 2022 Taler Systems SA
       4              : 
       5              :    TALER is free software; you can redistribute it and/or modify it under the
       6              :    terms of the GNU General Public License as published by the Free Software
       7              :    Foundation; either version 3, or (at your option) any later version.
       8              : 
       9              :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10              :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11              :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12              : 
      13              :    You should have received a copy of the GNU General Public License along with
      14              :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15              :  */
      16              : /**
      17              :  * @file exchangedb/begin_revolving_shard.c
      18              :  * @brief Implementation of the begin_revolving_shard function for Postgres
      19              :  * @author Christian Grothoff
      20              :  */
      21              : #include "taler/taler_pq_lib.h"
      22              : #include "exchange-database/begin_revolving_shard.h"
      23              : #include "exchange-database/commit.h"
      24              : #include "helper.h"
      25              : #include "exchange-database/start.h"
      26              : #include "exchange-database/rollback.h"
      27              : 
      28              : enum GNUNET_DB_QueryStatus
      29           97 : TALER_EXCHANGEDB_begin_revolving_shard (struct
      30              :                                         TALER_EXCHANGEDB_PostgresContext *pg,
      31              :                                         const char *job_name,
      32              :                                         uint32_t shard_size,
      33              :                                         uint32_t shard_limit,
      34              :                                         uint32_t *start_row,
      35              :                                         uint32_t *end_row)
      36              : {
      37              : 
      38           97 :   GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX);
      39           97 :   GNUNET_assert (shard_limit > 0);
      40           97 :   GNUNET_assert (shard_size > 0);
      41           97 :   for (unsigned int retries = 0; retries<3; retries++)
      42              :   {
      43           97 :     if (GNUNET_OK !=
      44           97 :         TALER_EXCHANGEDB_start (pg,
      45              :                                 "begin_revolving_shard"))
      46              :     {
      47            0 :       GNUNET_break (0);
      48            0 :       return GNUNET_DB_STATUS_HARD_ERROR;
      49              :     }
      50              : 
      51              :     /* First, find last 'end_row' */
      52              :     {
      53              :       enum GNUNET_DB_QueryStatus qs;
      54              :       uint32_t last_end;
      55           97 :       struct GNUNET_PQ_QueryParam params[] = {
      56           97 :         GNUNET_PQ_query_param_string (job_name),
      57              :         GNUNET_PQ_query_param_end
      58              :       };
      59           97 :       struct GNUNET_PQ_ResultSpec rs[] = {
      60           97 :         GNUNET_PQ_result_spec_uint32 ("end_row",
      61              :                                       &last_end),
      62              :         GNUNET_PQ_result_spec_end
      63              :       };
      64              :       /* Used in #postgres_begin_revolving_shard() */
      65           97 :       PREPARE (pg,
      66              :                "get_last_revolving_shard",
      67              :                "SELECT"
      68              :                " end_row"
      69              :                " FROM revolving_work_shards"
      70              :                " WHERE job_name=$1"
      71              :                " ORDER BY end_row DESC"
      72              :                " LIMIT 1;");
      73           97 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
      74              :                                                      "get_last_revolving_shard",
      75              :                                                      params,
      76              :                                                      rs);
      77           97 :       switch (qs)
      78              :       {
      79            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
      80            0 :         GNUNET_break (0);
      81            0 :         TALER_EXCHANGEDB_rollback (pg);
      82            0 :         return qs;
      83            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
      84            0 :         TALER_EXCHANGEDB_rollback (pg);
      85            0 :         continue;
      86           90 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
      87           90 :         *start_row = 1U + last_end;
      88           90 :         break;
      89            7 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
      90            7 :         *start_row = 0; /* base-case: no shards yet */
      91            7 :         break; /* continued below */
      92              :       }
      93              :     } /* get_last_shard */
      94              : 
      95           97 :     if (*start_row < shard_limit)
      96              :     {
      97              :       /* Claim fresh shard */
      98              :       enum GNUNET_DB_QueryStatus qs;
      99              :       struct GNUNET_TIME_Absolute now;
     100            7 :       struct GNUNET_PQ_QueryParam params[] = {
     101            7 :         GNUNET_PQ_query_param_string (job_name),
     102            7 :         GNUNET_PQ_query_param_absolute_time (&now),
     103            7 :         GNUNET_PQ_query_param_uint32 (start_row),
     104            7 :         GNUNET_PQ_query_param_uint32 (end_row),
     105              :         GNUNET_PQ_query_param_end
     106              :       };
     107              : 
     108            7 :       *end_row = GNUNET_MIN (shard_limit,
     109              :                              *start_row + shard_size - 1);
     110            7 :       now = GNUNET_TIME_absolute_get ();
     111            7 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     112              :                   "Trying to claim shard %llu-%llu\n",
     113              :                   (unsigned long long) *start_row,
     114              :                   (unsigned long long) *end_row);
     115              : 
     116              :       /* Used in #postgres_claim_revolving_shard() */
     117            7 :       PREPARE (pg,
     118              :                "create_revolving_shard",
     119              :                "INSERT INTO revolving_work_shards"
     120              :                "(job_name"
     121              :                ",last_attempt"
     122              :                ",start_row"
     123              :                ",end_row"
     124              :                ",active"
     125              :                ") VALUES "
     126              :                "($1, $2, $3, $4, TRUE);");
     127            7 :       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     128              :                                                "create_revolving_shard",
     129              :                                                params);
     130            7 :       switch (qs)
     131              :       {
     132            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     133            0 :         GNUNET_break (0);
     134            0 :         TALER_EXCHANGEDB_rollback (pg);
     135            0 :         return qs;
     136            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     137            0 :         TALER_EXCHANGEDB_rollback (pg);
     138            0 :         continue;
     139            7 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     140              :         /* continued below (with commit) */
     141            7 :         break;
     142            0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     143              :         /* someone else got this shard already,
     144              :            try again */
     145            0 :         TALER_EXCHANGEDB_rollback (pg);
     146            0 :         continue;
     147              :       }
     148              :     } /* end create fresh reovlving shard */
     149              :     else
     150              :     {
     151              :       /* claim oldest existing shard */
     152              :       enum GNUNET_DB_QueryStatus qs;
     153           90 :       struct GNUNET_PQ_QueryParam params[] = {
     154           90 :         GNUNET_PQ_query_param_string (job_name),
     155              :         GNUNET_PQ_query_param_end
     156              :       };
     157           90 :       struct GNUNET_PQ_ResultSpec rs[] = {
     158           90 :         GNUNET_PQ_result_spec_uint32 ("start_row",
     159              :                                       start_row),
     160           90 :         GNUNET_PQ_result_spec_uint32 ("end_row",
     161              :                                       end_row),
     162              :         GNUNET_PQ_result_spec_end
     163              :       };
     164              : 
     165           90 :       PREPARE (pg,
     166              :                "get_open_revolving_shard",
     167              :                "SELECT"
     168              :                " start_row"
     169              :                ",end_row"
     170              :                " FROM revolving_work_shards"
     171              :                " WHERE job_name=$1"
     172              :                "   AND active=FALSE"
     173              :                " ORDER BY last_attempt ASC"
     174              :                " LIMIT 1;");
     175           90 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     176              :                                                      "get_open_revolving_shard",
     177              :                                                      params,
     178              :                                                      rs);
     179           90 :       switch (qs)
     180              :       {
     181            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     182            0 :         GNUNET_break (0);
     183            0 :         TALER_EXCHANGEDB_rollback (pg);
     184            0 :         return qs;
     185            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     186            0 :         TALER_EXCHANGEDB_rollback (pg);
     187            0 :         continue;
     188            0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     189              :         /* no open shards available */
     190            0 :         TALER_EXCHANGEDB_rollback (pg);
     191            0 :         return qs;
     192           90 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     193              :         {
     194              :           enum GNUNET_DB_QueryStatus qsz;
     195              :           struct GNUNET_TIME_Timestamp now;
     196           90 :           struct GNUNET_PQ_QueryParam iparams[] = {
     197           90 :             GNUNET_PQ_query_param_string (job_name),
     198           90 :             GNUNET_PQ_query_param_timestamp (&now),
     199           90 :             GNUNET_PQ_query_param_uint32 (start_row),
     200           90 :             GNUNET_PQ_query_param_uint32 (end_row),
     201              :             GNUNET_PQ_query_param_end
     202              :           };
     203              : 
     204           90 :           now = GNUNET_TIME_timestamp_get ();
     205           90 :           PREPARE (pg,
     206              :                    "reclaim_revolving_shard",
     207              :                    "UPDATE revolving_work_shards"
     208              :                    " SET last_attempt=$2"
     209              :                    "    ,active=TRUE"
     210              :                    " WHERE job_name=$1"
     211              :                    "   AND start_row=$3"
     212              :                    "   AND end_row=$4");
     213           90 :           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     214              :                                                     "reclaim_revolving_shard",
     215              :                                                     iparams);
     216           90 :           switch (qsz)
     217              :           {
     218            0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     219            0 :             GNUNET_break (0);
     220            0 :             TALER_EXCHANGEDB_rollback (pg);
     221            0 :             return qs;
     222            0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     223            0 :             TALER_EXCHANGEDB_rollback (pg);
     224            0 :             continue;
     225           90 :           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     226           90 :             break; /* continue with commit */
     227            0 :           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     228            0 :             GNUNET_break (0); /* logic error, should be impossible */
     229            0 :             TALER_EXCHANGEDB_rollback (pg);
     230            0 :             return GNUNET_DB_STATUS_HARD_ERROR;
     231              :           }
     232              :         }
     233           90 :         break; /* continue with commit */
     234              :       }
     235              :     } /* end claim oldest existing shard */
     236              : 
     237              :     /* commit */
     238              :     {
     239              :       enum GNUNET_DB_QueryStatus qs;
     240              : 
     241           97 :       qs = TALER_EXCHANGEDB_commit (pg);
     242           97 :       switch (qs)
     243              :       {
     244            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     245            0 :         GNUNET_break (0);
     246            0 :         TALER_EXCHANGEDB_rollback (pg);
     247            0 :         return qs;
     248            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     249            0 :         TALER_EXCHANGEDB_rollback (pg);
     250            0 :         continue;
     251           97 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     252              :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     253           97 :         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     254              :       }
     255              :     }
     256              :   } /* retry 'for' loop */
     257            0 :   return GNUNET_DB_STATUS_SOFT_ERROR;
     258              : }
        

Generated by: LCOV version 2.0-1