LCOV - code coverage report
Current view: top level - exchangedb - pg_aggregate.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 37 41 90.2 %
Date: 2025-06-22 12:09:43 Functions: 1 1 100.0 %

          Line data    Source code
       1             : /*
       2             :    This file is part of TALER
       3             :    Copyright (C) 2022, 2023 Taler Systems SA
       4             : 
       5             :    TALER is free software; you can redistribute it and/or modify it under the
       6             :    terms of the GNU General Public License as published by the Free Software
       7             :    Foundation; either version 3, or (at your option) any later version.
       8             : 
       9             :    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
      10             :    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
      11             :    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
      12             : 
      13             :    You should have received a copy of the GNU General Public License along with
      14             :    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
      15             :  */
      16             : /**
      17             :  * @file exchangedb/pg_aggregate.c
      18             :  * @brief Implementation of the aggregate function for Postgres
      19             :  * @author Christian Grothoff
      20             :  */
      21             : #include "taler/platform.h"
      22             : #include "taler/taler_error_codes.h"
      23             : #include "taler/taler_dbevents.h"
      24             : #include "taler/taler_pq_lib.h"
      25             : #include "pg_compute_shard.h"
      26             : #include "pg_event_notify.h"
      27             : #include "pg_aggregate.h"
      28             : #include "pg_helper.h"
      29             : 
      30             : 
      /**
       * Run the "aggregate" prepared statement for one merchant bank account
       * and compute the net amount that results from it.
       *
       * The statement (see the SQL below) does three things in one query:
       *  - marks as done all batch_deposits rows that are neither done nor
       *    policy_blocked, whose refund_deadline is before $1, and that match
       *    the given shard, merchant_pub and wire_target_h_payto;
       *  - inserts one aggregation_tracking row per such batch deposit,
       *    with wtid_raw set to @a wtid;
       *  - returns the summed deposit amounts, the summed refund amounts and
       *    the summed deposit fees (fees only for coins that are not listed
       *    in fully_refunded_coins) as separate value/fraction columns.
       *
       * @param cls the `struct PostgresClosure` providing the connection,
       *        the currency and the aggregator_shift
       * @param h_payto bound to $3, compared against wire_target_h_payto
       * @param merchant_pub bound to $2; also the input to
       *        TEH_PG_compute_shard(), whose result is bound to $5
       * @param wtid bound to $4, stored as wtid_raw in aggregation_tracking
       * @param[out] total set to (deposits - refunds) - fees on success; set
       *        to zero in pg->currency if the query reports no results; not
       *        written if the query status is negative
       * @return the status returned by
       *        GNUNET_PQ_eval_prepared_singleton_select()
       */
      31             : enum GNUNET_DB_QueryStatus
      32          68 : TEH_PG_aggregate (
      33             :   void *cls,
      34             :   const struct TALER_FullPaytoHashP *h_payto,
      35             :   const struct TALER_MerchantPublicKeyP *merchant_pub,
      36             :   const struct TALER_WireTransferIdentifierRawP *wtid,
      37             :   struct TALER_Amount *total)
      38             : {
      39          68 :   struct PostgresClosure *pg = cls;
      40          68 :   uint64_t deposit_shard = TEH_PG_compute_shard (merchant_pub);
      41          68 :   struct GNUNET_TIME_Absolute now = {0};
      42             :   uint64_t sum_deposit_value;
      43             :   uint64_t sum_deposit_frac;
      44             :   uint64_t sum_refund_value;
      45             :   uint64_t sum_refund_frac;
      46             :   uint64_t sum_fee_value;
      47             :   uint64_t sum_fee_frac;
      48             :   enum GNUNET_DB_QueryStatus qs;
      49             :   struct TALER_Amount sum_deposit;
      50             :   struct TALER_Amount sum_refund;
      51             :   struct TALER_Amount sum_fee;
      52             :   struct TALER_Amount delta;
      53             : 
                         /* $1 of the statement: the current time, rounded down using
                            pg->aggregator_shift; the SQL compares refund_deadline to it.
                            This assignment overwrites the {0} initializer above. */
      54          68 :   now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
      55             :                                          pg->aggregator_shift);
      56          68 :   PREPARE (pg,
      57             :            "aggregate",
      58             :            "WITH bdep AS (" /* restrict to our merchant and account and mark as done */
      59             :            "  UPDATE batch_deposits"
      60             :            "     SET done=TRUE"
      61             :            "   WHERE NOT (done OR policy_blocked)" /* only actually executable deposits */
      62             :            "     AND refund_deadline<$1"
      63             :            "     AND shard=$5" /* only for efficiency, merchant_pub is what we really filter by */
      64             :            "     AND merchant_pub=$2" /* filter by target merchant */
      65             :            "     AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
      66             :            "   RETURNING"
      67             :            "     batch_deposit_serial_id)"
      68             :            " ,cdep AS ("
      69             :            "   SELECT"
      70             :            "     coin_deposit_serial_id"
      71             :            "    ,batch_deposit_serial_id"
      72             :            "    ,coin_pub"
      73             :            "    ,amount_with_fee AS amount"
      74             :            "   FROM coin_deposits"
      75             :            "   WHERE batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
      76             :            " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
      77             :            "  SELECT"
      78             :            "    amount_with_fee AS refund"
      79             :            "   ,coin_pub"
      80             :            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
      81             :            "    FROM refunds"
      82             :            "   WHERE coin_pub IN (SELECT coin_pub FROM cdep)"
      83             :            "     AND batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
      84             :            " ,ref_by_coin AS (" /* total up refunds by coin */
      85             :            "  SELECT"
      86             :            "    SUM((ref.refund).val) AS sum_refund_val"
      87             :            "   ,SUM((ref.refund).frac) AS sum_refund_frac"
      88             :            "   ,coin_pub"
      89             :            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
      90             :            "    FROM ref"
      91             :            "   GROUP BY coin_pub, batch_deposit_serial_id)"
      92             :            " ,norm_ref_by_coin AS (" /* normalize */
      93             :            "  SELECT"
      94             :            "    sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val"
      95             :            "   ,sum_refund_frac % 100000000 AS norm_refund_frac"
      96             :            "   ,coin_pub"
      97             :            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
      98             :            "    FROM ref_by_coin)"
      99             :            " ,fully_refunded_coins AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
     100             :            "  SELECT"
     101             :            "    cdep.coin_pub"
     102             :            "    FROM norm_ref_by_coin norm"
     103             :            "    JOIN cdep"
     104             :            "      ON (norm.coin_pub = cdep.coin_pub"
     105             :            "      AND norm.batch_deposit_serial_id = cdep.batch_deposit_serial_id"
     106             :            "      AND norm.norm_refund_val = (cdep.amount).val"
     107             :            "      AND norm.norm_refund_frac = (cdep.amount).frac))"
     108             :            " ,fees AS (" /* find deposit fees for not fully refunded deposits */
     109             :            "  SELECT"
     110             :            "    denom.fee_deposit AS fee"
     111             :            "   ,cs.batch_deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
     112             :            "    FROM cdep cs"
     113             :            "    JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
     114             :            "      USING (coin_pub)"
     115             :            "    JOIN denominations denom"
     116             :            "      USING (denominations_serial)"
     117             :            "    WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))"
     118             :            " ,dummy AS (" /* add deposits to aggregation_tracking */
     119             :            "    INSERT INTO aggregation_tracking"
     120             :            "    (batch_deposit_serial_id"
     121             :            "    ,wtid_raw)"
     122             :            "    SELECT batch_deposit_serial_id,$4"
     123             :            "      FROM bdep)"
     124             :            "SELECT" /* calculate totals (deposits, refunds and fees) */
     125             :            "  CAST(COALESCE(SUM((cdep.amount).val),0) AS INT8) AS sum_deposit_value"
     126             :            /* cast needed, otherwise we get NUMBER */
     127             :            " ,COALESCE(SUM((cdep.amount).frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
     128             :            " ,CAST(COALESCE(SUM((ref.refund).val),0) AS INT8) AS sum_refund_value"
     129             :            " ,COALESCE(SUM((ref.refund).frac),0) AS sum_refund_fraction"
     130             :            " ,CAST(COALESCE(SUM((fees.fee).val),0) AS INT8) AS sum_fee_value"
     131             :            " ,COALESCE(SUM((fees.fee).frac),0) AS sum_fee_fraction"
     132             :            " FROM cdep "
     133             :            "   FULL OUTER JOIN ref ON (FALSE)"    /* We just want all sums */
     134             :            "   FULL OUTER JOIN fees ON (FALSE);");
     135             : 
     136             :   {
                           /* Bound in placeholder order: $1=now, $2=merchant_pub,
                              $3=h_payto, $4=wtid, $5=deposit_shard. */
     137          68 :     struct GNUNET_PQ_QueryParam params[] = {
     138          68 :       GNUNET_PQ_query_param_absolute_time (&now),
     139          68 :       GNUNET_PQ_query_param_auto_from_type (merchant_pub),
     140          68 :       GNUNET_PQ_query_param_auto_from_type (h_payto),
     141          68 :       GNUNET_PQ_query_param_auto_from_type (wtid),
     142          68 :       GNUNET_PQ_query_param_uint64 (&deposit_shard),
     143             :       GNUNET_PQ_query_param_end
     144             :     };
                           /* Column names here match the AS aliases of the final SELECT;
                              each sum is read into its own uint64_t local. */
     145          68 :     struct GNUNET_PQ_ResultSpec rs[] = {
     146          68 :       GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
     147             :                                     &sum_deposit_value),
     148          68 :       GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
     149             :                                     &sum_deposit_frac),
     150          68 :       GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
     151             :                                     &sum_refund_value),
     152          68 :       GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
     153             :                                     &sum_refund_frac),
     154          68 :       GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
     155             :                                     &sum_fee_value),
     156          68 :       GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
     157             :                                     &sum_fee_frac),
     158             :       GNUNET_PQ_result_spec_end
     159             :     };
     160             : 
     161          68 :     qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     162             :                                                    "aggregate",
     163             :                                                    params,
     164             :                                                    rs);
     165             :   }
                         /* Negative status: return it without writing *total.
                            GNUNET_break() presumably reports any status other than a
                            soft error -- TODO confirm against the GNUnet headers.
                            This branch has hit count 0 in this coverage run. */
     166          68 :   if (qs < 0)
     167             :   {
     168           0 :     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     169           0 :     return qs;
     170             :   }
                         /* No result row: report a zero total in pg->currency.
                            This branch also has hit count 0 in this coverage run. */
     171          68 :   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
     172             :   {
     173           0 :     GNUNET_assert (GNUNET_OK ==
     174             :                    TALER_amount_set_zero (pg->currency,
     175             :                                           total));
     176           0 :     return qs;
     177             :   }
                         /* Start all three amounts at zero in pg->currency; only their
                            value and fraction fields are overwritten below. */
     178          68 :   GNUNET_assert (GNUNET_OK ==
     179             :                  TALER_amount_set_zero (pg->currency,
     180             :                                         &sum_deposit));
     181          68 :   GNUNET_assert (GNUNET_OK ==
     182             :                  TALER_amount_set_zero (pg->currency,
     183             :                                         &sum_refund));
     184          68 :   GNUNET_assert (GNUNET_OK ==
     185             :                  TALER_amount_set_zero (pg->currency,
     186             :                                         &sum_fee));
                         /* The fraction columns are plain sums and may exceed one unit:
                            carry the whole units (frac / TALER_AMOUNT_FRAC_BASE) into
                            the value and keep only the remainder as the fraction. */
     187          68 :   sum_deposit.value    = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
     188          68 :                          + sum_deposit_value;
     189          68 :   sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
     190          68 :   sum_refund.value     = sum_refund_frac  / TALER_AMOUNT_FRAC_BASE
     191          68 :                          + sum_refund_value;
     192          68 :   sum_refund.fraction  = sum_refund_frac  % TALER_AMOUNT_FRAC_BASE;
     193          68 :   sum_fee.value        = sum_fee_frac     / TALER_AMOUNT_FRAC_BASE
     194          68 :                          + sum_fee_value;
                         /* NOTE(review): the trailing backslash on the next line looks
                            like a stray line continuation; it only splices the following
                            line onto this statement, so it appears harmless, but it is
                            probably a typo -- confirm and remove in the real source. */
     195          68 :   sum_fee.fraction     = sum_fee_frac     % TALER_AMOUNT_FRAC_BASE; \
     196          68 :   GNUNET_assert (0 <=
     197             :                  TALER_amount_subtract (&delta,
     198             :                                         &sum_deposit,
     199             :                                         &sum_refund));
                         /* delta = deposits - refunds; now total = delta - fees.
                            Both subtractions abort via GNUNET_assert on a negative
                            return value (presumably an arithmetic error code -- TODO
                            confirm against TALER_amount_subtract's documentation). */
     200          68 :   GNUNET_assert (0 <=
     201             :                  TALER_amount_subtract (total,
     202             :                                         &delta,
     203             :                                         &sum_fee));
     204          68 :   return qs;
     205             : }

Generated by: LCOV version 1.16