· 6 years ago · Jul 11, 2019, 03:18 PM
1namespace Hubtel.UnifiedSales.Payments
2{
3 public class PaymentConsumer : IPaymentConsumer
4 {
5 private readonly IOptions<DatabaseConnectionString> _config;
6 private readonly ILogger _logger;
7 private readonly IMultiProducer _producer;
8 private readonly IErrorProducer _errorProducer;
9 private readonly IStatsDPublisher _metrics;
10 private ConcurrentQueue<PaymentReceivedMessage> _listOfPayments = new ConcurrentQueue<PaymentReceivedMessage>();
11
12 public PaymentConsumer(IOptions<DatabaseConnectionString> config,
13 ILogger<PaymentConsumer> logger,
14 IMultiProducer producer,
15 IErrorProducer errorProducer,
16 IStatsDPublisher metrics,
17 IOptions<MessageGatheringInterval> intervalMilliseconds)
18 {
19 var timer = new System.Timers.Timer
20 {
21 Interval = intervalMilliseconds.Value.IntervalMilliseconds
22 };
23 timer.Elapsed += Timer_Elapsed;
24 timer.Enabled = true;
25 _producer = producer;
26 _errorProducer = errorProducer;
27 _config = config;
28 _logger = logger;
29 _metrics = metrics;
30 }
31
32 private void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
33 {
34 if (_listOfPayments.Count > 0)
35 {
36 try
37 {
38 _logger.LogInformation($"{_listOfPayments.Count} number gathered");
39 var itemCopy = new List<PaymentReceivedMessage>();
40 var cnt = _listOfPayments.Count;
41 for (int i = 0; i < cnt; i++)
42 {
43 if (_listOfPayments.TryDequeue(out PaymentReceivedMessage item))
44 itemCopy.Add(item);
45 }
46 _metrics.Gauge(Process.GetCurrentProcess().WorkingSet64 / 1024.0,"memoryUsage");
47 ProcessCallbackPayment(itemCopy);
48 }
49 catch (Exception ex)
50 {
51 _logger.LogError("Timer has an issue or unhandled " +
52 $"problems inner function calls: {ex.Message} ");
53 _metrics.Gauge(DateTime.UtcNow.ToOADate(),"errorTime");
54 }
55 }
56 }
57
58 public async Task Message(string message)
59 {
60 try
61 {
62 var paymentReceivedMessage = JsonConvert.DeserializeObject<PaymentReceivedMessage>(message);
63 if (!(paymentReceivedMessage is null))
64 {
65 _listOfPayments.Enqueue(paymentReceivedMessage);
66 }
67 }
68 catch (Exception e)
69 {
70 _logger.LogError("Received message couldn't be forwarded for processing", e);
71 }
72 }
73
74
75 private void UpdateOrderStatuses(List<Payment> payments)
76 {
77 _logger.LogInformation($"About to update order status for payments");
78 if (payments.Count == 0)
79 {
80 return;
81 }
82
83 var universalScheduleDate = DateTime.SpecifyKind(
84 DateTime.Parse(DateTime.UtcNow.ToString("h:mm:ss tt zz")), DateTimeKind.Utc);
85 var timeConditionalQuery = string.Empty;
86 if (universalScheduleDate.Hour == 0 && universalScheduleDate.Minute <= 10)
87 {
88 var dateTime = DateTime.UtcNow;
89
90 timeConditionalQuery =
91 $"AND O.\"CreatedAt\" >= '{dateTime.AddDays(-1).StartOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'" +
92 $"::timestamp without time zone AND O.\"CreatedAt\" <= " +
93 $"'{dateTime.EndOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'" +
94 $"::timestamp without time zone";
95 }
96 else
97 {
98 var dateTime = DateTime.UtcNow;
99 timeConditionalQuery =
100 $"AND O.\"CreatedAt\" >= '{dateTime.StartOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone AND O.\"CreatedAt\" <= " +
101 $"'{dateTime.EndOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone";
102 }
103
104 //get records here update and put inside temp table
105 using (var conn = new NpgsqlConnection(_config.Value.WriterConnection))
106 {
107 conn.Open();
108 try
109 {
110 using (var cmd = new NpgsqlCommand())
111 {
112 cmd.Connection = conn;
113 cmd.CommandText =
114 $"CREATE TEMPORARY TABLE IF NOT EXISTS \"TempPayments_{UniqueAppId.GetId()}\"" +
115 $"(\"OrderId\" character varying(256),\"AmountPaid\" numeric(20,4),\"IsSuccessful\" boolean NOT NULL);";
116 cmd.ExecuteNonQuery();
117 }
118
119 using (var bulkImporter = conn.BeginBinaryImport(
120 $"COPY \"TempPayments_{UniqueAppId.GetId()}\" (\"OrderId\", \"AmountPaid\", \"IsSuccessful\") FROM STDIN (FORMAT BINARY)")
121 )
122 {
123 foreach (var record in payments)
124 {
125 bulkImporter.StartRow();
126 bulkImporter.Write(record.OrderId, NpgsqlDbType.Varchar);
127 bulkImporter.Write(record.AmountPaid, NpgsqlDbType.Numeric);
128 bulkImporter.Write(record.IsSuccessful, NpgsqlDbType.Boolean);
129 }
130
131 bulkImporter.Complete();
132 }
133
134 using (var cmd = new NpgsqlCommand())
135 {
136 cmd.Connection = conn;
137 cmd.CommandText =
138 $"UPDATE \"Orders\" O SET \"AmountPaid\" = CASE WHEN TP.\"IsSuccessful\"=true " +
139 $"THEN TP.\"AmountPaid\" ELSE 0 END, \"Status\" = CASE WHEN TP.\"IsSuccessful\"=true THEN " +
140 $"'Paid' ELSE 'Unpaid' END FROM \"TempPayments_{UniqueAppId.GetId()}\" TP " +
141 $"WHERE O.\"Id\" = TP.\"OrderId\" {timeConditionalQuery};";
142 cmd.ExecuteNonQuery();
143 }
144 }
145 catch (Exception ex)
146 {
147 _logger.LogError($"Couldn't update order statuses {ex.Message}:", ex);
148 }
149
150 conn.Close();
151 }
152
153 //update with table
154 }
155
156 private void ProcessCallbackPayment(List<PaymentReceivedMessage> payments)
157 {
158 _logger.LogInformation($"About to update order status for payments");
159
160 using (_metrics.StartTimer("graphiteStats.calculationtime"))
161 {
162 if (payments.Count == 0)
163 {
164 return;
165 }
166
167 var universalScheduleDate = DateTime.SpecifyKind(
168 DateTime.Parse(DateTime.UtcNow.ToString("h:mm:ss tt zz")), DateTimeKind.Utc);
169 var timeConditionalQuery = string.Empty;
170 if (universalScheduleDate.Hour == 0 && universalScheduleDate.Minute <= 10)
171 {
172 var dateTime = DateTime.UtcNow;
173
174 timeConditionalQuery =
175 $"AND P.\"CreatedAt\" >= '{dateTime.AddDays(-1).StartOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'" +
176 $"::timestamp without time zone AND P.\"CreatedAt\" <= " +
177 $"'{dateTime.EndOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone";
178 }
179 else
180 {
181 var dateTime = DateTime.UtcNow;
182 timeConditionalQuery =
183 $"AND P.\"CreatedAt\" >= '{dateTime.StartOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'" +
184 $"::timestamp without time zone AND P.\"CreatedAt\" <= " +
185 $"'{dateTime.EndOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone";
186 }
187
188 //get records here update and put inside temp table
189 using (var conn = new NpgsqlConnection(_config.Value.WriterConnection))
190 {
191 conn.Open();
192 var newPayments = new List<Payment>();
193
194 try
195 {
196 using (var cmd = new NpgsqlCommand())
197 {
198 cmd.Connection = conn;
199 cmd.CommandText =
200 $"CREATE TEMPORARY TABLE IF NOT EXISTS \"TempPayments_{UniqueAppId.GetId()}\"" +
201 $"(\"HubtelReference\" character varying(15), \"StatusCode\" text,\"ProviderDescription\" text, " +
202 $"\"UpdatedAt\" timestamp , \"IsSuccessful\" boolean NULL);";
203 cmd.ExecuteNonQuery();
204
205 using (var bulkImporter = conn.BeginBinaryImport(
206 $"COPY \"TempPayments_{UniqueAppId.GetId()}\" (\"HubtelReference\", \"StatusCode\"," +
207 $" \"ProviderDescription\",\"UpdatedAt\",\"IsSuccessful\") FROM STDIN (FORMAT BINARY)"))
208 {
209 foreach (var record in payments)
210 {
211 bulkImporter.StartRow();
212 bulkImporter.Write(record.HubtelReference, NpgsqlDbType.Varchar);
213 bulkImporter.Write(record.StatusCode, NpgsqlDbType.Text);
214 bulkImporter.Write(record.ProviderDescription, NpgsqlDbType.Text);
215 bulkImporter.Write(record.PaymentDate, NpgsqlDbType.Timestamp);
216 bulkImporter.Write(record.IsSuccessful, NpgsqlDbType.Boolean);
217 }
218
219 bulkImporter.Complete();
220 }
221
222 cmd.Connection = conn;
223 cmd.CommandText = $"UPDATE \"Payments\" P SET \"StatusCode\" = TP.\"StatusCode\"" +
224 $" ,\"ProviderDescription\" = TP.\"ProviderDescription\"" +
225 $" , \"UpdatedAt\" = TP.\"UpdatedAt\"" +
226 $" , \"IsSuccessful\" = TP.\"IsSuccessful\"" +
227 $" FROM \"TempPayments_{UniqueAppId.GetId()}\" " +
228 $"TP WHERE P.\"HubtelReference\" = TP.\"HubtelReference\" {timeConditionalQuery} RETURNING *;";
229
230 using (var reader = cmd.ExecuteReader())
231 {
232 while (reader.Read())
233 {
234 var paymentRecord = new Payment
235 {
236 Id = reader.GetString(0),
237 BusinessId = reader.IsDBNull(1) ? string.Empty : reader.GetString(1),
238 CreatedAt = reader.GetDateTime(2),
239 UpdatedAt = reader.GetDateTime(3),
240 CreatedBy = reader.IsDBNull(4) ? string.Empty : reader.GetString(4),
241 UpdatedBy = reader.IsDBNull(5) ? string.Empty : reader.GetString(5),
242 PaymentType = reader.IsDBNull(6) ? string.Empty : reader.GetString(6),
243 OrderId = reader.IsDBNull(7) ? string.Empty : reader.GetString(7),
244 MomoPhoneNumber = reader.IsDBNull(8) ? string.Empty : reader.GetString(8),
245 MomoChannel = reader.IsDBNull(9) ? string.Empty : reader.GetString(9),
246 MomoToken = reader.IsDBNull(10) ? string.Empty : reader.GetString(10),
247 TransactionId = reader.IsDBNull(11) ? string.Empty : reader.GetString(11),
248 ExternalTransactionId =
249 reader.IsDBNull(12) ? string.Empty : reader.GetString(12),
250 AmountAfterCharges = reader.GetDecimal(13),
251 Charges = reader.GetDecimal(14),
252 ChargeCustomer = reader.GetBoolean(15),
253 AmountPaid = reader.GetDecimal(16),
254 PaymentDate = reader.GetDateTime(17),
255 Note = reader.IsDBNull(18) ? string.Empty : reader.GetString(18),
256 Description = reader.IsDBNull(19) ? string.Empty : reader.GetString(19),
257 PosDeviceId = reader.IsDBNull(20) ? string.Empty : reader.GetString(20),
258 PosDeviceType = reader.IsDBNull(21) ? string.Empty : reader.GetString(21),
259 EmployeeId = reader.IsDBNull(22) ? string.Empty : reader.GetString(22),
260 EmployeeName = reader.IsDBNull(23) ? string.Empty : reader.GetString(23),
261 CustomerMobileNumber =
262 reader.IsDBNull(24) ? string.Empty : reader.GetString(24),
263 CustomerName = reader.IsDBNull(25) ? string.Empty : reader.GetString(25),
264 BranchId = reader.IsDBNull(26) ? string.Empty : reader.GetString(26),
265 BranchName = reader.IsDBNull(27) ? string.Empty : reader.GetString(27),
266 IsRefund = reader.GetBoolean(28),
267 IsSuccessful = reader.GetBoolean(29),
268 ReceiptNumber = reader.IsDBNull(30) ? string.Empty : reader.GetString(30),
269 Location = reader.IsDBNull(31) ? string.Empty : reader.GetString(31),
270 Currency = reader.IsDBNull(32) ? string.Empty : reader.GetString(32),
271 Scheme = reader.IsDBNull(33) ? string.Empty : reader.GetString(33),
272 Card = reader.IsDBNull(34) ? string.Empty : reader.GetString(34),
273 Tid = reader.IsDBNull(35) ? string.Empty : reader.GetString(35),
274 Authorization = reader.IsDBNull(36) ? string.Empty : reader.GetString(36),
275 Mid = reader.IsDBNull(37) ? string.Empty : reader.GetString(37),
276 CardTransactionId = reader.IsDBNull(38) ? string.Empty : reader.GetString(38),
277 AmountTendered = reader.GetDecimal(39),
278 Balance = reader.GetDecimal(40),
279 ClientReference = reader.IsDBNull(41) ? string.Empty : reader.GetString(41),
280 ProviderDescription = reader.IsDBNull(42) ? string.Empty : reader.GetString(42),
281 StatusCode = reader.IsDBNull(43) ? string.Empty : reader.GetString(43),
282 FineractSavingsAccountId = reader.GetInt32(44),
283 CardProcessor = reader.IsDBNull(45) ? string.Empty : reader.GetString(45),
284 CardTransactionMode = reader.IsDBNull(46) ? string.Empty : reader.GetString(46),
285 HubtelReference = reader.IsDBNull(47) ? string.Empty : reader.GetString(47)
286 };
287
288 newPayments.Add(paymentRecord);
289 }
290 }
291 }
292 }
293 catch (Exception ex)
294 {
295 _logger.LogError($"Couldn't update payment statuses: {ex.Message}:", ex);
296 }
297
298 conn.Close();
299 UpdateOrderStatuses(newPayments);
300 FetchOrdersAndPushToKafka(newPayments);
301 }
302 }
303 }
304
305 private void FetchOrdersAndPushToKafka(List<Payment> payments)
306 {
307 _logger.LogInformation($"About to fetch orders to push to kafka");
308 var universalScheduleDate = DateTime.SpecifyKind(
309 DateTime.Parse(DateTime.UtcNow.ToString("h:mm:ss tt zz")), DateTimeKind.Utc);
310 var timeConditionalQuery = string.Empty;
311 if (universalScheduleDate.Hour == 0 && universalScheduleDate.Minute <= 10)
312 {
313 var dateTime = DateTime.UtcNow;
314
315 timeConditionalQuery =
316 $"AND O.\"CreatedAt\" >= '{dateTime.AddDays(-1).StartOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone AND O.\"CreatedAt\" <= " +
317 $"'{dateTime.EndOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone"
318 + $" AND OI.\"CreatedAt\" >= '{dateTime.AddDays(-1).StartOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone AND OI.\"CreatedAt\" <= " +
319 $"'{dateTime.EndOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone";
320 }
321 else
322 {
323 var dateTime = DateTime.UtcNow;
324 timeConditionalQuery =
325 $"AND O.\"CreatedAt\" >= '{dateTime.StartOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone AND O.\"CreatedAt\" <= " +
326 $"'{dateTime.EndOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone"
327 + $" AND OI.\"CreatedAt\" >= '{dateTime.StartOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone AND OI.\"CreatedAt\" <= " +
328 $"'{dateTime.EndOfDay().ToString("yyyy-MM-dd HH:mm:ss")}'::timestamp without time zone";
329 }
330
331
332 _logger.LogInformation($"Query conditional transformed to {timeConditionalQuery}");
333 if (payments.Count == 0)
334 {
335 return;
336 }
337
338 Dictionary<string, Order> orders = new Dictionary<string, Order>();
339
340 var orderIds = string.Join(",", payments.Select(x => $"('{x.OrderId}')").ToList());
341 using (var conn = new NpgsqlConnection(_config.Value.ReaderConnection))
342 {
343 conn.Open();
344 try
345 {
346 using (var cmd = new NpgsqlCommand())
347 {
348 cmd.Connection = conn;
349 cmd.CommandText =
350 "SELECT O.\"Id\", O.\"BusinessId\", coalesce(\"IntegrationChannel\",''), \"OrderDate\", coalesce(\"OrderNumber\",''), coalesce(O.\"Note\",''), " +
351 "coalesce(O.\"Description\",''), coalesce(\"Status\",''), coalesce(\"EmployeeId\",''), coalesce(\"EmployeeName\",''), coalesce(\"CustomerMobileNumber\",''), coalesce(\"CustomerName\",''), coalesce(\"BranchId\",''), " +
352 "coalesce(\"BranchName\",''), coalesce(O.\"TaxRate\", 0), coalesce(O.\"TaxAmount\", 0), coalesce(O.\"DiscountRate\", 0), coalesce(O.\"DiscountAmount\", 0), coalesce(\"Subtotal\", 0), coalesce(\"TotalAmountDue\", 0), " +
353 "coalesce(\"AmountPaid\", 0), coalesce(\"AmountRefunded\", 0), coalesce(\"PaymentTypes\",''), coalesce(\"Location\",''), coalesce(\"Currency\",''), \"IsFulfilled\", " +
354 "coalesce(\"ConsumerFeedback\",''), coalesce(\"ConsumerRating\", 0), coalesce(\"BusinessEmail\",''), coalesce(\"BusinessMobileNumber\",''), coalesce(\"CustomerEmail\",''), coalesce(\"FcmCustomer\",''), " +
355 "coalesce(\"FcmDevice\",''), \"AmountDueProducer\", \"DeliveryFee\", \"HasDelivery\", coalesce(\"BranchEmail\",''), coalesce(\"BranchPhoneNumber\",''), coalesce(\"BusinessName\",''), " +
356 $"\"CustomerReward\", coalesce(\"SenderId\",''), coalesce(OI.\"ItemId\",''), coalesce(OI.\"Name\",''), coalesce(OI.\"Quantity\", 0), coalesce(OI.\"ServiceRequestId\",'') FROM public.\"Orders\" O " +
357 $"JOIN public.\"OrderItems\" OI ON O.\"Id\"=OI.\"OrderId\" WHERE O.\"Id\" = ANY(VALUES" +
358 orderIds + ") " +
359 $"AND O.\"Status\"='Paid' {timeConditionalQuery} ; ";
360 _logger.LogInformation(cmd.CommandText);
361 using (var reader = cmd.ExecuteReader())
362 {
363 while (reader.Read())
364 {
365 var orderId = reader.GetString(0);
366
367 if (orders.ContainsKey(orderId))
368 {
369 var orderItem = new OrderItem();
370 orderItem.ItemId = reader.GetString(41);
371 orderItem.Name = reader.GetString(42);
372 orderItem.Quantity = reader.GetInt32(43);
373 orderItem.ServiceRequestId = reader.GetString(44);
374
375 orders[orderId].OrderItems.Add(orderItem);
376 }
377 else
378 {
379 var order = new Order();
380 order.Id = orderId;
381 order.BusinessId = reader.GetString(1);
382 order.IntegrationChannel = reader.GetString(2);
383 order.OrderDate = reader.GetDateTime(3);
384 order.OrderNumber = reader.GetString(4);
385 order.Note = reader.GetString(5);
386 order.Description = reader.GetString(6);
387 order.Status = reader.GetString(7);
388 order.EmployeeId = reader.GetString(8);
389 order.EmployeeName = reader.GetString(9);
390 order.CustomerMobileNumber = reader.GetString(10);
391 order.CustomerName = reader.GetString(11);
392 order.BranchId = reader.GetString(12);
393 order.BranchName = reader.GetString(13);
394 order.TaxRate = reader.GetFloat(14);
395 order.TaxAmount = reader.GetDecimal(15);
396 order.DiscountRate = reader.GetFloat(16);
397 order.DiscountAmount = reader.GetDecimal(17);
398 order.Subtotal = reader.GetDecimal(18);
399 order.TotalAmountDue = reader.GetDecimal(19);
400 order.AmountPaid = reader.GetDecimal(20);
401 order.AmountRefunded = reader.GetDecimal(21);
402 order.PaymentTypes = reader.GetString(22);
403 order.Location = reader.GetString(23);
404 order.Currency = reader.GetString(24);
405 order.IsFulfilled = reader.GetBoolean(25);
406 order.ConsumerFeedback = reader.GetString(26);
407 order.ConsumerRating = reader.GetInt32(27);
408 order.BusinessEmail = reader.GetString(28);
409 order.BusinessMobileNumber = reader.GetString(29);
410 order.CustomerEmail = reader.GetString(30);
411 order.FcmCustomer = reader.GetString(31);
412 order.FcmDevice = reader.GetString(32);
413 order.AmountDueProducer = reader.GetDecimal(33);
414 order.DeliveryFee = reader.GetDecimal(34);
415 order.HasDelivery = reader.GetBoolean(35);
416 order.BranchEmail = reader.GetString(36);
417 order.BranchPhoneNumber = reader.GetString(37);
418 order.BusinessName = reader.GetString(38);
419 order.CustomerReward = reader.GetDouble(39);
420 order.SenderId = reader.GetString(40);
421
422 if (orders.TryAdd(orderId, order))
423 {
424 var orderItem = new OrderItem();
425 orderItem.ItemId = reader.GetString(41);
426 orderItem.Name = reader.GetString(42);
427 orderItem.Quantity = reader.GetInt32(43);
428 orderItem.ServiceRequestId = reader.GetString(44);
429
430 if (orders[orderId].OrderItems == null)
431 orders[orderId].OrderItems = new List<OrderItem>();
432 orders[orderId].OrderItems.Add(orderItem);
433 }
434 }
435 }
436 }
437 }
438 }
439 catch (Exception ex)
440 {
441 _logger.LogError($"Couldn't fetch orders to push to kafka {ex.Message}:", ex);
442 conn.Close();
443 return;
444 }
445
446 conn.Close();
447 }
448
449 PushOrdersToKafka(orders, payments);
450 }
451
452 private void PushOrdersToKafka(Dictionary<string, Models.Order> orders, List<Payment> payments)
453 {
454 _logger.LogInformation("pushing to kafka");
455 var paidOrders = new List<Order>();
456
457 var cnt = payments.Count;
458 for (int i = 0; i < cnt; i++)
459 {
460 if (payments[i].IsSuccessful)
461 {
462 if (orders.TryGetValue(payments[i].OrderId, out Order order))
463 {
464 order.Payment = payments[i];
465 paidOrders.Add(order);
466 }
467 }
468 }
469
470
471 if (paidOrders.Count == 0)
472 return;
473
474 _logger.LogInformation($"Pushing paid orders to kafka");
475
476 _producer.Produce(paidOrders);
477 }
478 }
479}