Skip to content

Commit

Permalink
Merge pull request #412 from DanGould/handle-unrecoverable-pj
Browse files Browse the repository at this point in the history
Mark unrecoverable errors so they don't spawn
  • Loading branch information
i5hi authored Jan 4, 2025
2 parents 6e1ccb6 + 2248dc4 commit c518ffb
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 55 deletions.
131 changes: 76 additions & 55 deletions lib/_pkg/payjoin/manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,23 @@ const List<String> _ohttpRelayUrls = [

const payjoinDirectoryUrl = 'https://payjo.in';

sealed class SessionError {
final String message;

const SessionError._(this.message);

factory SessionError.recoverable(String message) = RecoverableError;
factory SessionError.unrecoverable(String message) = UnrecoverableError;
}

class RecoverableError extends SessionError {
const RecoverableError(super.message) : super._();
}

class UnrecoverableError extends SessionError {
const UnrecoverableError(super.message) : super._();
}

class PayjoinManager {
PayjoinManager(this._walletTx, this._payjoinStorage);
final WalletTx _walletTx;
Expand Down Expand Up @@ -123,12 +140,15 @@ class PayjoinManager {
await _payjoinStorage.markSenderSessionComplete(pjUri);
completer.complete(null);
}
} else if (message is Err) {
} else if (message is SessionError) {
PayjoinEventBus().emit(
PayjoinSendFailureEvent(pjUri: pjUri, error: message),
PayjoinSendFailureEvent(pjUri: pjUri, error: message.message),
);
if (message is UnrecoverableError) {
await _payjoinStorage.markSenderSessionUnrecoverable(pjUri);
}
await _cleanupSession(pjUri);
completer.complete(message);
completer.complete(Err(message.message));
}
});

Expand Down Expand Up @@ -254,15 +274,21 @@ class PayjoinManager {
// TODO PROPAGATE ERROR TO UI TOAST / TRANSACTION HISTORY
debugPrint(e.toString());
await _cleanupSession(receiver.id());
await _payjoinStorage
.markReceiverSessionUnrecoverable(receiver.id());
completer.complete(
Err(
e.toString(),
),
);
}
} else if (message is Err) {
} else if (message is SessionError) {
await _cleanupSession(receiver.id());
completer.complete(message);
if (message is UnrecoverableError) {
await _payjoinStorage
.markReceiverSessionUnrecoverable(receiver.id());
}
completer.complete(Err(message.toString()));
}
});

Expand Down Expand Up @@ -301,11 +327,13 @@ class PayjoinManager {
final filteredReceivers = receiverSessions
.where((session) =>
session.walletId == wallet.id &&
session.status != PayjoinSessionStatus.success)
session.status != PayjoinSessionStatus.success &&
session.status != PayjoinSessionStatus.unrecoverable)
.toList();
final filteredSenders = senderSessions.where((session) {
return session.walletId == wallet.id &&
session.status != PayjoinSessionStatus.success;
session.status != PayjoinSessionStatus.success &&
session.status != PayjoinSessionStatus.unrecoverable;
}).toList();

