Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR]: Transaction Retry Refactor #1494

Merged
merged 2 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions jac-cloud/jac_cloud/core/architype.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,33 +234,33 @@ def has_operations(self) -> bool:
def commit(session: ClientSession) -> None:
"""Commit current session."""
commit_retry = 0
commit_max_retry = BulkWrite.SESSION_MAX_COMMIT_RETRY
while commit_retry <= commit_max_retry:
while True:
try:
session.commit_transaction()
break
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("UnknownTransactionCommitResult"):
if (
ex.has_error_label("UnknownTransactionCommitResult")
and commit_retry <= BulkWrite.SESSION_MAX_COMMIT_RETRY
):
commit_retry += 1
logger.error(
"Error commiting bulk write! "
f"Retrying [{commit_retry}/{commit_max_retry}] ..."
logger.exception(
"Error commiting session! "
f"Retrying [{commit_retry}/{BulkWrite.SESSION_MAX_COMMIT_RETRY}] ..."
)
continue
logger.error(
f"Error commiting bulk write after max retry [{commit_max_retry}] !"
logger.exception(
f"Error commiting session after max retry [{BulkWrite.SESSION_MAX_COMMIT_RETRY}] !"
)
raise
except Exception:
session.abort_transaction()
logger.error("Error commiting bulk write!")
logger.exception("Error commiting session!")
raise

def execute(self, session: ClientSession) -> None:
"""Execute all operations."""
transaction_retry = 0
transaction_max_retry = self.SESSION_MAX_TRANSACTION_RETRY
while transaction_retry <= transaction_max_retry:
while True:
try:
if node_operation := self.operations[NodeAnchor]:
NodeAnchor.Collection.bulk_write(node_operation, False, session)
Expand All @@ -273,19 +273,22 @@ def execute(self, session: ClientSession) -> None:
self.commit(session)
break
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
if (
ex.has_error_label("TransientTransactionError")
and transaction_retry <= self.SESSION_MAX_TRANSACTION_RETRY
):
transaction_retry += 1
logger.error(
logger.exception(
"Error executing bulk write! "
f"Retrying [{transaction_retry}/{transaction_max_retry}] ..."
f"Retrying [{transaction_retry}/{self.SESSION_MAX_TRANSACTION_RETRY}] ..."
)
continue
logger.error(
f"Error executing bulk write after max retry [{transaction_max_retry}] !"
logger.exception(
f"Error executing bulk write after max retry [{self.SESSION_MAX_TRANSACTION_RETRY}] !"
)
raise
except Exception:
logger.error("Error executing bulk write!")
logger.exception("Error executing bulk write!")
raise


Expand Down
26 changes: 15 additions & 11 deletions jac-cloud/jac_cloud/jaseci/routers/sso.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from fastapi_sso.sso.twitter import TwitterSSO
from fastapi_sso.sso.yandex import YandexSSO

from pymongo.errors import ConnectionFailure, OperationFailure
from pymongo.errors import ConnectionFailure, DuplicateKeyError, OperationFailure

from ..dtos import AttachSSO, DetachSSO
from ..models import NO_PASSWORD, User as BaseUser
Expand Down Expand Up @@ -214,8 +214,7 @@ def register(platform: str, open_id: OpenID) -> Response:

with User.Collection.get_session() as session, session.start_transaction():
retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
while True:
try:
if not User.Collection.update_one(
{"email": open_id.email},
Expand Down Expand Up @@ -243,21 +242,26 @@ def register(platform: str, open_id: OpenID) -> Response:
User.Collection.insert_one(ureq, session=session)
BulkWrite.commit(session)
return login(platform, open_id)
except DuplicateKeyError:
raise HTTPException(409, "Already Exists!")
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
if (
ex.has_error_label("TransientTransactionError")
and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY
):
retry += 1
logger.error(
logger.exception(
"Error executing bulk write! "
f"Retrying [{retry}/{max_retry}] ..."
f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..."
)
continue
logger.exception("Error executing bulk write!")
session.abort_transaction()
break
logger.exception(
f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !"
)
raise
except Exception:
logger.exception("Error executing bulk write!")
session.abort_transaction()
break
raise
return ORJSONResponse({"message": "Registration Failed!"}, 409)


Expand Down
25 changes: 15 additions & 10 deletions jac-cloud/jac_cloud/jaseci/routers/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from passlib.hash import pbkdf2_sha512

from pymongo.errors import ConnectionFailure, OperationFailure
from pymongo.errors import ConnectionFailure, DuplicateKeyError, OperationFailure

from ..dtos import (
UserChangePassword,
Expand Down Expand Up @@ -49,8 +49,7 @@ def register(req: User.register_type()) -> ORJSONResponse: # type: ignore
is_activated = req_obf["is_activated"] = not Emailer.has_client()

retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
while True:
try:
NodeAnchor.Collection.insert_one(root.serialize(), session)
if id := (
Expand All @@ -62,21 +61,27 @@ def register(req: User.register_type()) -> ORJSONResponse: # type: ignore
resp = {"message": "Successfully Registered!"}
log_exit(resp, log)
return ORJSONResponse(resp, 201)
raise SystemError("Can't create System Admin!")
except DuplicateKeyError:
raise HTTPException(409, "Already Exists!")
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
if (
ex.has_error_label("TransientTransactionError")
and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY
):
retry += 1
logger.error(
"Error executing bulk write! "
f"Retrying [{retry}/{max_retry}] ..."
f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..."
)
continue
logger.exception("Error executing bulk write!")
session.abort_transaction()
break
logger.exception(
f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !"
)
raise
except Exception:
logger.exception("Error executing bulk write!")
session.abort_transaction()
break
raise

resp = {"message": "Registration Failed!"}
log_exit(resp, log)
Expand Down
18 changes: 10 additions & 8 deletions jac-cloud/jac_cloud/plugin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ def create_system_admin(
)

retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
while True:
try:
if not NodeAnchor.Collection.find_by_id(
SUPER_ROOT_ID, session=session
Expand All @@ -160,21 +159,24 @@ def create_system_admin(
).inserted_id:
BulkWrite.commit(session)
return f"System Admin created with id: {id}"
session.abort_transaction()
raise SystemError("Can't create System Admin!")
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
if (
ex.has_error_label("TransientTransactionError")
and retry <= BulkWrite.SESSION_MAX_TRANSACTION_RETRY
):
retry += 1
logger.error(
"Error executing bulk write! "
f"Retrying [{retry}/{max_retry}] ..."
f"Retrying [{retry}/{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] ..."
)
continue
logger.exception("Error executing bulk write!")
session.abort_transaction()
logger.exception(
f"Error executing bulk write after max retry [{BulkWrite.SESSION_MAX_TRANSACTION_RETRY}] !"
)
raise
except Exception:
logger.exception("Error executing bulk write!")
session.abort_transaction()
raise

raise Exception("Can't process registration. Please try again!")
Loading