Skip to content

Commit

Permalink
backtrace route path for failed payment
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaozhou committed May 19, 2020
1 parent 345bd2c commit f0c1e78
Show file tree
Hide file tree
Showing 22 changed files with 843 additions and 378 deletions.
2 changes: 1 addition & 1 deletion cnode/pay.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (c *CNode) sendSettleRequestForExpiredPays(expiredPays []*entity.Conditiona
func (c *CNode) sendSettleProofForExpiredPays(expiredPayIDs []ctype.PayIDType) error {
logEntry := pem.NewPem(c.nodeConfig.GetRPCAddr())
logEntry.Type = pem.PayMessageType_DST_SETTLE_EXPIRED_PAY_API
err := c.messager.SendPaysSettleProof(expiredPayIDs, rpc.PaymentSettleReason_PAY_EXPIRED, logEntry)
err := c.messager.SendPaysSettleProof(expiredPayIDs, rpc.PaymentSettleReason_PAY_EXPIRED, nil, logEntry)
if err != nil {
logEntry.Error = append(logEntry.Error, err.Error())
}
Expand Down
25 changes: 25 additions & 0 deletions handlers/msghdl/celer_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@
package msghdl

import (
"bytes"
"fmt"
"sync"

"github.com/celer-network/goCeler/common"
"github.com/celer-network/goCeler/common/event"
"github.com/celer-network/goCeler/common/intfs"
"github.com/celer-network/goCeler/ctype"
"github.com/celer-network/goCeler/dispute"
"github.com/celer-network/goCeler/entity"
"github.com/celer-network/goCeler/handlers"
"github.com/celer-network/goCeler/messager"
"github.com/celer-network/goCeler/pem"
"github.com/celer-network/goCeler/route"
"github.com/celer-network/goCeler/rpc"
"github.com/celer-network/goCeler/storage"
"github.com/celer-network/goutils/log"
"github.com/golang/protobuf/proto"
)

type CooperativeWithdraw interface {
Expand Down Expand Up @@ -159,3 +163,24 @@ func (h *CelerMsgHandler) GetMsgName() string {
func validRecvdSeqNum(stored, recvd, base uint64) bool {
return stored == base && recvd > stored
}

func (h *CelerMsgHandler) payFromSelf(pay *entity.ConditionalPay) bool {
return bytes.Compare(pay.GetSrc(), h.nodeConfig.GetOnChainAddr().Bytes()) == 0
}

func (h *CelerMsgHandler) prependPayPath(payPath *rpc.PayPath, payHop *rpc.PayHop) error {
payHopBytes, err := proto.Marshal(payHop)
if err != nil {
return fmt.Errorf("marshal payHop err: %w", err)
}
sig, err := h.signer.SignEthMessage(payHopBytes)
if err != nil {
return fmt.Errorf("sign payHop err: %w", err)
}
signedPayHop := &rpc.SignedPayHop{
PayHopBytes: payHopBytes,
Sig: sig,
}
payPath.Hops = append([]*rpc.SignedPayHop{signedPayHop}, payPath.Hops...)
return nil
}
41 changes: 33 additions & 8 deletions handlers/msghdl/handle_cond_pay_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,9 @@ func (h *CelerMsgHandler) verifyCommonPayRequest(
return nil
}

func (h *CelerMsgHandler) condPayRequestOutbound(msg *common.MsgFrame) error {
request := msg.Message.GetCondPayRequest()
func (h *CelerMsgHandler) condPayRequestOutbound(frame *common.MsgFrame) error {
peerFrom := frame.PeerAddr
request := frame.Message.GetCondPayRequest()
if request.GetDirectPay() {
log.Debugln("Skip pay receipt for direct pay")
return nil
Expand All @@ -419,7 +420,7 @@ func (h *CelerMsgHandler) condPayRequestOutbound(msg *common.MsgFrame) error {
payID := ctype.Pay2PayID(&pay)

dest := ctype.Bytes2Addr(pay.GetDest())
logEntry := msg.LogEntry
logEntry := frame.LogEntry
if logEntry.GetPayId() == "" {
logEntry.PayId = ctype.PayID2Hex(payID)
} else if logEntry.GetPayId() != ctype.PayID2Hex(payID) {
Expand All @@ -442,7 +443,7 @@ func (h *CelerMsgHandler) condPayRequestOutbound(msg *common.MsgFrame) error {
CondPayReceipt: receipt,
},
}
err2 = h.streamWriter.WriteCelerMsg(msg.PeerAddr, celerMsg)
err2 = h.streamWriter.WriteCelerMsg(peerFrom, celerMsg)
if err2 != nil {
return fmt.Errorf(err2.Error() + ", FAIL_SEND_RECEIPT")
}
Expand All @@ -452,14 +453,38 @@ func (h *CelerMsgHandler) condPayRequestOutbound(msg *common.MsgFrame) error {
// Forward condPay to next hop if I am not the destination
log.Debugln("Forward", payID.Hex())
delegable, proof, description := h.checkPayDelegable(&pay, ctype.Bytes2Addr(pay.GetDest()), logEntry)
err = h.messager.ForwardCondPayRequest(payBytes, request.GetNote(), delegable, logEntry)
peerTo, err := h.messager.ForwardCondPayRequest(payBytes, request.GetNote(), delegable, logEntry)
if err != nil {
if delegable && err == common.ErrPeerNotOnline {
return h.delegatePay(payID, &pay, payBytes, description, proof, msg.PeerAddr, dest, logEntry)
if delegable && errors.Is(err, common.ErrPeerNotOnline) {
return h.delegatePay(payID, &pay, payBytes, description, proof, peerFrom, dest, logEntry)
}
logEntry.Error = append(logEntry.Error, err.Error()+", DST_UNREACHABLE")
errmsg := &rpc.Error{
Reason: err.Error(),
}
if errors.Is(err, common.ErrPeerNotOnline) {
errmsg.Code = rpc.ErrCode_PEER_NOT_ONLINE
} else if errors.Is(err, common.ErrNoEnoughBalance) {
errmsg.Code = rpc.ErrCode_NOT_ENOUGH_BALANCE
} else if errors.Is(err, common.ErrRouteNotFound) {
errmsg.Code = rpc.ErrCode_NO_ROUTE_TO_DST
} else {
errmsg.Code = rpc.ErrCode_MISC_ERROR
}
payHop := &rpc.PayHop{
PayId: payID.Bytes(),
PrevHopAddr: peerFrom.Bytes(),
NextHopAddr: peerTo.Bytes(),
Err: errmsg,
}
payPath := &rpc.PayPath{}
err = h.prependPayPath(payPath, payHop)
if err != nil {
return err
}

// Cancel the payment upfront
return h.messager.SendOnePaySettleProof(payID, rpc.PaymentSettleReason_PAY_DEST_UNREACHABLE, logEntry)
return h.messager.SendPayUnreachableSettleProof(payID, payPath, logEntry)
}

return nil
Expand Down
57 changes: 45 additions & 12 deletions handlers/msghdl/handle_hop_ack_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package msghdl

import (
"bytes"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -201,9 +200,33 @@ func (h *CelerMsgHandler) HandleHopAckState(frame *common.MsgFrame) error {
log.Error(err)
return err
}
payAmt := new(big.Int).SetUint64(0)
if !h.payFromSelf(&pay) {
payID := ctype.PayBytes2PayID(req.GetCondPay())
ingressPeer, found, err := h.dal.GetPayIngressPeer(payID)
if err != nil {
return fmt.Errorf("GetPayIngressPeer err: %w", err)
}
if !found {
return fmt.Errorf("GetPayIngressPeer err: %w", common.ErrPayNoIngress)
}
payHop := &rpc.PayHop{
PayId: payID.Bytes(),
PrevHopAddr: ingressPeer.Bytes(),
NextHopAddr: frame.PeerAddr.Bytes(),
Err: &rpc.Error{Code: rpc.ErrCode_PAY_ROUTE_LOOP},
}
payPath := &rpc.PayPath{}
err = h.prependPayPath(payPath, payHop)
if err != nil {
return err
}
err = h.dal.PutPayPath(payID, payPath)
if err != nil {
return fmt.Errorf("PutPayPath err: %w", err)
}
}
return h.messager.SendOnePaySettleRequest(
&pay, payAmt, rpc.PaymentSettleReason_PAY_DEST_UNREACHABLE, logEntry)
&pay, new(big.Int).SetUint64(0), rpc.PaymentSettleReason_PAY_DEST_UNREACHABLE, logEntry)
}

return nil
Expand Down Expand Up @@ -255,7 +278,7 @@ func (h *CelerMsgHandler) processAckMsgs(ackedMsgs []*rpc.CelerMsg, logEntry *pe
h.notifyPayComplete(settledPay, pay)
} else {
// I'm not the sender, forward request to upstream
go h.forwardToUpstream(settledPay, pay, logEntry)
h.forwardToUpstream(settledPay, pay, logEntry)
}
}
}
Expand Down Expand Up @@ -336,13 +359,27 @@ func (h *CelerMsgHandler) forwardToUpstream(

payID := ctype.Bytes2PayID(settledPay.GetSettledPayId())
if settledPay.GetReason() == rpc.PaymentSettleReason_PAY_REJECTED ||
settledPay.GetReason() == rpc.PaymentSettleReason_PAY_DEST_UNREACHABLE ||
settledPay.GetReason() == rpc.PaymentSettleReason_PAY_RESOLVED_ONCHAIN {

log.Debugln("forward rejected pay to upstream", payID.Hex())
log.Debugln("forward pay to upstream", payID.Hex(), settledPay.GetReason())
err := h.messager.SendOnePaySettleProof(payID, settledPay.GetReason(), logEntry)
if err != nil {
log.Error(err)
logEntry.Error = append(logEntry.Error, "SendOnePaySettleProof err: "+err.Error())
return
}
} else if settledPay.GetReason() == rpc.PaymentSettleReason_PAY_DEST_UNREACHABLE {
payPath, err := h.dal.GetPayPath(payID)
if err != nil {
logEntry.Error = append(logEntry.Error, "GetPayPath err: "+err.Error())
payPath = nil
} else {
err = h.dal.DeletePayPath(payID)
if err != nil {
logEntry.Error = append(logEntry.Error, "DeletePayPath err: "+err.Error())
}
}
err = h.messager.SendPayUnreachableSettleProof(payID, payPath, logEntry)
if err != nil {
logEntry.Error = append(logEntry.Error, "SendPayUnreachableSettleProof err: "+err.Error())
return
}
}
Expand Down Expand Up @@ -528,7 +565,3 @@ func (h *CelerMsgHandler) getChanMessage(tx *storage.DALTx, cid ctype.CidType, s
}
return msg, nil
}

func (h *CelerMsgHandler) payFromSelf(pay *entity.ConditionalPay) bool {
return bytes.Compare(pay.GetSrc(), h.nodeConfig.GetOnChainAddr().Bytes()) == 0
}
58 changes: 46 additions & 12 deletions handlers/msghdl/handle_pay_settle_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
package msghdl

import (
"errors"
"fmt"
"math/big"

"github.com/celer-network/goCeler/common"
"github.com/celer-network/goCeler/ctype"
"github.com/celer-network/goCeler/entity"
"github.com/celer-network/goCeler/rpc"
"github.com/celer-network/goCeler/utils"
"github.com/celer-network/goutils/log"
)

Expand All @@ -26,10 +26,10 @@ func (h *CelerMsgHandler) HandlePaySettleProof(frame *common.MsgFrame) error {
var err error
if len(payProof.VouchedCondPayResults) > 0 {
log.Debugln("received settle proof with vouched results from", ctype.Addr2Hex(peer))
return errors.New("Vouched pay result not accepted yet")
return fmt.Errorf("Vouched pay result not accepted yet")
}
if len(payProof.SettledPays) == 0 {
return errors.New("Empty payment settle proof")
return fmt.Errorf("Empty payment settle proof")
}

log.Debugln("received settle proof with settled pays from", ctype.Addr2Hex(peer))
Expand All @@ -43,13 +43,14 @@ func (h *CelerMsgHandler) HandlePaySettleProof(frame *common.MsgFrame) error {
var payAmts []*big.Int
for _, sp := range payProof.SettledPays {
if sp.Reason != rpc.PaymentSettleReason_PAY_EXPIRED {
return errors.New("batched pay settle proof with different reasons not supported")
return fmt.Errorf("batched pay settle proof with different reasons not supported")
}
payID := ctype.Bytes2PayID(sp.SettledPayId)
logEntry.PayId = ctype.PayID2Hex(payID)
logEntry.PayIds = append(logEntry.PayIds, ctype.PayID2Hex(payID))
var found bool
pay, _, found, err = h.dal.GetPayment(payID)
var egressPeer ctype.Addr
pay, egressPeer, found, err = h.dal.GetPayForRecvSettleProof(payID)
if err != nil {
log.Errorln(err, payID.Hex())
continue
Expand All @@ -58,34 +59,42 @@ func (h *CelerMsgHandler) HandlePaySettleProof(frame *common.MsgFrame) error {
log.Errorln(common.ErrPayNotFound, payID.Hex())
continue
}
if egressPeer != peer {
return fmt.Errorf("settle proof sender and egress peer not match")
}
expiredPays = append(expiredPays, pay)
payAmts = append(payAmts, new(big.Int).SetUint64(0))
}
if len(expiredPays) == 0 {
return fmt.Errorf("no valid expired pays to settle")
}
_, err = h.messager.SendPaysSettleRequest(expiredPays, payAmts, reason, logEntry)
if err != nil {
err = fmt.Errorf("SendPaysSettleRequest err: %w", err)
}
// do not foward settle proof for expired pays.
// Currently, we only consider single-account (multi-server) OSP. Clients will actively call
// SettleExpiredPays() on demand (e.g., periodically or when start)
// TODO: handle multi-account OSP cases
// each pair of peers are responsible for settling expired pays among themselves
return err
}

if len(payProof.SettledPays) > 1 {
return errors.New("Batched pay settle proof for unexpired pays not supported")
return fmt.Errorf("Batched pay settle proof for unexpired pays not supported")
}

payID := ctype.Bytes2PayID(settledPay.GetSettledPayId())
logEntry.PayId = ctype.PayID2Hex(payID)
var found bool
pay, _, found, err = h.dal.GetPayment(payID)
var ingressPeer, egressPeer ctype.Addr
pay, egressPeer, found, err = h.dal.GetPayForRecvSettleProof(payID)
if err != nil {
return fmt.Errorf("GetPayment err %w", err)
}
if !found {
return common.ErrPayNotFound
}
if egressPeer != peer {
return fmt.Errorf("settle proof sender and egress peer not match")
}
payAmt := new(big.Int).SetUint64(0)
switch reason {
case rpc.PaymentSettleReason_PAY_REJECTED:
Expand All @@ -95,9 +104,34 @@ func (h *CelerMsgHandler) HandlePaySettleProof(frame *common.MsgFrame) error {
return fmt.Errorf("GetCondPayInfoFromRegistry err: %w", err)
}
case rpc.PaymentSettleReason_PAY_DEST_UNREACHABLE:
h.notifyUnreachablility(payID, pay)
payPath := settledPay.GetPath()
logEntry.PayPath = utils.PrintPayPath(payPath, payID)
if h.payFromSelf(pay) {
h.notifyUnreachablility(payID, pay)
} else {
ingressPeer, found, err = h.dal.GetPayIngressPeer(payID)
if err != nil {
return fmt.Errorf("GetPayIngressPeer err: %w", err)
}
if !found {
return fmt.Errorf("GetPayIngressPeer err: %w", common.ErrPayNoIngress)
}
payHop := &rpc.PayHop{
PayId: payID.Bytes(),
PrevHopAddr: ingressPeer.Bytes(),
NextHopAddr: egressPeer.Bytes(),
}
err = h.prependPayPath(payPath, payHop)
if err != nil {
return err
}
err = h.dal.PutPayPath(payID, payPath)
if err != nil {
return fmt.Errorf("PutPayPath err: %w", err)
}
}
default:
return errors.New("Unsupported payment settle type")
return fmt.Errorf("Unsupported payment settle type")
}

err = h.messager.SendOnePaySettleRequest(pay, payAmt, reason, logEntry)
Expand Down
17 changes: 12 additions & 5 deletions handlers/msghdl/handle_pay_settle_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,12 @@ func (h *CelerMsgHandler) processPaySettleRequestTx(tx *storage.DALTx, args ...i
resolvedAmt := new(big.Int).SetUint64(0)
for _, pi := range payInfos {
payID := ctype.Bytes2PayID(pi.req.GetSettledPayId())
pi.pay, pi.note, pi.igcid, pi.igstate, pi.egstate, found, err = tx.GetPayForRecvSettle(payID)
pi.pay, pi.note, pi.igcid, pi.igstate, pi.egstate, found, err = tx.GetPayForRecvSettleReq(payID)
if err != nil {
return fmt.Errorf("GetPayForRecvSettle %x err %w", payID, err)
return fmt.Errorf("GetPayForRecvSettleReq %x err %w", payID, err)
}
if !found {
return fmt.Errorf("GetPayForRecvSettle %x %w", payID, common.ErrPayNotFound) // db error
return fmt.Errorf("GetPayForRecvSettleReq %x %w", payID, common.ErrPayNotFound) // db error
}
amt := new(big.Int).SetBytes(pi.pay.GetTransferFunc().GetMaxTransfer().GetReceiver().GetAmt())
resolvedAmt = resolvedAmt.Add(resolvedAmt, amt)
Expand Down Expand Up @@ -364,13 +364,20 @@ func (h *CelerMsgHandler) processPaySettleRequestTx(tx *storage.DALTx, args ...i
}
}

case rpc.PaymentSettleReason_PAY_REJECTED, rpc.PaymentSettleReason_PAY_DEST_UNREACHABLE:
case rpc.PaymentSettleReason_PAY_REJECTED:
pi := payInfos[0]
payID := ctype.Bytes2PayID(pi.req.GetSettledPayId())
if pi.igstate != enums.PayState_INGRESS_REJECTED {
return fmt.Errorf("invalid status for rejected pay %x", payID)
}

case rpc.PaymentSettleReason_PAY_DEST_UNREACHABLE:
pi := payInfos[0]
payID := ctype.Bytes2PayID(pi.req.GetSettledPayId())
h.checkPayRouteLoop(cid, pi)
if !pi.routeLoop {
if pi.igstate != enums.PayState_INGRESS_REJECTED {
return fmt.Errorf("invalid status for rejected or unreachable pay %x", payID)
return fmt.Errorf("invalid status for unreachable pay %x", payID)
}
}
}
Expand Down
Loading

0 comments on commit f0c1e78

Please sign in to comment.