final spawnedReceivers = filteredReceivers.map((session) {
Expand Down Expand Up @@ -368,6 +396,7 @@ class PayjoinManager {

enum PayjoinSessionStatus {
pending,
unrecoverable,
success,
}

Expand Down Expand Up @@ -473,21 +502,19 @@ Future<void> _isolateSender(List<dynamic> args) async {
// Reconstruct the Sender from the JSON
final sender = Sender.fromJson(senderJson);

// Run the sender logic inside the isolate
try {
final proposalPsbt = await _runSender(sender, sendPort: sendPort);
if (proposalPsbt == null) throw Exception('proposalPsbt is null');
sendPort.send({
'type': 'psbt_to_sign',
'psbt': proposalPsbt,
});
} catch (e) {
sendPort.send(Err(e.toString()));
sendPort.send(e);
}
}

/// Top-level function that attempts to run payjoin sender (V2 protocol first, fallback to V1).
Future<String?> _runSender(Sender sender, {required SendPort sendPort}) async {
Future<String> _runSender(Sender sender, {required SendPort sendPort}) async {
final dio = Dio();

try {
Expand All @@ -506,24 +533,26 @@ Future<String?> _runSender(Sender sender, {required SendPort sendPort}) async {
sendPort.send({'type': 'request_posted'});

while (true) {
try {
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
final proposalPsbt = await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
if (proposalPsbt != null) return proposalPsbt;
} catch (e) {
print('Error occurred while processing payjoin: $e');
// Loop until a valid response is found
}
final (getRequest, getReqCtx) = await getCtx.extractReq(
ohttpRelay: await _randomOhttpRelayUrl(),
);
final getRes = await _postRequest(dio, getRequest);
final proposalPsbt = await getCtx.processResponse(
response: getRes.data as List<int>,
ohttpCtx: getReqCtx,
);
if (proposalPsbt != null) return proposalPsbt;
}
} catch (e) {
// If V2 fails, attempt V1
return await _runSenderV1(sender, dio, sendPort);
if (e is PayjoinException &&
// TODO condition on error type instead of message content
e.message?.contains('parse receiver public key') == true) {
return await _runSenderV1(sender, dio, sendPort);
} else if (e is DioException) {
throw Exception(SessionError.recoverable(e.toString()));
} else {
throw Exception(SessionError.unrecoverable(e.toString()));
}
}
}

Expand Down Expand Up @@ -651,7 +680,7 @@ Future<void> _isolateReceiver(List<dynamic> args) async {
return payjoinProposal;
} catch (e) {
print('Error occurred while finalizing proposal: $e');
throw Exception('Error occurred while finalizing proposal');
rethrow;
}
}

Expand All @@ -668,10 +697,10 @@ Future<void> _isolateReceiver(List<dynamic> args) async {
'type': 'proposal_sent',
});
} catch (e) {
try {
isolateTomainSendPort.send(Err(e.toString()));
} catch (e) {
print('$e');
if (e is DioException) {
isolateTomainSendPort.send(SessionError.recoverable(e.toString()));
} else {
isolateTomainSendPort.send(SessionError.unrecoverable(e.toString()));
}
}
}
Expand All @@ -680,34 +709,26 @@ Future<UncheckedProposal> _receiveUncheckedProposal(
Dio dio,
Receiver receiver,
) async {
try {
while (true) {
final (req, context) = await receiver.extractReq();
final ohttpResponse = await _postRequest(dio, req);
final proposal = await receiver.processRes(
body: ohttpResponse.data as List<int>,
ctx: context,
);
if (proposal != null) {
return proposal;
}
while (true) {
final (req, context) = await receiver.extractReq();
final ohttpResponse = await _postRequest(dio, req);
final proposal = await receiver.processRes(
body: ohttpResponse.data as List<int>,
ctx: context,
);
if (proposal != null) {
return proposal;
}
} catch (e) {
throw Exception('Error occurred while processing payjoin receiver: $e');
}
}

Future<void> _respondProposal(Dio dio, PayjoinProposal proposal) async {
try {
final (postReq, ohttpCtx) = await proposal.extractV2Req();
final postRes = await _postRequest(dio, postReq);
await proposal.processRes(
res: postRes.data as List<int>,
ohttpContext: ohttpCtx,
);
} catch (e) {
throw Exception('Error occurred while processing payjoin: $e');
}
final (postReq, ohttpCtx) = await proposal.extractV2Req();
final postRes = await _postRequest(dio, postReq);
await proposal.processRes(
res: postRes.data as List<int>,
ohttpContext: ohttpCtx,
);
}

/// Posts a request via dio and returns the response.
Expand Down
45 changes: 45 additions & 0 deletions lib/_pkg/payjoin/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,28 @@ class PayjoinStorage {
}
}

Future<Err?> markReceiverSessionUnrecoverable(String id) async {
try {
final (session, err) = await readReceiverSession(id);
if (err != null) return err;

final updatedSession = RecvSession(
session!.isTestnet,
session.receiver,
session.walletId,
PayjoinSessionStatus.unrecoverable,
);

await _hiveStorage.saveValue(
key: receiverPrefix + id,
value: jsonEncode(updatedSession.toJson()),
);
return null;
} catch (e) {
return Err(e.toString());
}
}

Future<(List<RecvSession>, Err?)> readAllReceivers() async {
//deleteAllSessions();
try {
Expand Down Expand Up @@ -186,6 +208,29 @@ class PayjoinStorage {
}
}

Future<Err?> markSenderSessionUnrecoverable(String pjUri) async {
try {
final (session, err) = await readSenderSession(pjUri);
if (err != null) return err;

final updatedSession = SendSession(
session!.isTestnet,
session.sender,
session.walletId,
session.pjUri,
PayjoinSessionStatus.unrecoverable,
);

await _hiveStorage.saveValue(
key: senderPrefix + pjUri,
value: jsonEncode(updatedSession.toJson()),
);
return null;
} catch (e) {
return Err(e.toString());
}
}

Future<(List<SendSession>, Err?)> readAllSenders() async {
try {
final (allData, err) = await _hiveStorage.getAll();
Expand Down

0 comments on commit c518ffb

Please sign in to comment.