LCOV - code coverage report
Current view: top level - exchangedb - pg_begin_shard.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 59 111 53.2 %
Date: 2025-06-05 21:03:14 Functions: 1 1 100.0 %

          Line data    Source code
       1             : /*
       2             :    This file is part of TALER
       3             :    Copyright (C) 2022 Taler Systems SA
       4             : 
       5             :    TALER is free software; you can redistribute it and/or modify it under the
       6             :    terms of the GNU General Public License as published by the Free Software
       7             :    Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12             : 
      13             :    You should have received a copy of the GNU General Public License along with
      14             :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             :  */
      16             : /**
      17             :  * @file exchangedb/pg_begin_shard.c
      18             :  * @brief Implementation of the begin_shard function for Postgres
      19             :  * @author Christian Grothoff
      20             :  */
      21             : #include "platform.h"
      22             : #include "taler_error_codes.h"
      23             : #include "taler_dbevents.h"
      24             : #include "taler_pq_lib.h"
      25             : #include "pg_begin_shard.h"
      26             : #include "pg_helper.h"
      27             : #include "pg_start.h"
      28             : #include "pg_rollback.h"
      29             : #include "pg_commit.h"
      30             : 
      31             : 
      32             : enum GNUNET_DB_QueryStatus
      33         296 : TEH_PG_begin_shard (void *cls,
      34             :                     const char *job_name,
      35             :                     struct GNUNET_TIME_Relative delay,
      36             :                     uint64_t shard_size,
      37             :                     uint64_t *start_row,
      38             :                     uint64_t *end_row)
      39             : {
                         /* Claims a shard (a range of rows) of work for job_name and reports
                            it through start_row and end_row.

                            Each attempt opens its own transaction and then proceeds in order:
                            1. look for a row of this job in work_shards with completed=FALSE
                               and last_attempt below a timestamp taken just before the query;
                               if one exists, overwrite its last_attempt and commit;
                            2. otherwise read the largest end_row stored for the job and use it
                               as the new start_row (0 if the job has no rows yet), with
                               end_row = start_row + shard_size;
                            3. insert that new range into work_shards and commit.

                            cls is used as the struct PostgresClosure holding the connection.
                            delay is converted to the absolute time written to last_attempt.

                            Returns GNUNET_DB_STATUS_SUCCESS_ONE_RESULT once a commit succeeded,
                            GNUNET_DB_STATUS_HARD_ERROR if the transaction could not be started,
                            a statement or the commit reported a hard error, or the reclaim
                            UPDATE matched no row, and GNUNET_DB_STATUS_SOFT_ERROR if all 10
                            attempts were abandoned.  On the failure paths start_row and end_row
                            may already have been overwritten.  */
      40         296 :   struct PostgresClosure *pg = cls;
      41             : 
      42         296 :   for (unsigned int retries = 0; retries<10; retries++) /* every 'continue' below starts the next attempt */
      43             :   {
                           /* Failure to start returns immediately, without a rollback. */
      44         296 :     if (GNUNET_OK !=
      45         296 :         TEH_PG_start (pg,
      46             :                       "begin_shard"))
      47             :     {
      48           0 :       GNUNET_break (0);
      49           0 :       return GNUNET_DB_STATUS_HARD_ERROR;
      50             :     }
      51             : 
                           /* Step 1: try to take over an existing, uncompleted shard. */
      52             :     {
      53             :       struct GNUNET_TIME_Absolute past;
      54             :       enum GNUNET_DB_QueryStatus qs;
                             /* params only stores the address of 'past'; the value is
                                assigned further down, before the query is executed. */
      55         296 :       struct GNUNET_PQ_QueryParam params[] = {
      56         296 :         GNUNET_PQ_query_param_string (job_name),
      57         296 :         GNUNET_PQ_query_param_absolute_time (&past),
      58             :         GNUNET_PQ_query_param_end
      59             :       };
                             /* The selected row is written straight into the caller's
                                start_row and end_row. */
      60         296 :       struct GNUNET_PQ_ResultSpec rs[] = {
      61         296 :         GNUNET_PQ_result_spec_uint64 ("start_row",
      62             :                                       start_row),
      63         296 :         GNUNET_PQ_result_spec_uint64 ("end_row",
      64             :                                       end_row),
      65             :         GNUNET_PQ_result_spec_end
      66             :       };
      67             : 
      68         296 :       past = GNUNET_TIME_absolute_get (); /* presumably the current time -- TODO confirm */
                             /* NOTE(review): PREPARE is defined outside this file; presumably
                                it makes the named statement available on pg->conn -- confirm
                                in pg_helper.h. */
      69         296 :       PREPARE (pg,
      70             :                "get_open_shard",
      71             :                "SELECT"
      72             :                " start_row"
      73             :                ",end_row"
      74             :                " FROM work_shards"
      75             :                " WHERE job_name=$1"
      76             :                "   AND completed=FALSE"
      77             :                "   AND last_attempt<$2"
      78             :                " ORDER BY last_attempt ASC"
      79             :                " LIMIT 1;");
      80         296 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
      81             :                                                      "get_open_shard",
      82             :                                                      params,
      83             :                                                      rs);
      84         296 :       switch (qs)
      85             :       {
      86           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
      87           0 :         GNUNET_break (0);
      88           0 :         TEH_PG_rollback (pg);
      89           0 :         return qs;
      90           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
      91           0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
      92             :                     "Serialization error on getting open shard\n");
      93           0 :         TEH_PG_rollback (pg);
      94           0 :         continue;
      95          91 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
                               /* An open shard was found: overwrite its last_attempt with
                                  the time derived from 'delay', then commit. */
      96             :         {
      97             :           enum GNUNET_DB_QueryStatus qsz;
      98             :           struct GNUNET_TIME_Absolute now;
      99          91 :           struct GNUNET_PQ_QueryParam iparams[] = {
     100          91 :             GNUNET_PQ_query_param_string (job_name),
     101          91 :             GNUNET_PQ_query_param_absolute_time (&now),
     102          91 :             GNUNET_PQ_query_param_uint64 (start_row),
     103          91 :             GNUNET_PQ_query_param_uint64 (end_row),
     104             :             GNUNET_PQ_query_param_end
     105             :           };
     106             : 
                                 /* Despite its name, 'now' holds the result of converting
                                    'delay' to an absolute time (presumably current time plus
                                    delay -- TODO confirm). */
     107          91 :           now = GNUNET_TIME_relative_to_absolute (delay);
     108          91 :           PREPARE (pg,
     109             :                    "reclaim_shard",
     110             :                    "UPDATE work_shards"
     111             :                    " SET last_attempt=$2"
     112             :                    " WHERE job_name=$1"
     113             :                    "   AND start_row=$3"
     114             :                    "   AND end_row=$4");
     115          91 :           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     116             :                                                     "reclaim_shard",
     117             :                                                     iparams);
     118           0 :           switch (qsz)
     119             :           {
     120           0 :           case GNUNET_DB_STATUS_HARD_ERROR:
     121           0 :             GNUNET_break (0);
     122           0 :             TEH_PG_rollback (pg);
     123           0 :             return qsz;
     124           0 :           case GNUNET_DB_STATUS_SOFT_ERROR:
     125           0 :             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     126             :                         "Serialization error on claiming open shard\n");
     127           0 :             TEH_PG_rollback (pg);
     128           0 :             continue;
     129          91 :           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     130          91 :             goto commit; /* skips steps 2 and 3; start_row/end_row hold the reclaimed range */
     131           0 :           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     132           0 :             GNUNET_break (0); /* logic error, should be impossible */
     133           0 :             TEH_PG_rollback (pg);
     134           0 :             return GNUNET_DB_STATUS_HARD_ERROR;
     135             :           }
     136             :         }
     137           0 :         break; /* actually unreachable */
     138         205 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     139         205 :         break; /* continued below */
     140             :       }
     141             :     } /* get_open_shard */
     142             : 
     143             :     /* No open shard, find last 'end_row' */
     144             :     {
     145             :       enum GNUNET_DB_QueryStatus qs;
     146         205 :       struct GNUNET_PQ_QueryParam params[] = {
     147         205 :         GNUNET_PQ_query_param_string (job_name),
     148             :         GNUNET_PQ_query_param_end
     149             :       };
                             /* The "end_row" column is deliberately stored into *start_row:
                                the new shard begins where the highest existing one ends. */
     150         205 :       struct GNUNET_PQ_ResultSpec rs[] = {
     151         205 :         GNUNET_PQ_result_spec_uint64 ("end_row",
     152             :                                       start_row),
     153             :         GNUNET_PQ_result_spec_end
     154             :       };
     155             : 
     156         205 :       PREPARE (pg,
     157             :                "get_last_shard",
     158             :                "SELECT"
     159             :                " end_row"
     160             :                " FROM work_shards"
     161             :                " WHERE job_name=$1"
     162             :                " ORDER BY end_row DESC"
     163             :                " LIMIT 1;");
     164         205 :       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     165             :                                                      "get_last_shard",
     166             :                                                      params,
     167             :                                                      rs);
     168         205 :       switch (qs)
     169             :       {
     170           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     171           0 :         GNUNET_break (0);
     172           0 :         TEH_PG_rollback (pg);
     173           0 :         return qs;
     174           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     175           0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     176             :                     "Serialization error on getting last shard\n");
     177           0 :         TEH_PG_rollback (pg);
     178           0 :         continue;
     179         170 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     180         170 :         break;
     181          35 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     182          35 :         *start_row = 0; /* base-case: no shards yet */
     183          35 :         break; /* continued below */
     184             :       }
     185         205 :       *end_row = *start_row + shard_size; /* uint64_t arithmetic: wraps on overflow, no check here */
     186             :     } /* get_last_shard */
     187             : 
     188             :     /* Claim fresh shard */
     189             :     {
     190             :       enum GNUNET_DB_QueryStatus qs;
     191             :       struct GNUNET_TIME_Absolute now;
     192         205 :       struct GNUNET_PQ_QueryParam params[] = {
     193         205 :         GNUNET_PQ_query_param_string (job_name),
     194         205 :         GNUNET_PQ_query_param_absolute_time (&now),
     195         205 :         GNUNET_PQ_query_param_uint64 (start_row),
     196         205 :         GNUNET_PQ_query_param_uint64 (end_row),
     197             :         GNUNET_PQ_query_param_end
     198             :       };
     199             : 
                             /* Same conversion as in the reclaim path: the value stored as
                                last_attempt is derived from 'delay'. */
     200         205 :       now = GNUNET_TIME_relative_to_absolute (delay);
     201         205 :       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     202             :                   "Trying to claim shard (%llu-%llu]\n",
     203             :                   (unsigned long long) *start_row,
     204             :                   (unsigned long long) *end_row);
     205             : 
     206         205 :       PREPARE (pg,
     207             :                "claim_next_shard",
     208             :                "INSERT INTO work_shards"
     209             :                "(job_name"
     210             :                ",last_attempt"
     211             :                ",start_row"
     212             :                ",end_row"
     213             :                ") VALUES "
     214             :                "($1, $2, $3, $4);");
     215         205 :       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
     216             :                                                "claim_next_shard",
     217             :                                                params);
     218         205 :       switch (qs)
     219             :       {
     220           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     221           0 :         GNUNET_break (0);
     222           0 :         TEH_PG_rollback (pg);
     223           0 :         return qs;
     224           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     225           0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     226             :                     "Serialization error on claiming next shard\n");
     227           0 :         TEH_PG_rollback (pg);
     228           0 :         continue;
     229         205 :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     230             :         /* continued below */
     231         205 :         break;
     232           0 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     233             :         /* someone else got this shard already,
     234             :            try again */
     235           0 :         TEH_PG_rollback (pg);
     236           0 :         continue;
     237             :       }
     238             :     } /* claim_next_shard */
     239             : 
     240             :     /* commit: reached by fall-through after the INSERT of a fresh shard,
                              or via 'goto commit' after reclaiming an open shard */
     241         296 : commit:
     242             :     {
     243             :       enum GNUNET_DB_QueryStatus qs;
     244             : 
     245         296 :       qs = TEH_PG_commit (pg);
     246         296 :       switch (qs)
     247             :       {
     248           0 :       case GNUNET_DB_STATUS_HARD_ERROR:
     249           0 :         GNUNET_break (0);
     250           0 :         TEH_PG_rollback (pg);
     251           0 :         return qs;
     252           0 :       case GNUNET_DB_STATUS_SOFT_ERROR:
     253           0 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     254             :                     "Serialization error on commit for beginning shard\n");
     255           0 :         TEH_PG_rollback (pg);
     256           0 :         continue;
                             /* Both success codes of the commit are reported to the caller
                                as SUCCESS_ONE_RESULT; the same log line is emitted for a
                                reclaimed shard and for a freshly inserted one. */
     257         296 :       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     258             :       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     259         296 :         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     260             :                     "Claimed new shard\n");
     261         296 :         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
     262             :       }
     263             :     }
     264             :   } /* retry 'for' loop */
     265           0 :   return GNUNET_DB_STATUS_SOFT_ERROR; /* all 10 attempts ended in 'continue' */
     266             : }

Generated by: LCOV version 1.16