Improve tasks reliability (#390)

* Add broadcasted flag

* Improve celery tasks reliability
This commit is contained in:
Reckless_Satoshi 2023-03-16 00:53:37 +00:00 committed by GitHub
parent 64f3243c53
commit 94bc44ad0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 139 additions and 91 deletions

View File

@ -290,6 +290,7 @@ class OnchainPaymentAdmin(AdminChangeLinksMixin, admin.ModelAdmin):
"address",
"concept",
"status",
"broadcasted",
"num_satoshis",
"hash",
"swap_fee_rate",

View File

@ -158,7 +158,9 @@ class LNNode:
request, metadata=[("macaroon", MACAROON.hex())]
)
onchainpayment.txid = response.txid
if response.txid:
onchainpayment.txid = response.txid
onchainpayment.broadcasted = True
onchainpayment.save()
return True

View File

@ -152,45 +152,66 @@ class Command(BaseCommand):
queryset_retries = LNPayment.objects.filter(
type=LNPayment.Types.NORM,
status__in=[LNPayment.Status.VALIDI, LNPayment.Status.FAILRO],
status=LNPayment.Status.FAILRO,
in_flight=False,
routing_attempts__in=[1, 2],
last_routing_time__lt=(
timezone.now() - timedelta(minutes=int(config("RETRY_TIME")))
),
)
queryset = queryset.union(queryset_retries)
# Payments that still have the in_flight flag whose last payment attempt was +3 min ago
# are probably stuck. We retry them. The follow_send_invoice() task can also do TrackPaymentV2 if the
# previous attempt is still ongoing
queryset_stuck = LNPayment.objects.filter(
type=LNPayment.Types.NORM,
status__in=[LNPayment.Status.FAILRO, LNPayment.Status.FLIGHT],
in_flight=True,
last_routing_time__lt=(timezone.now() - timedelta(minutes=3)),
)
if len(queryset) > 0:
for lnpayment in queryset:
queryset = queryset.union(queryset_retries).union(queryset_stuck)
for lnpayment in queryset:
# Checks that this onchain payment is part of an order with a settled escrow
if not hasattr(lnpayment, "order_paid_LN"):
self.stdout.write(f"Ln payment {str(lnpayment)} has no parent order!")
return
order = lnpayment.order_paid_LN
if order.trade_escrow.status == LNPayment.Status.SETLED:
follow_send_payment.delay(lnpayment.payment_hash)
def send_onchain_payments(self):
queryset = OnchainPayment.objects.filter(
status=OnchainPayment.Status.QUEUE,
broadcasted=False,
)
if len(queryset) > 0:
for onchainpayment in queryset:
# Checks that this onchain payment is part of an order with a settled escrow
if not hasattr(onchainpayment, "order_paid_TX"):
self.stdout.write(
f"Onchain payment {str(onchainpayment)} has no parent order!"
)
return
order = onchainpayment.order_paid_TX
if order.trade_escrow.status == LNPayment.Status.SETLED:
# Sends out onchainpayment
LNNode.pay_onchain(
onchainpayment,
OnchainPayment.Status.QUEUE,
OnchainPayment.Status.MEMPO,
)
else:
self.stdout.write(
f"Onchain payment {str(onchainpayment)} for order {str(order)} escrow is not settled!"
)
for onchainpayment in queryset:
# Checks that this onchain payment is part of an order with a settled escrow
if not hasattr(onchainpayment, "order_paid_TX"):
self.stdout.write(
f"Onchain payment {str(onchainpayment)} has no parent order!"
)
return
order = onchainpayment.order_paid_TX
if (
order.trade_escrow.status == LNPayment.Status.SETLED
and order.trade_escrow.num_satoshis >= onchainpayment.num_satoshis
):
# Sends out onchainpayment
LNNode.pay_onchain(
onchainpayment,
OnchainPayment.Status.QUEUE,
OnchainPayment.Status.MEMPO,
)
onchainpayment.save()
else:
self.stdout.write(
f"Onchain payment {str(onchainpayment)} for order {str(order)} escrow is not settled!"
)
def update_order_status(self, lnpayment):
"""Background process following LND hold invoices

View File

@ -203,6 +203,8 @@ class OnchainPayment(models.Model):
choices=Status.choices, null=False, default=Status.CREAT
)
broadcasted = models.BooleanField(default=False, null=False, blank=False)
# payment info
address = models.CharField(
max_length=100, unique=False, default=None, null=True, blank=True

View File

@ -86,6 +86,8 @@ def follow_send_payment(hash):
from api.models import LNPayment, Order
lnpayment = LNPayment.objects.get(payment_hash=hash)
lnpayment.last_routing_time = timezone.now()
lnpayment.save()
# Default is 0ppm. Set by the user over API. Client's default is 1000 ppm.
fee_limit_sat = int(
@ -101,74 +103,75 @@ def follow_send_payment(hash):
)
order = lnpayment.order_paid_LN
if order.trade_escrow.num_satoshis < lnpayment.num_satoshis:
print(f"Order: {order.id} Payout is larger than collateral !?")
return
def handle_response(response):
lnpayment.status = LNPayment.Status.FLIGHT
lnpayment.in_flight = True
lnpayment.save()
order.status = Order.Status.PAY
order.save()
if response.status == 0: # Status 0 'UNKNOWN'
# Not sure when this status happens
print(f"Order: {order.id} UNKNOWN. Hash {hash}")
lnpayment.in_flight = False
lnpayment.save()
if response.status == 1: # Status 1 'IN_FLIGHT'
print(f"Order: {order.id} IN_FLIGHT. Hash {hash}")
if response.status == 3: # Status 3 'FAILED'
lnpayment.status = LNPayment.Status.FAILRO
lnpayment.last_routing_time = timezone.now()
lnpayment.routing_attempts += 1
lnpayment.failure_reason = response.failure_reason
lnpayment.in_flight = False
if lnpayment.routing_attempts > 2:
lnpayment.status = LNPayment.Status.EXPIRE
lnpayment.routing_attempts = 0
lnpayment.save()
order.status = Order.Status.FAI
order.expires_at = timezone.now() + timedelta(
seconds=order.t_to_expire(Order.Status.FAI)
)
order.save()
print(
f"Order: {order.id} FAILED. Hash: {hash} Reason: {LNNode.payment_failure_context[response.failure_reason]}"
)
return {
"succeded": False,
"context": f"payment failure reason: {LNNode.payment_failure_context[response.failure_reason]}",
}
if response.status == 2: # Status 2 'SUCCEEDED'
print(f"SUCCEEDED. Order: {order.id}. Hash: {hash}")
lnpayment.status = LNPayment.Status.SUCCED
lnpayment.fee = float(response.fee_msat) / 1000
lnpayment.preimage = response.payment_preimage
lnpayment.save()
order.status = Order.Status.SUC
order.expires_at = timezone.now() + timedelta(
seconds=order.t_to_expire(Order.Status.SUC)
)
order.save()
results = {"succeded": True}
return results
try:
for response in LNNode.routerstub.SendPaymentV2(
request, metadata=[("macaroon", MACAROON.hex())]
):
lnpayment.in_flight = True
lnpayment.save()
if response.status == 0: # Status 0 'UNKNOWN'
# Not sure when this status happens
lnpayment.in_flight = False
lnpayment.save()
if response.status == 1: # Status 1 'IN_FLIGHT'
print("IN_FLIGHT")
lnpayment.status = LNPayment.Status.FLIGHT
lnpayment.in_flight = True
lnpayment.save()
order.status = Order.Status.PAY
order.save()
if response.status == 3: # Status 3 'FAILED'
print("FAILED")
lnpayment.status = LNPayment.Status.FAILRO
lnpayment.last_routing_time = timezone.now()
lnpayment.routing_attempts += 1
lnpayment.failure_reason = response.failure_reason
lnpayment.in_flight = False
if lnpayment.routing_attempts > 2:
lnpayment.status = LNPayment.Status.EXPIRE
lnpayment.routing_attempts = 0
lnpayment.save()
order.status = Order.Status.FAI
order.expires_at = timezone.now() + timedelta(
seconds=order.t_to_expire(Order.Status.FAI)
)
order.save()
context = {
"routing_failed": LNNode.payment_failure_context[
response.failure_reason
],
"IN_FLIGHT": False,
}
# If failed due to not route, reset mission control. (This won't scale well, just a temporary fix)
# ResetMC deactivate temporary for tests
# if response.failure_reason==2:
# LNNode.resetmc()
return False, context
if response.status == 2: # Status 2 'SUCCEEDED'
print("SUCCEEDED")
lnpayment.status = LNPayment.Status.SUCCED
lnpayment.fee = float(response.fee_msat) / 1000
lnpayment.preimage = response.payment_preimage
lnpayment.save()
order.status = Order.Status.SUC
order.expires_at = timezone.now() + timedelta(
seconds=order.t_to_expire(Order.Status.SUC)
)
order.save()
return True, None
handle_response(response)
except Exception as e:
if "invoice expired" in str(e):
print("INVOICE EXPIRED")
print(f"Order: {order.id}. INVOICE EXPIRED. Hash: {hash}")
lnpayment.status = LNPayment.Status.EXPIRE
lnpayment.last_routing_time = timezone.now()
lnpayment.in_flight = False
@ -178,8 +181,20 @@ def follow_send_payment(hash):
seconds=order.t_to_expire(Order.Status.FAI)
)
order.save()
context = {"routing_failed": "The payout invoice has expired"}
return False, context
results = {"succeded": False, "context": "The payout invoice has expired"}
return results
if "payment is in transition" in str(e):
print(f"Order: {order.id}. ALREADY IN TRANSITION. Hash: {hash}.")
request = LNNode.routerrpc.TrackPaymentRequest(
payment_hash=bytes.fromhex(hash)
)
for response in LNNode.routerstub.TrackPaymentV2(
request, metadata=[("macaroon", MACAROON.hex())]
):
handle_response(response)
@shared_task(name="payments_cleansing")

View File

@ -100,7 +100,7 @@ services:
volumes:
- .:/usr/src/robosats
- ./node/lnd:/lnd
command: celery -A robosats worker --loglevel=WARNING
command: celery -A robosats worker --loglevel=INFO --concurrency 4 --max-tasks-per-child=4 --max-memory-per-child=200000
depends_on:
- redis
network_mode: service:tor

View File

@ -1,4 +1,4 @@
import React, { useEffect, useState } from 'react';
import React, { useEffect, useLayoutEffect, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { Button, Tooltip, TextField, Grid, Paper } from '@mui/material';
import { encryptMessage, decryptMessage } from '../../../../pgp';
@ -73,6 +73,13 @@ const EncryptedSocketChat: React.FC<Props> = ({
}
}, [status]);
useLayoutEffect(() => {
// On component unmount close reconnecting-websockets
return () => {
connection?.close();
};
}, []);
useEffect(() => {
if (messages.length > messageCount) {
audio.play();

View File

@ -9,7 +9,7 @@ django-timezone-field==4.2.3
djangorestframework==3.13.1
channels==3.0.4
channels-redis==3.3.1
celery==5.2.3
celery==5.2.7
grpcio==1.43.0
googleapis-common-protos==1.53.0
grpcio-tools==1.43.0
@ -21,7 +21,7 @@ ring==0.9.1
git+https://github.com/Reckless-Satoshi/Robohash.git
scipy==1.8.0
gunicorn==20.1.0
psycopg2==2.9.3
psycopg2==2.9.5
SQLAlchemy==1.4.31
django-import-export==2.7.1
requests[socks]