Line data Source code
1 : /*
2 : This file is part of TALER
3 : Copyright (C) 2022, 2023 Taler Systems SA
4 :
5 : TALER is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 3, or (at your option) any later version.
8 :
9 : TALER is distributed in the hope that it will be useful, but WITHOUT ANY
10 : WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
11 : A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 :
13 : You should have received a copy of the GNU General Public License along with
14 : TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
15 : */
16 : /**
17 : * @file exchangedb/pg_aggregate.c
18 : * @brief Implementation of the aggregate function for Postgres
19 : * @author Christian Grothoff
20 : */
21 : #include "taler/platform.h"
22 : #include "taler/taler_error_codes.h"
23 : #include "taler/taler_dbevents.h"
24 : #include "taler/taler_pq_lib.h"
25 : #include "pg_compute_shard.h"
26 : #include "pg_event_notify.h"
27 : #include "pg_aggregate.h"
28 : #include "pg_helper.h"
29 :
30 :
31 : enum GNUNET_DB_QueryStatus
32 68 : TEH_PG_aggregate (
33 : void *cls,
34 : const struct TALER_FullPaytoHashP *h_payto,
35 : const struct TALER_MerchantPublicKeyP *merchant_pub,
36 : const struct TALER_WireTransferIdentifierRawP *wtid,
37 : struct TALER_Amount *total)
38 : {
39 68 : struct PostgresClosure *pg = cls;
40 68 : uint64_t deposit_shard = TEH_PG_compute_shard (merchant_pub);
41 68 : struct GNUNET_TIME_Absolute now = {0};
42 : uint64_t sum_deposit_value;
43 : uint64_t sum_deposit_frac;
44 : uint64_t sum_refund_value;
45 : uint64_t sum_refund_frac;
46 : uint64_t sum_fee_value;
47 : uint64_t sum_fee_frac;
48 : enum GNUNET_DB_QueryStatus qs;
49 : struct TALER_Amount sum_deposit;
50 : struct TALER_Amount sum_refund;
51 : struct TALER_Amount sum_fee;
52 : struct TALER_Amount delta;
53 :
54 68 : now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
55 : pg->aggregator_shift);
56 68 : PREPARE (pg,
57 : "aggregate",
58 : "WITH bdep AS (" /* restrict to our merchant and account and mark as done */
59 : " UPDATE batch_deposits"
60 : " SET done=TRUE"
61 : " WHERE NOT (done OR policy_blocked)" /* only actually executable deposits */
62 : " AND refund_deadline<$1"
63 : " AND shard=$5" /* only for efficiency, merchant_pub is what we really filter by */
64 : " AND merchant_pub=$2" /* filter by target merchant */
65 : " AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
66 : " RETURNING"
67 : " batch_deposit_serial_id)"
68 : " ,cdep AS ("
69 : " SELECT"
70 : " coin_deposit_serial_id"
71 : " ,batch_deposit_serial_id"
72 : " ,coin_pub"
73 : " ,amount_with_fee AS amount"
74 : " FROM coin_deposits"
75 : " WHERE batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
76 : " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
77 : " SELECT"
78 : " amount_with_fee AS refund"
79 : " ,coin_pub"
80 : " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
81 : " FROM refunds"
82 : " WHERE coin_pub IN (SELECT coin_pub FROM cdep)"
83 : " AND batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
84 : " ,ref_by_coin AS (" /* total up refunds by coin */
85 : " SELECT"
86 : " SUM((ref.refund).val) AS sum_refund_val"
87 : " ,SUM((ref.refund).frac) AS sum_refund_frac"
88 : " ,coin_pub"
89 : " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
90 : " FROM ref"
91 : " GROUP BY coin_pub, batch_deposit_serial_id)"
92 : " ,norm_ref_by_coin AS (" /* normalize */
93 : " SELECT"
94 : " sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val"
95 : " ,sum_refund_frac % 100000000 AS norm_refund_frac"
96 : " ,coin_pub"
97 : " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
98 : " FROM ref_by_coin)"
99 : " ,fully_refunded_coins AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
100 : " SELECT"
101 : " cdep.coin_pub"
102 : " FROM norm_ref_by_coin norm"
103 : " JOIN cdep"
104 : " ON (norm.coin_pub = cdep.coin_pub"
105 : " AND norm.batch_deposit_serial_id = cdep.batch_deposit_serial_id"
106 : " AND norm.norm_refund_val = (cdep.amount).val"
107 : " AND norm.norm_refund_frac = (cdep.amount).frac))"
108 : " ,fees AS (" /* find deposit fees for not fully refunded deposits */
109 : " SELECT"
110 : " denom.fee_deposit AS fee"
111 : " ,cs.batch_deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
112 : " FROM cdep cs"
113 : " JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
114 : " USING (coin_pub)"
115 : " JOIN denominations denom"
116 : " USING (denominations_serial)"
117 : " WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))"
118 : " ,dummy AS (" /* add deposits to aggregation_tracking */
119 : " INSERT INTO aggregation_tracking"
120 : " (batch_deposit_serial_id"
121 : " ,wtid_raw)"
122 : " SELECT batch_deposit_serial_id,$4"
123 : " FROM bdep)"
124 : "SELECT" /* calculate totals (deposits, refunds and fees) */
125 : " CAST(COALESCE(SUM((cdep.amount).val),0) AS INT8) AS sum_deposit_value"
126 : /* cast needed, otherwise we get NUMBER */
127 : " ,COALESCE(SUM((cdep.amount).frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
128 : " ,CAST(COALESCE(SUM((ref.refund).val),0) AS INT8) AS sum_refund_value"
129 : " ,COALESCE(SUM((ref.refund).frac),0) AS sum_refund_fraction"
130 : " ,CAST(COALESCE(SUM((fees.fee).val),0) AS INT8) AS sum_fee_value"
131 : " ,COALESCE(SUM((fees.fee).frac),0) AS sum_fee_fraction"
132 : " FROM cdep "
133 : " FULL OUTER JOIN ref ON (FALSE)" /* We just want all sums */
134 : " FULL OUTER JOIN fees ON (FALSE);");
135 :
136 : {
137 68 : struct GNUNET_PQ_QueryParam params[] = {
138 68 : GNUNET_PQ_query_param_absolute_time (&now),
139 68 : GNUNET_PQ_query_param_auto_from_type (merchant_pub),
140 68 : GNUNET_PQ_query_param_auto_from_type (h_payto),
141 68 : GNUNET_PQ_query_param_auto_from_type (wtid),
142 68 : GNUNET_PQ_query_param_uint64 (&deposit_shard),
143 : GNUNET_PQ_query_param_end
144 : };
145 68 : struct GNUNET_PQ_ResultSpec rs[] = {
146 68 : GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
147 : &sum_deposit_value),
148 68 : GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
149 : &sum_deposit_frac),
150 68 : GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
151 : &sum_refund_value),
152 68 : GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
153 : &sum_refund_frac),
154 68 : GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
155 : &sum_fee_value),
156 68 : GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
157 : &sum_fee_frac),
158 : GNUNET_PQ_result_spec_end
159 : };
160 :
161 68 : qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
162 : "aggregate",
163 : params,
164 : rs);
165 : }
166 68 : if (qs < 0)
167 : {
168 0 : GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
169 0 : return qs;
170 : }
171 68 : if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
172 : {
173 0 : GNUNET_assert (GNUNET_OK ==
174 : TALER_amount_set_zero (pg->currency,
175 : total));
176 0 : return qs;
177 : }
178 68 : GNUNET_assert (GNUNET_OK ==
179 : TALER_amount_set_zero (pg->currency,
180 : &sum_deposit));
181 68 : GNUNET_assert (GNUNET_OK ==
182 : TALER_amount_set_zero (pg->currency,
183 : &sum_refund));
184 68 : GNUNET_assert (GNUNET_OK ==
185 : TALER_amount_set_zero (pg->currency,
186 : &sum_fee));
187 68 : sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
188 68 : + sum_deposit_value;
189 68 : sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
190 68 : sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE
191 68 : + sum_refund_value;
192 68 : sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE;
193 68 : sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE
194 68 : + sum_fee_value;
195 68 : sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \
196 68 : GNUNET_assert (0 <=
197 : TALER_amount_subtract (&delta,
198 : &sum_deposit,
199 : &sum_refund));
200 68 : GNUNET_assert (0 <=
201 : TALER_amount_subtract (total,
202 : &delta,
203 : &sum_fee));
204 68 : return qs;
205 : }
|