LCOV - code coverage report
Current view: top level - exchangedb - pg_begin_revolving_shard.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 59 108 54.6 %
Date: 2025-06-05 21:03:14 Functions: 1 1 100.0 %

          Line data    Source code
       1             : /*
       2             :    This file is part of TALER
       3             :    Copyright (C) 2022 Taler Systems SA
       4             : 
       5             :    TALER is free software; you can redistribute it and/or modify it under the
       6             :    terms of the GNU General Public License as published by the Free Software
       7             :    Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12             : 
      13             :    You should have received a copy of the GNU General Public License along with
      14             :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             :  */
      16             : /**
      17             :  * @file exchangedb/pg_begin_revolving_shard.c
      18             :  * @brief Implementation of the begin_revolving_shard function for Postgres
      19             :  * @author Christian Grothoff
      20             :  */
      21             : #include "platform.h"
      22             : #include "taler_error_codes.h"
      23             : #include "taler_dbevents.h"
      24             : #include "taler_pq_lib.h"
      25             : #include "pg_begin_revolving_shard.h"
      26             : #include "pg_commit.h"
      27             : #include "pg_helper.h"
      28             : #include "pg_start.h"
      29             : #include "pg_rollback.h"
      30             : 
      31             : enum GNUNET_DB_QueryStatus
      32         121 : TEH_PG_begin_revolving_shard (void *cls,
      33             :                               const char *job_name,
      34             :                               uint32_t shard_size,
      35             :                               uint32_t shard_limit,
      36             :                               uint32_t *start_row,
      37             :                               uint32_t *end_row)
      38             : {
      39         121 :   struct PostgresClosure *pg = cls;
      40             : 
      41         121 :   GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX);
      42         121 :   GNUNET_assert (shard_limit > 0);
      43         121 :   GNUNET_assert (shard_size > 0);
      44         121 :   for (unsigned int retries = 0; retries<3; retries++)
      45             :   {
      46         121 :     if (GNUNET_OK !=
      47         121 :         TEH_PG_start (pg,
      48             :                       "begin_revolving_shard"))
      49             :     {
      50           0 :       GNUNET_break (0);
      51           0 :       return GNUNET_DB_STATUS_HARD_ERROR;
      52             :     }
      53             : 
      54             :     /* First, find last 'end_row' */
      55             :     {
      56             :       enum GNUNET_DB_QueryStatus qs;
      57             :       uint32_t last_end;
      58         121 :       struct GNUNET_PQ_QueryParam params[] = {
      59         121 :         GNUNET_PQ_query_param_string (job_name),
      60             :         GNUNET_PQ_query_param_end
      61             :       };
      62         121 :       struct GNUNET_PQ_ResultSpec rs[] = {
      63         121 :         GNUNET_PQ_result_spec_uint32 ("end_row",
      64             :                                       &last_end),
      65             :         GNUNET_PQ_result_spec_end
      66             :       };
      67             :       /* Used in #postgres_begin_revolving_shard() */
      68         121 :       PREPARE (pg,
      69             :                "get_last_revolving_shard",
      70             :                "SELECT"
      71             :                " end_row"
      72             :                " FROM revolving_work_shards"
      73             :                " WHERE job_name=$1"
      74             :                " ORDER BY end_row DESC"
      75             :                " LIMIT 1;");
      76         121 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
      77             :                                                      "get_last_revolving_shard",
      78             :                                                      params,
      79             :                                                      rs);
      80         121 :       switch (qs)
      81             :       {
      82           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
      83           0 :         GNUNET_break (0);
      84           0 :         TEH_PG_rollback (pg);
      85           0 :         return qs;
      86           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
      87           0 :         TEH_PG_rollback (pg);
      88           0 :         continue;
      89         102 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
      90         102 :         *start_row = 1U + last_end;
      91         102 :         break;
      92          19 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
      93          19 :         *start_row = 0; /* base-case: no shards yet */
      94          19 :         break; /* continued below */
      95             :       }
      96             :     } /* get_last_shard */
      97             : 
      98         121 :     if (*start_row < shard_limit)
      99             :     {
     100             :       /* Claim fresh shard */
     101             :       enum GNUNET_DB_QueryStatus qs;
     102             :       struct GNUNET_TIME_Absolute now;
     103          19 :       struct GNUNET_PQ_QueryParam params[] = {
     104          19 :         GNUNET_PQ_query_param_string (job_name),
     105          19 :         GNUNET_PQ_query_param_absolute_time (&now),
     106          19 :         GNUNET_PQ_query_param_uint32 (start_row),
     107          19 :         GNUNET_PQ_query_param_uint32 (end_row),
     108             :         GNUNET_PQ_query_param_end
     109             :       };
     110             : 
     111          19 :       *end_row = GNUNET_MIN (shard_limit,
     112             :                              *start_row + shard_size - 1);
     113          19 :       now = GNUNET_TIME_absolute_get ();
     114          19 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     115             :                   "Trying to claim shard %llu-%llu\n",
     116             :                   (unsigned long long) *start_row,
     117             :                   (unsigned long long) *end_row);
     118             : 
     119             :       /* Used in #postgres_claim_revolving_shard() */
     120          19 :       PREPARE (pg,
     121             :                "create_revolving_shard",
     122             :                "INSERT INTO revolving_work_shards"
     123             :                "(job_name"
     124             :                ",last_attempt"
     125             :                ",start_row"
     126             :                ",end_row"
     127             :                ",active"
     128             :                ") VALUES "
     129             :                "($1, $2, $3, $4, TRUE);");
     130          19 :       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     131             :                                                "create_revolving_shard",
     132             :                                                params);
     133          19 :       switch (qs)
     134             :       {
     135           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     136           0 :         GNUNET_break (0);
     137           0 :         TEH_PG_rollback (pg);
     138           0 :         return qs;
     139           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     140           0 :         TEH_PG_rollback (pg);
     141           0 :         continue;
     142          19 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     143             :         /* continued below (with commit) */
     144          19 :         break;
     145           0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     146             :         /* someone else got this shard already,
     147             :            try again */
     148           0 :         TEH_PG_rollback (pg);
     149           0 :         continue;
     150             :       }
     151             :     } /* end create fresh reovlving shard */
     152             :     else
     153             :     {
     154             :       /* claim oldest existing shard */
     155             :       enum GNUNET_DB_QueryStatus qs;
     156         102 :       struct GNUNET_PQ_QueryParam params[] = {
     157         102 :         GNUNET_PQ_query_param_string (job_name),
     158             :         GNUNET_PQ_query_param_end
     159             :       };
     160         102 :       struct GNUNET_PQ_ResultSpec rs[] = {
     161         102 :         GNUNET_PQ_result_spec_uint32 ("start_row",
     162             :                                       start_row),
     163         102 :         GNUNET_PQ_result_spec_uint32 ("end_row",
     164             :                                       end_row),
     165             :         GNUNET_PQ_result_spec_end
     166             :       };
     167             : 
     168         102 :       PREPARE (pg,
     169             :                "get_open_revolving_shard",
     170             :                "SELECT"
     171             :                " start_row"
     172             :                ",end_row"
     173             :                " FROM revolving_work_shards"
     174             :                " WHERE job_name=$1"
     175             :                "   AND active=FALSE"
     176             :                " ORDER BY last_attempt ASC"
     177             :                " LIMIT 1;");
     178         102 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     179             :                                                      "get_open_revolving_shard",
     180             :                                                      params,
     181             :                                                      rs);
     182         102 :       switch (qs)
     183             :       {
     184           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     185           0 :         GNUNET_break (0);
     186           0 :         TEH_PG_rollback (pg);
     187           0 :         return qs;
     188           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     189           0 :         TEH_PG_rollback (pg);
     190           0 :         continue;
     191           0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     192             :         /* no open shards available */
     193           0 :         TEH_PG_rollback (pg);
     194           0 :         return qs;
     195         102 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     196             :         {
     197             :           enum GNUNET_DB_QueryStatus qsz;
     198             :           struct GNUNET_TIME_Timestamp now;
     199         102 :           struct GNUNET_PQ_QueryParam iparams[] = {
     200         102 :             GNUNET_PQ_query_param_string (job_name),
     201         102 :             GNUNET_PQ_query_param_timestamp (&now),
     202         102 :             GNUNET_PQ_query_param_uint32 (start_row),
     203         102 :             GNUNET_PQ_query_param_uint32 (end_row),
     204             :             GNUNET_PQ_query_param_end
     205             :           };
     206             : 
     207         102 :           now = GNUNET_TIME_timestamp_get ();
     208         102 :           PREPARE (pg,
     209             :                    "reclaim_revolving_shard",
     210             :                    "UPDATE revolving_work_shards"
     211             :                    " SET last_attempt=$2"
     212             :                    "    ,active=TRUE"
     213             :                    " WHERE job_name=$1"
     214             :                    "   AND start_row=$3"
     215             :                    "   AND end_row=$4");
     216         102 :           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     217             :                                                     "reclaim_revolving_shard",
     218             :                                                     iparams);
     219           0 :           switch (qsz)
     220             :           {
     221           0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     222           0 :             GNUNET_break (0);
     223           0 :             TEH_PG_rollback (pg);
     224           0 :             return qs;
     225           0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     226           0 :             TEH_PG_rollback (pg);
     227           0 :             continue;
     228         102 :           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     229         102 :             break; /* continue with commit */
     230           0 :           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     231           0 :             GNUNET_break (0); /* logic error, should be impossible */
     232           0 :             TEH_PG_rollback (pg);
     233           0 :             return GNUNET_DB_STATUS_HARD_ERROR;
     234             :           }
     235             :         }
     236         102 :         break; /* continue with commit */
     237             :       }
     238             :     } /* end claim oldest existing shard */
     239             : 
     240             :     /* commit */
     241             :     {
     242             :       enum GNUNET_DB_QueryStatus qs;
     243             : 
     244         121 :       qs = TEH_PG_commit (pg);
     245         121 :       switch (qs)
     246             :       {
     247           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     248           0 :         GNUNET_break (0);
     249           0 :         TEH_PG_rollback (pg);
     250           0 :         return qs;
     251           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     252           0 :         TEH_PG_rollback (pg);
     253           0 :         continue;
     254         121 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     255             :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     256         121 :         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     257             :       }
     258             :     }
     259             :   } /* retry 'for' loop */
     260           0 :   return GNUNET_DB_STATUS_SOFT_ERROR;
     261             : }

Generated by: LCOV version 1.16