monitor pg connection when uploading chunks #634

Merged · 1 commit · Nov 26, 2024
12 changes: 8 additions & 4 deletions pghoard/basebackup/base.py
@@ -35,6 +35,7 @@
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
from pghoard.compressor import CompressionEvent
from pghoard.pgutil import check_if_pg_connection_is_alive
from pghoard.transfer import UploadEvent

BASEBACKUP_NAME = "pghoard_base_backup"
@@ -543,6 +544,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
self.log.debug("Connecting to database to start backup process")
connection_string = connection_string_using_pgpass(self.connection_info)
with psycopg2.connect(connection_string) as db_conn:
conn_polling = lambda: check_if_pg_connection_is_alive(db_conn)
cursor = db_conn.cursor()

if self.pg_version_server < 90600:
@@ -589,6 +591,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces)
if not item[1].endswith(".pem") # Exclude such files like "dh1024.pem"
),
conn_polling=conn_polling,
)
chunks_count = len(chunk_files)
control_files_metadata_extra["chunks"] = chunk_files
@@ -607,11 +610,12 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
# Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
# is reserved for special files and metadata and will be generated last.
chunk_files = self.chunk_uploader.create_and_upload_chunks(
chunks,
data_file_format,
compressed_base,
chunks=chunks,
data_file_format=data_file_format,
temp_base_dir=compressed_base,
delta_stats=delta_stats,
file_type=FileType.Basebackup_chunk
file_type=FileType.Basebackup_chunk,
conn_polling=conn_polling,
)

total_size_plain = sum(item["input_size"] for item in chunk_files)
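For context, a minimal sketch of the wiring above, assuming only what the diff shows (check_if_pg_connection_is_alive and the conn_polling keyword); make_conn_polling is an illustrative helper, not part of the PR:

from typing import Callable

from pghoard.pgutil import check_if_pg_connection_is_alive


def make_conn_polling(db_conn) -> Callable[[], bool]:
    # The uploader only ever calls this zero-argument closure, so it stays
    # decoupled from psycopg2 and is trivial to stub out in tests.
    return lambda: check_if_pg_connection_is_alive(db_conn)

Passing a callable instead of the connection itself is also what lets the test further down patch the liveness check with return_value=False without touching a real connection.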
27 changes: 22 additions & 5 deletions pghoard/basebackup/chunks.py
@@ -62,8 +62,16 @@ class DeltaStats:

class ChunkUploader:
def __init__(
self, metrics: Metrics, chunks_on_disk: int, encryption_data: EncryptionData, compression_data: CompressionData,
site_config: Dict[str, Any], site: str, is_running: Callable[[], bool], transfer_queue: TransferQueue
self,
*,
metrics: Metrics,
chunks_on_disk: int,
encryption_data: EncryptionData,
compression_data: CompressionData,
site_config: Dict[str, Any],
site: str,
is_running: Callable[[], bool],
transfer_queue: TransferQueue,
):
self.log = logging.getLogger("ChunkUploader")
self.metrics = metrics
@@ -216,9 +224,15 @@ def handle_single_chunk(
chunks,
index: int,
temp_dir: Path,
conn_polling: Callable[[], bool],
delta_stats: Optional[DeltaStats] = None,
file_type: FileType = FileType.Basebackup_chunk
file_type: FileType = FileType.Basebackup_chunk,
) -> Dict[str, Any]:
# If the chunk depends on a PG connection and the connection
# is no longer alive, abort the task.
if not conn_polling():
raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.")

one_chunk_files = chunks[index]
chunk_name, input_size, result_size = self.tar_one_file(
callback_queue=chunk_callback_queue,
@@ -260,12 +274,14 @@ def wait_for_chunk_transfer_to_complete(

def create_and_upload_chunks(
self,
*,
chunks,
data_file_format: Callable[[int], str],
conn_polling: Callable[[], bool],
temp_base_dir: Path,
delta_stats: Optional[DeltaStats] = None,
file_type: FileType = FileType.Basebackup_chunk,
chunks_max_progress: float = 100.0
chunks_max_progress: float = 100.0,
) -> List[Dict[str, Any]]:
start_time = time.monotonic()
chunk_files = []
Expand Down Expand Up @@ -299,8 +315,9 @@ def create_and_upload_chunks(
chunks=chunks,
index=i,
temp_dir=temp_base_dir,
conn_polling=conn_polling,
delta_stats=delta_stats,
file_type=file_type
file_type=file_type,
)
pending_compress_and_encrypt_tasks.append(task)
self.chunks_on_disk += 1
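A simplified sketch of the guard handle_single_chunk gains above: poll the connection before tarring the chunk and fail fast if the server has gone away (the standalone function name is illustrative, not part of the PR):

from typing import Callable


def abort_if_connection_lost(conn_polling: Callable[[], bool]) -> None:
    # Mirrors the check added at the top of handle_single_chunk: if the
    # PostgreSQL connection backing the backup is gone, abort the chunk task.
    if not conn_polling():
        raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.")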
38 changes: 31 additions & 7 deletions pghoard/basebackup/delta.py
@@ -55,10 +55,21 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol):

class DeltaBaseBackup:
def __init__(
self, *, storage: BaseTransfer, site: str, site_config: Dict[str, Any], transfer_queue: TransferQueue,
metrics: Metrics, encryption_data: EncryptionData, compression_data: CompressionData,
get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]], parallel: int, temp_base_dir: Path,
compressed_base: Path, chunk_uploader: ChunkUploader, data_file_format: Callable[[int], str]
self,
*,
storage: BaseTransfer,
site: str,
site_config: Dict[str, Any],
transfer_queue: TransferQueue,
metrics: Metrics,
encryption_data: EncryptionData,
compression_data: CompressionData,
get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]],
parallel: int,
temp_base_dir: Path,
compressed_base: Path,
chunk_uploader: ChunkUploader,
data_file_format: Callable[[int], str],
):
self.log = logging.getLogger("DeltaBaseBackup")
self.storage = storage
@@ -384,11 +395,17 @@ def _split_files_for_upload(

return delta_chunks, todo_hexdigests

def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]:
def _upload_chunks(
self,
delta_chunks,
chunks_max_progress: float,
conn_polling: Callable[[], bool],
) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]:
"""Upload small files grouped into chunks to save on latency and requests costs"""
chunk_files = self.chunk_uploader.create_and_upload_chunks(
chunks=delta_chunks,
data_file_format=self.data_file_format,
conn_polling=conn_polling,
temp_base_dir=self.compressed_base,
file_type=FileType.Basebackup_delta_chunk,
chunks_max_progress=chunks_max_progress,
@@ -426,7 +443,10 @@ def _read_delta_sizes(self, snapshot_result: SnapshotResult) -> Tuple[UploadedFi
return digests_metric, embed_metric

def run(
self, pgdata: str, src_iterate_func: Callable[[], Iterable[BackupPath]]
self,
pgdata: str,
src_iterate_func: Callable[[], Iterable[BackupPath]],
conn_polling: Callable[[], bool],
) -> Tuple[int, int, BackupManifest, int, List[Dict[str, Any]]]:
# NOTE: Hard links work only in the same FS, therefore using hopefully the same FS in PG home folder
delta_dir = os.path.join(os.path.dirname(pgdata), "basebackup_delta")
@@ -459,7 +479,11 @@ def run(
sum(len(chunk) for chunk in delta_chunks)
)
chunks_max_progress = delta_chunks_count * 100.0 / (delta_chunks_count + len(todo_hexdigests))
chunks_metric, chunk_files = self._upload_chunks(delta_chunks, chunks_max_progress=chunks_max_progress)
chunks_metric, chunk_files = self._upload_chunks(
delta_chunks,
chunks_max_progress=chunks_max_progress,
conn_polling=conn_polling,
)

self.log.info(
"Submitting hashes for upload: %r, total hashes in the snapshot: %r", len(todo_hexdigests),
14 changes: 13 additions & 1 deletion pghoard/pgutil.py
@@ -5,9 +5,10 @@
Copyright (c) 2015 Ohmu Ltd
See LICENSE for details
"""

from urllib.parse import parse_qs, urlparse

from psycopg2.extensions import (TRANSACTION_STATUS_ACTIVE, TRANSACTION_STATUS_IDLE, TRANSACTION_STATUS_INTRANS)


def create_connection_string(connection_info):
return " ".join("{}='{}'".format(k, str(v).replace("'", "\\'")) for k, v in sorted(connection_info.items()))
@@ -92,3 +93,14 @@ def parse_connection_string_libpq(connection_string):
value, connection_string = rem, ""
fields[key] = value
return fields


def check_if_pg_connection_is_alive(db_conn) -> bool:
if db_conn.closed:
return False

status = db_conn.get_transaction_status()
if status not in [TRANSACTION_STATUS_ACTIVE, TRANSACTION_STATUS_IDLE, TRANSACTION_STATUS_INTRANS]:
return False

return True
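A minimal usage sketch of the new helper, assuming psycopg2 is installed and a local PostgreSQL is reachable (the DSN is hypothetical):

import psycopg2

from pghoard.pgutil import check_if_pg_connection_is_alive

conn = psycopg2.connect("dbname=postgres user=postgres host=localhost")
assert check_if_pg_connection_is_alive(conn)      # freshly opened connection is idle, so it counts as alive
conn.close()
assert not check_if_pg_connection_is_alive(conn)  # conn.closed is set, so the helper returns False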
30 changes: 28 additions & 2 deletions test/basebackup/test_basebackup.py
@@ -88,7 +88,7 @@ def test_find_files(self, db):
def create_test_files():
# Create two temporary files on top level and one in global/ that we'll unlink while iterating
with open(top1, "w") as t1, open(top2, "w") as t2, \
open(sub1, "w") as s1, open(sub2, "w") as s2, open(sub3, "w") as s3:
open(sub1, "w") as s1, open(sub2, "w") as s2, open(sub3, "w") as s3:
t1.write("t1\n")
t2.write("t2\n")
s1.write("s1\n")
@@ -932,7 +932,7 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d
return meta, b"some content"

with patch.object(pgb, "get_remote_basebackups_info") as mock_get_remote_basebackups_info, \
patch("pghoard.basebackup.base.download_backup_meta_file", new=fake_download_backup_meta_file):
patch("pghoard.basebackup.base.download_backup_meta_file", new=fake_download_backup_meta_file):
mock_get_remote_basebackups_info.return_value = [{
"name": f"backup{idx}",
"metadata": {
@@ -946,3 +946,29 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d
"7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e27": 8192,
"7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e28": 800
}

@pytest.mark.parametrize(
"backup_mode",
[BaseBackupMode.local_tar, BaseBackupMode.delta, BaseBackupMode.local_tar_delta_stats],
)
def test_create_basebackup_lost_pg_connection(self, db, pghoard, backup_mode: BaseBackupMode):
with patch("pghoard.basebackup.base.check_if_pg_connection_is_alive", return_value=False):
pghoard.create_backup_site_paths(pghoard.test_site)
basebackup_path = os.path.join(pghoard.config["backup_location"], pghoard.test_site, "basebackup")
q: Queue[CallbackEvent] = Queue()

pghoard.config["backup_sites"][pghoard.test_site]["basebackup_mode"] = backup_mode
pghoard.config["backup_sites"][pghoard.test_site]["active_backup_mode"] = "archive_command"

now = datetime.datetime.now(datetime.timezone.utc)
metadata = {
"backup-reason": BackupReason.scheduled,
"backup-decision-time": now.isoformat(),
"normalized-backup-time": now.isoformat(),
}
pghoard.create_basebackup(pghoard.test_site, db.user, basebackup_path, q, metadata)
result = q.get(timeout=60)

assert result.success is False
assert result.exception and isinstance(result.exception, RuntimeError)
assert result.exception.args[0] == "ERROR: PostgreSQL connection was lost during backup process."