Skip to content

Commit

Permalink
WIP: fix duplicated schemas on rapid elections while continuous produ…
Browse files Browse the repository at this point in the history
…ce of records
  • Loading branch information
eliax1996 committed Aug 22, 2024
1 parent 99c1046 commit 52fb34c
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 21 deletions.
2 changes: 2 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class Config(TypedDict):
name_strategy_validation: bool
master_election_strategy: str
protobuf_runtime_directory: str
waiting_time_before_acting_as_master_ms: int

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -150,6 +151,7 @@ class ConfigDefaults(Config, total=False):
"name_strategy_validation": True,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"waiting_time_before_acting_as_master_ms": 5000,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
54 changes: 44 additions & 10 deletions karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from threading import Thread
from typing import Final

import asyncio
Expand All @@ -24,14 +25,23 @@


class MasterCoordinator:
"""Handles primary election"""
"""Handles primary election
The coordination is run in own dedicated thread, under stress situation the main
eventloop could have queue of items to work and having own thread will give more
runtime for the coordination tasks as Python intrepreter will switch the active
thread by the configured thread switch interval. Default interval in CPython is
5 milliseconds.
"""

def __init__(self, config: Config) -> None:
super().__init__()
self._config: Final = config
self._kafka_client: AIOKafkaClient | None = None
self._running = True
self._sc: SchemaCoordinator | None = None
self._thread: Thread = Thread(target=self._start_loop, daemon=True)
self._loop: asyncio.AbstractEventLoop | None = None

@property
def schema_coordinator(self) -> SchemaCoordinator | None:
Expand All @@ -42,6 +52,17 @@ def config(self) -> Config:
return self._config

async def start(self) -> None:
self._thread.start()

def _start_loop(self) -> None:
# we should avoid the reassignment otherwise we leak resources
assert self._loop is None, "Loop already started"
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._async_loop())
self._loop.run_forever()

async def _async_loop(self) -> None:
self._kafka_client = self.init_kafka_client()
# Wait until schema coordinator is ready.
# This probably needs better synchronization than plain waits.
Expand All @@ -61,11 +82,19 @@ async def start(self) -> None:
await asyncio.sleep(0.5)

self._sc = self.init_schema_coordinator()
while True:
while self._running:
if self._sc.ready():
return
await asyncio.sleep(0.5)

LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
if self._loop:
self._loop.close()
if self._kafka_client:
await self._kafka_client.close()

