From 52fb34c701786df69659e158ceb445b634f200ba Mon Sep 17 00:00:00 2001
From: Elia Migliore
Date: Wed, 21 Aug 2024 18:04:40 +0200
Subject: [PATCH] WIP: fix duplicated schemas on rapid elections while records are continuously produced

---
 karapace/config.py                           |  2 +
 karapace/coordinator/master_coordinator.py   | 54 ++++++++++++---
 karapace/coordinator/schema_coordinator.py   | 72 ++++++++++++++++++--
 tests/integration/test_schema_coordinator.py | 36 ++++++++--
 4 files changed, 143 insertions(+), 21 deletions(-)

diff --git a/karapace/config.py b/karapace/config.py
index 9212f6348..5f5c7f735 100644
--- a/karapace/config.py
+++ b/karapace/config.py
@@ -78,6 +78,7 @@ class Config(TypedDict):
     name_strategy_validation: bool
     master_election_strategy: str
     protobuf_runtime_directory: str
+    waiting_time_before_acting_as_master_ms: int
 
     sentry: NotRequired[Mapping[str, object]]
     tags: NotRequired[Mapping[str, object]]
@@ -150,6 +151,7 @@ class ConfigDefaults(Config, total=False):
     "name_strategy_validation": True,
     "master_election_strategy": "lowest",
     "protobuf_runtime_directory": "runtime",
+    "waiting_time_before_acting_as_master_ms": 5000,
 }
 
 SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]
diff --git a/karapace/coordinator/master_coordinator.py b/karapace/coordinator/master_coordinator.py
index b9fdb3a12..992162bff 100644
--- a/karapace/coordinator/master_coordinator.py
+++ b/karapace/coordinator/master_coordinator.py
@@ -13,6 +13,7 @@
 from karapace.config import Config
 from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
 from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
+from threading import Thread
 from typing import Final
 
 import asyncio
@@ -24,7 +25,14 @@
 
 
 class MasterCoordinator:
-    """Handles primary election"""
+    """Handles primary election.
+
+    The coordination runs in its own dedicated thread. Under stress the main event loop
+    can build up a queue of pending work, and a dedicated thread gives the coordination
+    tasks more runtime, because the Python interpreter switches the active thread at the
+    configured thread switch interval. The default interval in CPython is 5 milliseconds.
+    """
 
     def __init__(self, config: Config) -> None:
         super().__init__()
@@ -32,6 +40,8 @@ def __init__(self, config: Config) -> None:
         self._kafka_client: AIOKafkaClient | None = None
         self._running = True
         self._sc: SchemaCoordinator | None = None
+        self._thread: Thread = Thread(target=self._start_loop, daemon=True)
+        self._loop: asyncio.AbstractEventLoop | None = None
 
     @property
     def schema_coordinator(self) -> SchemaCoordinator | None:
@@ -42,6 +52,17 @@ def config(self) -> Config:
         return self._config
 
     async def start(self) -> None:
+        self._thread.start()
+
+    def _start_loop(self) -> None:
+        # Avoid reassigning the loop, otherwise we leak resources.
+        assert self._loop is None, "Loop already started"
+        self._loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self._loop)
+        self._loop.create_task(self._async_loop())
+        self._loop.run_forever()
+
+    async def _async_loop(self) -> None:
         self._kafka_client = self.init_kafka_client()
         # Wait until schema coordinator is ready.
         # This probably needs better synchronization than plain waits.
@@ -61,11 +82,19 @@ async def start(self) -> None:
             await asyncio.sleep(0.5)
 
         self._sc = self.init_schema_coordinator()
-        while True:
+        while self._running:
             if self._sc.ready():
                 return
             await asyncio.sleep(0.5)
 
+        LOG.info("Closing master_coordinator")
+        if self._sc:
+            await self._sc.close()
+        if self._loop:
+            self._loop.close()
+        if self._kafka_client:
+            await self._kafka_client.close()
+
     def init_kafka_client(self) -> AIOKafkaClient:
         ssl_context = create_ssl_context(
             cafile=self._config["ssl_cafile"],
@@ -99,6 +128,7 @@ def init_schema_coordinator(self) -> SchemaCoordinator:
             port=self._config["advertised_port"],
             scheme=self._config["advertised_protocol"],
             session_timeout_ms=self._config["session_timeout_ms"],
+            waiting_time_before_acting_as_master_ms=self._config["waiting_time_before_acting_as_master_ms"],
         )
         schema_coordinator.start()
         return schema_coordinator
@@ -107,7 +137,7 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:
         assert self._sc is not None
         generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID
         return SchemaCoordinatorStatus(
-            is_primary=self._sc.are_we_master if self._sc is not None else None,
+            is_primary=self._sc.are_we_master() if self._sc is not None else None,
             is_primary_eligible=self._config["master_eligibility"],
             primary_url=self._sc.master_url if self._sc is not None else None,
             is_running=True,
@@ -116,12 +146,16 @@ def get_master_info(self) -> tuple[bool | None, str | None]:
         """Return whether we're the master, and the actual master url that can be used if we're not"""
-        assert self._sc is not None
-        return self._sc.are_we_master, self._sc.master_url
+        if not self._sc:
+            return False, None
+
+        if not self._sc.ready():
+            # We should wait for a while after being elected master, and we should also consume
+            # all the messages in the log before proceeding; check the docstring of
+            # `self._sc.are_we_master` for more details.
+            return False, None
+
+        return self._sc.are_we_master(), self._sc.master_url
 
     async def close(self) -> None:
-        LOG.info("Closing master_coordinator")
-        if self._sc:
-            await self._sc.close()
-        if self._kafka_client:
-            await self._kafka_client.close()
+        self._running = False
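
Outside the patch itself, the threading pattern described in the MasterCoordinator docstring above can be illustrated with a minimal, self-contained sketch. This is not Karapace code: the `Worker` class and its methods are hypothetical names, and the sketch only assumes the Python standard library.

import asyncio
import threading
import time


class Worker:
    """Run an asyncio loop on a dedicated daemon thread (illustrative only)."""

    def __init__(self) -> None:
        self._running = True
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread = threading.Thread(target=self._start_loop, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _start_loop(self) -> None:
        # The loop is created and driven entirely on the dedicated thread, so a busy
        # main loop cannot starve the work scheduled here.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        self._loop.create_task(self._run())
        self._loop.run_forever()

    async def _run(self) -> None:
        # Coordination work would happen here; this sketch just ticks until asked to stop.
        while self._running:
            await asyncio.sleep(0.1)
        asyncio.get_running_loop().stop()

    def close(self) -> None:
        # Flip the flag from any thread; the loop notices it on the next iteration.
        self._running = False
        self._thread.join(timeout=5)


if __name__ == "__main__":
    worker = Worker()
    worker.start()
    time.sleep(0.5)  # the main thread keeps doing its own work here
    worker.close()
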
diff --git a/karapace/coordinator/schema_coordinator.py b/karapace/coordinator/schema_coordinator.py
index ade69be91..a161f410b 100644
--- a/karapace/coordinator/schema_coordinator.py
+++ b/karapace/coordinator/schema_coordinator.py
@@ -117,7 +117,6 @@ class SchemaCoordinator:
     Contains original comments and also Schema Registry specific comments.
     """
 
-    are_we_master: bool | None = None
     master_url: str | None = None
 
     def __init__(
@@ -134,6 +133,7 @@ def __init__(
         rebalance_timeout_ms: int = 30000,
         retry_backoff_ms: int = 100,
         session_timeout_ms: int = 10000,
+        waiting_time_before_acting_as_master_ms: int = 5000,
     ) -> None:
         # Coordination flags and futures
         self._client: Final = client
@@ -146,7 +146,16 @@ def __init__(
         self.scheme: Final = scheme
         self.master_eligibility: Final = master_eligibility
         self.master_url: str | None = None
-        self.are_we_master = False
+        self._are_we_master: bool | None = False
+        # A value strictly higher than any clock reading, so that we are sure we never
+        # consider this node the leader without explicitly transitioning `_are_we_master`
+        # from False to True.
+        self._initial_election_sec: float | None = float("infinity")
+        # Used to decide whether we need to wait `waiting_time_before_acting_as_master_ms`
+        # before acting as leader: if the last time we were leader was less than the waiting
+        # time ago, we can skip the waiting phase. Note that we always use our own clock, so
+        # clock skew between machines is not a problem.
+        self._last_time_i_was_leader: float = float("-infinity")
 
         self.rejoin_needed_fut: asyncio.Future[None] | None = None
         self._coordinator_dead_fut: asyncio.Future[None] | None = None
@@ -162,6 +171,7 @@ def __init__(
         self._rebalance_timeout_ms: Final = rebalance_timeout_ms
         self._retry_backoff_ms: Final = retry_backoff_ms
         self._session_timeout_ms: Final = session_timeout_ms
+        self._waiting_time_before_acting_as_master_ms: Final = waiting_time_before_acting_as_master_ms
         self._coordinator_lookup_lock: Final = asyncio.Lock()
         self._coordination_task: asyncio.Future[None] | None = None
@@ -181,6 +191,38 @@ def __init__(
 
         self._metadata_snapshot: list[Assignment] = []
 
+    def are_we_master(self) -> bool | None:
+        """
+        After a new election is made we should wait for a while, since the previous master
+        could have produced a new message shortly before being disconnected from the cluster.
+        If that happened, the max ID used to select the next schema ID could be stale, and we
+        could create two schemas with the same ID (or even more if rapid elections follow one
+        another).
+        The fix is to wait for roughly 5 seconds, while late messages may still arrive, before
+        becoming available as a master.
+        The conditions to resume acting as the master are:
+        * no new messages are still to be processed
+        * at least 5 seconds have passed since we were elected master
+        """
+        if self._are_we_master is None:
+            # `self._are_we_master` is `None` only while the assignment is being performed and
+            # we do not yet know whether we are master (we should probably return False here as
+            # well, given the new logic of waiting for a while before enabling writes).
+            # TODO: check me later.
+            return None
+
+        if not self._ready:
+            return False
+
+        if self._are_we_master and self._initial_election_sec is not None:
+            # TODO: create a parameter for the time to wait before becoming master.
+            # `time.monotonic()` because we don't want the time to jump backwards or forwards
+            # because of e.g. NTP.
+            if time.monotonic() > self._initial_election_sec + (self._waiting_time_before_acting_as_master_ms / 1000):
+                # Set the value to `None` since reading the monotonic clock on every call is expensive.
+                self._initial_election_sec = None
+                return True
+
+            return False
+
+        return self._are_we_master
+
     def start(self) -> None:
         """Must be called after creating SchemaCoordinator object to initialize
         futures and start the coordination task.
@@ -280,6 +322,10 @@ async def _maybe_leave_group(self) -> None:
                 LOG.warning("LeaveGroup request failed: %s", err)
             else:
                 LOG.info("LeaveGroup request succeeded")
+        # Record when we were last the leader, so we avoid the waiting phase if a new actor
+        # joins the cluster and we are immediately elected leader again.
+        if self.are_we_master():
+            self._last_time_i_was_leader = time.monotonic()
         self.reset_generation()
 
     def _handle_metadata_update(self, _: ClusterMetadata) -> None:
@@ -348,7 +394,7 @@ async def perform_assignment(
             response_data.protocol,
             response_data.members,
         )
-        self.are_we_master = None
+        self._are_we_master = None
         error = NO_ERROR
         urls = {}
         fallback_urls = {}
@@ -416,13 +462,25 @@ async def _on_join_complete(
         # On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real
         if member_assignment["master"] == member_id and member_identity["master_eligibility"]:
             self.master_url = master_url
-            self.are_we_master = True
+            self._are_we_master = True
+            ive_never_been_a_master = self._last_time_i_was_leader == float("-inf")
+            another_master_could_have_been_elected = (
+                self._last_time_i_was_leader + (self._waiting_time_before_acting_as_master_ms / 1000) < time.monotonic()
+            )
+            if ive_never_been_a_master or another_master_could_have_been_elected:
+                # We only need to wait for late record arrivals when the master has changed.
+                # The waiting time is always measured against the previous master: if we were
+                # already the master, there is no need to wait before acting.
+                self._ready = False
+                # `time.monotonic()` because we don't want the time to jump backwards or forwards because of e.g. NTP.
+                self._initial_election_sec = time.monotonic()
         elif not member_identity["master_eligibility"]:
             self.master_url = None
-            self.are_we_master = False
+            self._are_we_master = False
         else:
             self.master_url = master_url
-            self.are_we_master = False
+            self._are_we_master = False
         self._ready = True
         return None
@@ -513,6 +571,8 @@ async def __coordination_routine(self) -> None:
             try:
                 await self.ensure_coordinator_known()
                 if self.need_rejoin():
+                    if self.are_we_master():
+                        self._last_time_i_was_leader = time.monotonic()
                     new_assignment = await self.ensure_active_group()
                     if not new_assignment:
                         continue
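
The election logic in `_on_join_complete` and `are_we_master()` above amounts to a monotonic-clock gate: after winning an election a node refuses to act as master until the grace period has elapsed, unless it was itself the master just before the election. Below is a minimal standalone sketch of that idea, assuming a hypothetical `ElectionGate` class that is not part of Karapace.

import time


class ElectionGate:
    """Grace-period gate around leadership changes (illustrative only)."""

    def __init__(self, waiting_time_ms: int = 5000) -> None:
        self._waiting_time_sec = waiting_time_ms / 1000
        self._elected_at: float | None = None        # set when a wait is required
        self._last_time_leader = float("-inf")       # monotonic time of our last leadership

    def on_elected(self) -> None:
        now = time.monotonic()
        another_master_possible = self._last_time_leader + self._waiting_time_sec < now
        # Only wait when another node could have been master (and produced records) in between.
        self._elected_at = now if another_master_possible else None

    def on_stepping_down(self) -> None:
        self._last_time_leader = time.monotonic()

    def can_act_as_master(self) -> bool:
        if self._elected_at is None:
            return True
        if time.monotonic() > self._elected_at + self._waiting_time_sec:
            self._elected_at = None  # avoid re-checking the clock on every call
            return True
        return False


if __name__ == "__main__":
    gate = ElectionGate(waiting_time_ms=100)
    gate.on_elected()
    assert not gate.can_act_as_master()  # freshly elected: must wait out the grace period
    time.sleep(0.2)
    assert gate.can_act_as_master()      # grace period elapsed: safe to act as master
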
diff --git a/tests/integration/test_schema_coordinator.py b/tests/integration/test_schema_coordinator.py
index 7c7697065..5ff588fa0 100644
--- a/tests/integration/test_schema_coordinator.py
+++ b/tests/integration/test_schema_coordinator.py
@@ -30,6 +30,7 @@
 import contextlib
 import logging
 import pytest
+import time
 
 UNKNOWN_MEMBER_ID = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
@@ -91,6 +92,8 @@ async def test_coordinator_workflow(
     # Check if 2 coordinators will coordinate rebalances correctly
     # Check if the initial group join is performed correctly with minimal
     # setup
+
+    waiting_time_before_acting_as_master_sec = 5
     coordinator = SchemaCoordinator(
         client,
         "test-host-1",
@@ -102,6 +105,7 @@ async def test_coordinator_workflow(
         session_timeout_ms=10000,
         heartbeat_interval_ms=500,
         retry_backoff_ms=100,
+        waiting_time_before_acting_as_master_ms=waiting_time_before_acting_as_master_sec * 1000,
     )
     coordinator.start()
     assert coordinator.coordinator_id is None
@@ -112,7 +116,10 @@ async def test_coordinator_workflow(
     await coordinator.ensure_coordinator_known()
     assert coordinator.coordinator_id is not None
 
-    assert coordinator.are_we_master
+    assert not coordinator.are_we_master()
+    # Sleep longer than `waiting_time_before_acting_as_master_ms` so the coordinator can act as master.
+    await asyncio.sleep(10)
+    assert coordinator.are_we_master(), f"after {waiting_time_before_acting_as_master_sec} seconds we can act as a master"
 
     # Check if adding an additional coordinator will rebalance correctly
     client2 = await _get_client(kafka_servers=kafka_servers)
@@ -144,16 +151,35 @@ async def test_coordinator_workflow(
     secondary = coordinator if primary_selection_strategy == "highest" else coordinator2
     secondary_client = client if primary_selection_strategy == "highest" else client2
 
-    assert primary.are_we_master
-    assert not secondary.are_we_master
+    if primary == coordinator2:
+        # Each time the primary changes, the new primary cannot act as master for
+        # `waiting_time_before_acting_as_master_sec` seconds. If the primary is still
+        # `coordinator` (it has not changed) we do not have to wait.
+        assert (
+            not primary.are_we_master()
+        ), "after a change of primary we cannot act as master until we have waited the required time"
+        assert not secondary.are_we_master(), "the secondary cannot immediately become master either"
+        # After that time the primary can act as master.
+        await asyncio.sleep(waiting_time_before_acting_as_master_sec)
+
+    assert primary.are_we_master()
+    assert not secondary.are_we_master()
 
     # Check is closing the primary coordinator will rebalance the secondary to change to primary
     await primary.close()
     await primary_client.close()
 
-    while not secondary.are_we_master:
+    now = time.time()
+    while time.time() - now < waiting_time_before_acting_as_master_sec:
+        assert not secondary.are_we_master(), (
+            f"Cannot become master before {waiting_time_before_acting_as_master_sec} seconds "
+            f"for the late records that can arrive from the previous master"
+        )
+        await asyncio.sleep(0.5)
+
+    while not secondary.are_we_master():
         await asyncio.sleep(0.5)
-    assert secondary.are_we_master
 
     await secondary.close()
    await secondary_client.close()
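
For context on why the waiting phase is needed at all, the duplicate-ID race this patch guards against can be shown with a tiny simulation. This is illustrative only, not Karapace code: `next_schema_id` is a made-up stand-in for the registry's ID allocation, and the list stands in for the schemas topic.

def next_schema_id(consumed_ids: set[int]) -> int:
    # Stand-in for "highest schema ID seen so far, plus one".
    return max(consumed_ids, default=0) + 1


log = [1, 2, 3]                                  # schema IDs already written to the topic
old_master_write = next_schema_id(set(log))      # 4, produced just before losing leadership
log.append(old_master_write)

new_master_view = set(log[:-1])                  # the new master has not consumed ID 4 yet
duplicate = next_schema_id(new_master_view)      # also 4: two schemas end up with the same ID

assert duplicate == old_master_write
# Waiting `waiting_time_before_acting_as_master_ms` (and draining the log) before acting as
# master gives the newly elected node time to consume ID 4 and avoid the collision.
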