Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2022, 2023 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file exchangedb/aggregate.c
18 : * @brief Implementation of the aggregate function for Postgres
19 : * @author Christian Grothoff
20 : */
21 : #include "taler/taler_error_codes.h"
22 : #include "taler/taler_pq_lib.h"
23 : #include "exchange-database/compute_shard.h"
24 : #include "exchange-database/aggregate.h"
25 : #include "helper.h"
26 :
27 :
28 : enum GNUNET_DB_QueryStatus
29 56 : TALER_EXCHANGEDB_aggregate (
30 : struct TALER_EXCHANGEDB_PostgresContext *pg,
31 : const struct TALER_FullPaytoHashP *h_payto,
32 : const struct TALER_MerchantPublicKeyP *merchant_pub,
33 : const struct TALER_WireTransferIdentifierRawP *wtid,
34 : struct TALER_Amount *total)
35 : {
36 56 : uint64_t deposit_shard = TALER_EXCHANGEDB_compute_shard (merchant_pub);
37 56 : struct GNUNET_TIME_Absolute now = {0};
38 : uint64_t sum_deposit_value;
39 : uint64_t sum_deposit_frac;
40 : uint64_t sum_refund_value;
41 : uint64_t sum_refund_frac;
42 : uint64_t sum_fee_value;
43 : uint64_t sum_fee_frac;
44 : enum GNUNET_DB_QueryStatus qs;
45 : struct TALER_Amount sum_deposit;
46 : struct TALER_Amount sum_refund;
47 : struct TALER_Amount sum_fee;
48 : struct TALER_Amount delta;
49 :
50 56 : now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
51 : pg->aggregator_shift);
52 56 : PREPARE (pg,
53 : "aggregate",
54 : "WITH bdep AS (" /* restrict to our merchant and account and mark as done */
55 : " UPDATE batch_deposits"
56 : " SET done=TRUE"
57 : " WHERE NOT (done OR policy_blocked)" /* only actually executable deposits */
58 : " AND refund_deadline<$1"
59 : " AND shard=$5" /* only for efficiency, merchant_pub is what we really filter by */
60 : " AND merchant_pub=$2" /* filter by target merchant */
61 : " AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
62 : " RETURNING"
63 : " batch_deposit_serial_id)"
64 : " ,cdep AS ("
65 : " SELECT"
66 : " coin_deposit_serial_id"
67 : " ,batch_deposit_serial_id"
68 : " ,coin_pub"
69 : " ,amount_with_fee AS amount"
70 : " FROM coin_deposits"
71 : " WHERE batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
72 : " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
73 : " SELECT"
74 : " amount_with_fee AS refund"
75 : " ,coin_pub"
76 : " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
77 : " FROM refunds"
78 : " WHERE coin_pub IN (SELECT coin_pub FROM cdep)"
79 : " AND batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
80 : " ,ref_by_coin AS (" /* total up refunds by coin */
81 : " SELECT"
82 : " SUM((ref.refund).val) AS sum_refund_val"
83 : " ,SUM((ref.refund).frac) AS sum_refund_frac"
84 : " ,coin_pub"
85 : " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
86 : " FROM ref"
87 : " GROUP BY coin_pub, batch_deposit_serial_id)"
88 : " ,norm_ref_by_coin AS (" /* normalize */
89 : " SELECT"
90 : " sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val"
91 : " ,sum_refund_frac % 100000000 AS norm_refund_frac"
92 : " ,coin_pub"
93 : " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
94 : " FROM ref_by_coin)"
95 : " ,fully_refunded_coins AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
96 : " SELECT"
97 : " cdep.coin_pub"
98 : " FROM norm_ref_by_coin norm"
99 : " JOIN cdep"
100 : " ON (norm.coin_pub = cdep.coin_pub"
101 : " AND norm.batch_deposit_serial_id = cdep.batch_deposit_serial_id"
102 : " AND norm.norm_refund_val = (cdep.amount).val"
103 : " AND norm.norm_refund_frac = (cdep.amount).frac))"
104 : " ,fees AS (" /* find deposit fees for not fully refunded deposits */
105 : " SELECT"
106 : " denom.fee_deposit AS fee"
107 : " ,cs.batch_deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
108 : " FROM cdep cs"
109 : " JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
110 : " USING (coin_pub)"
111 : " JOIN denominations denom"
112 : " USING (denominations_serial)"
113 : " WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))"
114 : " ,dummy AS (" /* add deposits to aggregation_tracking */
115 : " INSERT INTO aggregation_tracking"
116 : " (batch_deposit_serial_id"
117 : " ,wtid_raw)"
118 : " SELECT batch_deposit_serial_id,$4"
119 : " FROM bdep)"
120 : "SELECT" /* calculate totals (deposits, refunds and fees) */
121 : " CAST(COALESCE(SUM((cdep.amount).val),0) AS INT8) AS sum_deposit_value"
122 : /* cast needed, otherwise we get NUMBER */
123 : " ,COALESCE(SUM((cdep.amount).frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
124 : " ,CAST(COALESCE(SUM((ref.refund).val),0) AS INT8) AS sum_refund_value"
125 : " ,COALESCE(SUM((ref.refund).frac),0) AS sum_refund_fraction"
126 : " ,CAST(COALESCE(SUM((fees.fee).val),0) AS INT8) AS sum_fee_value"
127 : " ,COALESCE(SUM((fees.fee).frac),0) AS sum_fee_fraction"
128 : " FROM cdep "
129 : " FULL OUTER JOIN ref ON (FALSE)" /* We just want all sums */
130 : " FULL OUTER JOIN fees ON (FALSE);");
131 :
132 : {
133 56 : struct GNUNET_PQ_QueryParam params[] = {
134 56 : GNUNET_PQ_query_param_absolute_time (&now),
135 56 : GNUNET_PQ_query_param_auto_from_type (merchant_pub),
136 56 : GNUNET_PQ_query_param_auto_from_type (h_payto),
137 56 : GNUNET_PQ_query_param_auto_from_type (wtid),
138 56 : GNUNET_PQ_query_param_uint64 (&deposit_shard),
139 : GNUNET_PQ_query_param_end
140 : };
141 56 : struct GNUNET_PQ_ResultSpec rs[] = {
142 56 : GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
143 : &sum_deposit_value),
144 56 : GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
145 : &sum_deposit_frac),
146 56 : GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
147 : &sum_refund_value),
148 56 : GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
149 : &sum_refund_frac),
150 56 : GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
151 : &sum_fee_value),
152 56 : GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
153 : &sum_fee_frac),
154 : GNUNET_PQ_result_spec_end
155 : };
156 :
157 56 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
158 : "aggregate",
159 : params,
160 : rs);
161 : }
162 56 : if (qs < 0)
163 : {
164 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
165 0 : return qs;
166 : }
167 56 : if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
168 : {
169 0 : GNUNET_assert (GNUNET_OK ==
170 : TALER_amount_set_zero (pg->currency,
171 : total));
172 0 : return qs;
173 : }
174 56 : GNUNET_assert (GNUNET_OK ==
175 : TALER_amount_set_zero (pg->currency,
176 : &sum_deposit));
177 56 : GNUNET_assert (GNUNET_OK ==
178 : TALER_amount_set_zero (pg->currency,
179 : &sum_refund));
180 56 : GNUNET_assert (GNUNET_OK ==
181 : TALER_amount_set_zero (pg->currency,
182 : &sum_fee));
183 56 : sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
184 56 : + sum_deposit_value;
185 56 : sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
186 56 : sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE
187 56 : + sum_refund_value;
188 56 : sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE;
189 56 : sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE
190 56 : + sum_fee_value;
191 56 : sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \
192 56 : GNUNET_assert (0 <=
193 : TALER_amount_subtract (&delta,
194 : &sum_deposit,
195 : &sum_refund));
196 56 : GNUNET_assert (0 <=
197 : TALER_amount_subtract (total,
198 : &delta,
199 : &sum_fee));
200 56 : return qs;
201 : }
|