def init_kafka_client(self) -> AIOKafkaClient:
ssl_context = create_ssl_context(
cafile=self._config["ssl_cafile"],
Expand Down Expand Up @@ -99,6 +128,7 @@ def init_schema_coordinator(self) -> SchemaCoordinator:
port=self._config["advertised_port"],
scheme=self._config["advertised_protocol"],
session_timeout_ms=self._config["session_timeout_ms"],
waiting_time_before_acting_as_master_ms=self._config["waiting_time_before_acting_as_master_ms"],
)
schema_coordinator.start()
return schema_coordinator
Expand All @@ -107,7 +137,7 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:
assert self._sc is not None
generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID
return SchemaCoordinatorStatus(
is_primary=self._sc.are_we_master if self._sc is not None else None,
is_primary=self._sc.are_we_master() if self._sc is not None else None,
is_primary_eligible=self._config["master_eligibility"],
primary_url=self._sc.master_url if self._sc is not None else None,
is_running=True,
Expand All @@ -116,12 +146,16 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:

def get_master_info(self) -> tuple[bool | None, str | None]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
assert self._sc is not None
return self._sc.are_we_master, self._sc.master_url
if not self._sc:
return False, None

if not self._sc.ready():
# we should wait for a while after we have been elected master, we should also consume
# all the messages in the log before proceeding, check the doc of `self._sc.are_we_master`
# for more details
return False, None

return self._sc.are_we_master(), self._sc.master_url

async def close(self) -> None:
LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
if self._kafka_client:
await self._kafka_client.close()
self._running = False
72 changes: 66 additions & 6 deletions karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ class SchemaCoordinator:
Contains original comments and also Schema Registry specific comments.
"""

are_we_master: bool | None = None
master_url: str | None = None

def __init__(
Expand All @@ -134,6 +133,7 @@ def __init__(
rebalance_timeout_ms: int = 30000,
retry_backoff_ms: int = 100,
session_timeout_ms: int = 10000,
waiting_time_before_acting_as_master_ms: int = 5000,
) -> None:
# Coordination flags and futures
self._client: Final = client
Expand All @@ -146,7 +146,16 @@ def __init__(
self.scheme: Final = scheme
self.master_eligibility: Final = master_eligibility
self.master_url: str | None = None
self.are_we_master = False
self._are_we_master: bool | None = False
# a value that its strictly higher than any clock, so we are sure
# we are never going to consider this the leader without explictly passing
# from False to True for the `_are_we_master` variable.
self._initial_election_sec: float | None = float("infinity")
# used to understand if I need to wait the `waiting_time_before_acting_as_master_ms`
# before acting as a leader or not, if the last time I was leader was less than 5 seconds
# ago I can skip the waiting phase (note that I'm always using my own time, no problems due
# to skew of clocks between machines).
self._last_time_i_was_leader: float = float("-infinity")

self.rejoin_needed_fut: asyncio.Future[None] | None = None
self._coordinator_dead_fut: asyncio.Future[None] | None = None
Expand All @@ -162,6 +171,7 @@ def __init__(
self._rebalance_timeout_ms: Final = rebalance_timeout_ms
self._retry_backoff_ms: Final = retry_backoff_ms
self._session_timeout_ms: Final = session_timeout_ms
self._waiting_time_before_acting_as_master_ms: Final = waiting_time_before_acting_as_master_ms

self._coordinator_lookup_lock: Final = asyncio.Lock()
self._coordination_task: asyncio.Future[None] | None = None
Expand All @@ -181,6 +191,38 @@ def __init__(

self._metadata_snapshot: list[Assignment] = []

def are_we_master(self) -> bool | None:
"""
After a new election its made we should wait for a while since the previous master could have produced
a new message shortly before being disconnected from the cluster.
If this is true the max id selected for the next schema ID, so we can create
two schemas with the same id (or even more if rapid elections are one after another).
The fix its to wait for ~= 5 seconds if new messages arrives before becoming available as a master.
The condition to resume being the master its:
no new messages are still to be processed + at least 5 seconds have passed since we were elected master
"""
if self._are_we_master is None:
# `self._are_we_master` is `None` only during the perform of the assignment
# where we don't know if we are master yet (probably we should return false due to the
# new logic of waiting for a while before enabling the writes), todo: check me later.
return None

if not self._ready:
return False

if self._are_we_master and self._initial_election_sec is not None:
# todo create a parameter for the time to wait before becoming master
# `time.monotonic()` because we don't want the time to go back or forward because of
# e.g. ntp
if time.monotonic() > self._initial_election_sec + (self._waiting_time_before_acting_as_master_ms / 1000):
# set the value to `None` since its expensive to call each time the monotonic clock.
self._initial_election_sec = None
return True

return False

return self._are_we_master

def start(self) -> None:
"""Must be called after creating SchemaCoordinator object to initialize
futures and start the coordination task.
Expand Down Expand Up @@ -280,6 +322,10 @@ async def _maybe_leave_group(self) -> None:
LOG.warning("LeaveGroup request failed: %s", err)
else:
LOG.info("LeaveGroup request succeeded")
# to avoid sleeping if we were the master, a new actor join the cluster
# and we are immediately elected as leader again.
if self.are_we_master():
self._last_time_i_was_leader = time.monotonic()
self.reset_generation()

def _handle_metadata_update(self, _: ClusterMetadata) -> None:
Expand Down Expand Up @@ -348,7 +394,7 @@ async def perform_assignment(
response_data.protocol,
response_data.members,
)
self.are_we_master = None
self._are_we_master = None
error = NO_ERROR
urls = {}
fallback_urls = {}
Expand Down Expand Up @@ -416,13 +462,25 @@ async def _on_join_complete(
# On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real
if member_assignment["master"] == member_id and member_identity["master_eligibility"]:
self.master_url = master_url
self.are_we_master = True
self._are_we_master = True
ive_never_been_a_master = self._last_time_i_was_leader == float("-inf")
another_master_could_have_been_elected = (
self._last_time_i_was_leader + (self._waiting_time_before_acting_as_master_ms / 1000) < time.monotonic()
)
if ive_never_been_a_master or another_master_could_have_been_elected:
# we need to wait late record arrivals only in the case there
# was a master change, the time before acting its always respect
# to which was the previous master (if we were master no need
# to wait more before acting)
self._ready = False
# `time.monotonic()` because we don't want the time to go back or forward because of e.g. ntp
self._initial_election_sec = time.monotonic()
elif not member_identity["master_eligibility"]:
self.master_url = None
self.are_we_master = False
self._are_we_master = False
else:
self.master_url = master_url
self.are_we_master = False
self._are_we_master = False
self._ready = True
return None

Expand Down Expand Up @@ -513,6 +571,8 @@ async def __coordination_routine(self) -> None:
try:
await self.ensure_coordinator_known()
if self.need_rejoin():
if self.are_we_master():
self._last_time_i_was_leader = time.monotonic()
new_assignment = await self.ensure_active_group()
if not new_assignment:
continue
Expand Down
36 changes: 31 additions & 5 deletions tests/integration/test_schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import contextlib
import logging
import pytest
import time

UNKNOWN_MEMBER_ID = JoinGroupRequest.UNKNOWN_MEMBER_ID

Expand Down Expand Up @@ -91,6 +92,8 @@ async def test_coordinator_workflow(
# Check if 2 coordinators will coordinate rebalances correctly
# Check if the initial group join is performed correctly with minimal
# setup

waiting_time_before_acting_as_master_sec = 5
coordinator = SchemaCoordinator(
client,
"test-host-1",
Expand All @@ -102,6 +105,7 @@ async def test_coordinator_workflow(
session_timeout_ms=10000,
heartbeat_interval_ms=500,
retry_backoff_ms=100,
waiting_time_before_acting_as_master_ms=waiting_time_before_acting_as_master_sec * 1000,
)
coordinator.start()
assert coordinator.coordinator_id is None
Expand All @@ -112,7 +116,10 @@ async def test_coordinator_workflow(
await coordinator.ensure_coordinator_known()
assert coordinator.coordinator_id is not None

assert coordinator.are_we_master
assert not coordinator.are_we_master()
# the waiting_time_before_acting_as_master_ms
await asyncio.sleep(10)
assert coordinator.are_we_master(), f"after {waiting_time_before_acting_as_master_sec} seconds we can act as a master"

# Check if adding an additional coordinator will rebalance correctly
client2 = await _get_client(kafka_servers=kafka_servers)
Expand Down Expand Up @@ -144,16 +151,35 @@ async def test_coordinator_workflow(
secondary = coordinator if primary_selection_strategy == "highest" else coordinator2
secondary_client = client if primary_selection_strategy == "highest" else client2

assert primary.are_we_master
assert not secondary.are_we_master
if primary == coordinator2:
# we need to disable the master for `waiting_time_before_acting_as_master_sec` seconds each time, we cannot be sure.
# if the coordinator its `coordinator1` since isn't changed we don't have to wait
# for the `waiting_time_before_acting_as_master_sec` seconds.
assert (
not primary.are_we_master()
), "after a change in the coordinator we can act as a master until we wait for the required time"
assert not secondary.are_we_master(), "also the second cannot be immediately a master"
# after that time the primary can act as a master
await asyncio.sleep(waiting_time_before_acting_as_master_sec)

assert primary.are_we_master()
assert not secondary.are_we_master()

# Check is closing the primary coordinator will rebalance the secondary to change to primary
await primary.close()
await primary_client.close()

while not secondary.are_we_master:
now = time.time()
while time.time() - now < waiting_time_before_acting_as_master_sec:
assert (
not secondary.are_we_master()
, (f"Cannot become master before {waiting_time_before_acting_as_master_sec} seconds "
f"for the late records that can arrive from the previous master")
)
await asyncio.sleep(0.5)

while not secondary.are_we_master():
await asyncio.sleep(0.5)
assert secondary.are_we_master
await secondary.close()
await secondary_client.close()

Expand Down

0 comments on commit 52fb34c

Please sign in to comment.