LCOV - code coverage report
Current view: top level - exchangedb - begin_shard.c (source / functions) Coverage Total Hit
Test: coverage.info Lines: 53.6 % 110 59
Test Date: 2026-04-14 15:39:31 Functions: 100.0 % 1 1

            Line data    Source code
       1              : /*
       2              :    This file is part of TALER
       3              :    Copyright (C) 2022 Taler Systems SA
       4              : 
       5              :    TALER is free software; you can redistribute it and/or modify it under the
       6              :    terms of the GNU General Public License as published by the Free Software
       7              :    Foundation; either version 3, or (at your option) any later version.
       8              : 
       9              :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10              :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11              :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12              : 
      13              :    You should have received a copy of the GNU General Public License along with
      14              :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15              :  */
      16              : /**
      17              :  * @file exchangedb/begin_shard.c
      18              :  * @brief Implementation of the begin_shard function for Postgres
      19              :  * @author Christian Grothoff
      20              :  */
      21              : #include "taler/taler_pq_lib.h"
      22              : #include "exchange-database/begin_shard.h"
      23              : #include "helper.h"
      24              : #include "exchange-database/start.h"
      25              : #include "exchange-database/rollback.h"
      26              : #include "exchange-database/commit.h"
      27              : 
      28              : 
      29              : enum GNUNET_DB_QueryStatus
      30          281 : TALER_EXCHANGEDB_begin_shard (struct TALER_EXCHANGEDB_PostgresContext *pg,
      31              :                               const char *job_name,
      32              :                               struct GNUNET_TIME_Relative delay,
      33              :                               uint64_t shard_size,
      34              :                               uint64_t *start_row,
      35              :                               uint64_t *end_row)
      36              : {
      37              : 
      38          281 :   for (unsigned int retries = 0; retries<10; retries++)
      39              :   {
      40          281 :     if (GNUNET_OK !=
      41          281 :         TALER_EXCHANGEDB_start (pg,
      42              :                                 "begin_shard"))
      43              :     {
      44            0 :       GNUNET_break (0);
      45            0 :       return GNUNET_DB_STATUS_HARD_ERROR;
      46              :     }
      47              : 
      48              :     {
      49              :       struct GNUNET_TIME_Absolute past;
      50              :       enum GNUNET_DB_QueryStatus qs;
      51          281 :       struct GNUNET_PQ_QueryParam params[] = {
      52          281 :         GNUNET_PQ_query_param_string (job_name),
      53          281 :         GNUNET_PQ_query_param_absolute_time (&past),
      54              :         GNUNET_PQ_query_param_end
      55              :       };
      56          281 :       struct GNUNET_PQ_ResultSpec rs[] = {
      57          281 :         GNUNET_PQ_result_spec_uint64 ("start_row",
      58              :                                       start_row),
      59          281 :         GNUNET_PQ_result_spec_uint64 ("end_row",
      60              :                                       end_row),
      61              :         GNUNET_PQ_result_spec_end
      62              :       };
      63              : 
      64          281 :       past = GNUNET_TIME_absolute_get ();
      65          281 :       PREPARE (pg,
      66              :                "get_open_shard",
      67              :                "SELECT"
      68              :                " start_row"
      69              :                ",end_row"
      70              :                " FROM work_shards"
      71              :                " WHERE job_name=$1"
      72              :                "   AND completed=FALSE"
      73              :                "   AND last_attempt<$2"
      74              :                " ORDER BY last_attempt ASC"
      75              :                " LIMIT 1;");
      76          281 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
      77              :                                                      "get_open_shard",
      78              :                                                      params,
      79              :                                                      rs);
      80          281 :       switch (qs)
      81              :       {
      82            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
      83            0 :         GNUNET_break (0);
      84            0 :         TALER_EXCHANGEDB_rollback (pg);
      85            0 :         return qs;
      86            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
      87            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
      88              :                     "Serialization error on getting open shard\n");
      89            0 :         TALER_EXCHANGEDB_rollback (pg);
      90            0 :         continue;
      91           90 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
      92              :         {
      93              :           enum GNUNET_DB_QueryStatus qsz;
      94              :           struct GNUNET_TIME_Absolute now;
      95           90 :           struct GNUNET_PQ_QueryParam iparams[] = {
      96           90 :             GNUNET_PQ_query_param_string (job_name),
      97           90 :             GNUNET_PQ_query_param_absolute_time (&now),
      98           90 :             GNUNET_PQ_query_param_uint64 (start_row),
      99           90 :             GNUNET_PQ_query_param_uint64 (end_row),
     100              :             GNUNET_PQ_query_param_end
     101              :           };
     102              : 
     103           90 :           now = GNUNET_TIME_relative_to_absolute (delay);
     104           90 :           PREPARE (pg,
     105              :                    "reclaim_shard",
     106              :                    "UPDATE work_shards"
     107              :                    " SET last_attempt=$2"
     108              :                    " WHERE job_name=$1"
     109              :                    "   AND start_row=$3"
     110              :                    "   AND end_row=$4");
     111           90 :           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     112              :                                                     "reclaim_shard",
     113              :                                                     iparams);
     114           90 :           switch (qsz)
     115              :           {
     116            0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     117            0 :             GNUNET_break (0);
     118            0 :             TALER_EXCHANGEDB_rollback (pg);
     119            0 :             return qsz;
     120            0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     121            0 :             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     122              :                         "Serialization error on claiming open shard\n");
     123            0 :             TALER_EXCHANGEDB_rollback (pg);
     124            0 :             continue;
     125           90 :           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     126           90 :             goto commit;
     127            0 :           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     128            0 :             GNUNET_break (0); /* logic error, should be impossible */
     129            0 :             TALER_EXCHANGEDB_rollback (pg);
     130            0 :             return GNUNET_DB_STATUS_HARD_ERROR;
     131              :           }
     132              :         }
     133            0 :         break; /* actually unreachable */
     134          191 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     135          191 :         break; /* continued below */
     136              :       }
     137              :     } /* get_open_shard */
     138              : 
     139              :     /* No open shard, find last 'end_row' */
     140              :     {
     141              :       enum GNUNET_DB_QueryStatus qs;
     142          191 :       struct GNUNET_PQ_QueryParam params[] = {
     143          191 :         GNUNET_PQ_query_param_string (job_name),
     144              :         GNUNET_PQ_query_param_end
     145              :       };
     146          191 :       struct GNUNET_PQ_ResultSpec rs[] = {
     147          191 :         GNUNET_PQ_result_spec_uint64 ("end_row",
     148              :                                       start_row),
     149              :         GNUNET_PQ_result_spec_end
     150              :       };
     151              : 
     152          191 :       PREPARE (pg,
     153              :                "get_last_shard",
     154              :                "SELECT"
     155              :                " end_row"
     156              :                " FROM work_shards"
     157              :                " WHERE job_name=$1"
     158              :                " ORDER BY end_row DESC"
     159              :                " LIMIT 1;");
     160          191 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     161              :                                                      "get_last_shard",
     162              :                                                      params,
     163              :                                                      rs);
     164          191 :       switch (qs)
     165              :       {
     166            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     167            0 :         GNUNET_break (0);
     168            0 :         TALER_EXCHANGEDB_rollback (pg);
     169            0 :         return qs;
     170            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     171            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     172              :                     "Serialization error on getting last shard\n");
     173            0 :         TALER_EXCHANGEDB_rollback (pg);
     174            0 :         continue;
     175          170 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     176          170 :         break;
     177           21 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     178           21 :         *start_row = 0; /* base-case: no shards yet */
     179           21 :         break; /* continued below */
     180              :       }
     181          191 :       *end_row = *start_row + shard_size;
     182              :     } /* get_last_shard */
     183              : 
     184              :     /* Claim fresh shard */
     185              :     {
     186              :       enum GNUNET_DB_QueryStatus qs;
     187              :       struct GNUNET_TIME_Absolute now;
     188          191 :       struct GNUNET_PQ_QueryParam params[] = {
     189          191 :         GNUNET_PQ_query_param_string (job_name),
     190          191 :         GNUNET_PQ_query_param_absolute_time (&now),
     191          191 :         GNUNET_PQ_query_param_uint64 (start_row),
     192          191 :         GNUNET_PQ_query_param_uint64 (end_row),
     193              :         GNUNET_PQ_query_param_end
     194              :       };
     195              : 
     196          191 :       now = GNUNET_TIME_relative_to_absolute (delay);
     197          191 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     198              :                   "Trying to claim shard (%llu-%llu]\n",
     199              :                   (unsigned long long) *start_row,
     200              :                   (unsigned long long) *end_row);
     201              : 
     202          191 :       PREPARE (pg,
     203              :                "claim_next_shard",
     204              :                "INSERT INTO work_shards"
     205              :                "(job_name"
     206              :                ",last_attempt"
     207              :                ",start_row"
     208              :                ",end_row"
     209              :                ") VALUES "
     210              :                "($1, $2, $3, $4);");
     211          191 :       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     212              :                                                "claim_next_shard",
     213              :                                                params);
     214          191 :       switch (qs)
     215              :       {
     216            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     217            0 :         GNUNET_break (0);
     218            0 :         TALER_EXCHANGEDB_rollback (pg);
     219            0 :         return qs;
     220            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     221            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     222              :                     "Serialization error on claiming next shard\n");
     223            0 :         TALER_EXCHANGEDB_rollback (pg);
     224            0 :         continue;
     225          191 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     226              :         /* continued below */
     227          191 :         break;
     228            0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     229              :         /* someone else got this shard already,
     230              :            try again */
     231            0 :         TALER_EXCHANGEDB_rollback (pg);
     232            0 :         continue;
     233              :       }
     234              :     } /* claim_next_shard */
     235              : 
     236              :     /* commit */
     237          281 : commit:
     238              :     {
     239              :       enum GNUNET_DB_QueryStatus qs;
     240              : 
     241          281 :       qs = TALER_EXCHANGEDB_commit (pg);
     242          281 :       switch (qs)
     243              :       {
     244            0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     245            0 :         GNUNET_break (0);
     246            0 :         TALER_EXCHANGEDB_rollback (pg);
     247            0 :         return qs;
     248            0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     249            0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     250              :                     "Serialization error on commit for beginning shard\n");
     251            0 :         TALER_EXCHANGEDB_rollback (pg);
     252            0 :         continue;
     253          281 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     254              :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     255          281 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     256              :                     "Claimed new shard\n");
     257          281 :         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     258              :       }
     259              :     }
     260              :   } /* retry 'for' loop */
     261            0 :   return GNUNET_DB_STATUS_SOFT_ERROR;
     262              : }
        

Generated by: LCOV version 2.0-1