Skip to content

Commit

Permalink
WIP:
Browse files Browse the repository at this point in the history
1. now the schema_coordinator wait until the schema_reader reached the last message in the topic before declaring himself as master
2. changed the `_ready` flag of the schema_reader to be protected by a lock since now also the schema_coordinator can reset the `_ready` flag
3. set the close of the coordinator in the `close` method instead of in the run method
  • Loading branch information
eliax1996 committed Sep 18, 2024
1 parent 3427f8d commit 3f84dff
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 34 deletions.
21 changes: 17 additions & 4 deletions karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from threading import Thread
from typing import Final

import asyncio
import logging
import time

__all__ = ("MasterCoordinator",)


LOG = logging.getLogger(__name__)


Expand All @@ -42,6 +45,10 @@ def __init__(self, config: Config) -> None:
self._sc: SchemaCoordinator | None = None
self._thread: Thread = Thread(target=self._start_loop, daemon=True)
self._loop: asyncio.AbstractEventLoop | None = None
self._schema_reader_stopper: SchemaReaderStoppper | None = None

def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None:
self._schema_reader_stopper = schema_reader_stopper

@property
def schema_coordinator(self) -> SchemaCoordinator | None:
Expand Down Expand Up @@ -84,14 +91,17 @@ async def _async_loop(self) -> None:
self._sc = self.init_schema_coordinator()
while self._running:
if self._sc.ready():
return
break
await asyncio.sleep(0.5)

# todo: wait a condition variable or a lock.
LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
if self._loop:
self._loop.close()
while self._loop is not None and not self._loop.is_closed():
self._loop.stop()
if not self._loop.is_running():
self._loop.close()
time.sleep(0.5)
if self._kafka_client:
await self._kafka_client.close()

Expand Down Expand Up @@ -119,8 +129,10 @@ def init_kafka_client(self) -> AIOKafkaClient:

def init_schema_coordinator(self) -> SchemaCoordinator:
assert self._kafka_client is not None
assert self._schema_reader_stopper is not None
schema_coordinator = SchemaCoordinator(
client=self._kafka_client,
schema_reader_stopper=self._schema_reader_stopper,
election_strategy=self._config.get("master_election_strategy", "lowest"),
group_id=self._config["group_id"],
hostname=self._config["advertised_hostname"],
Expand Down Expand Up @@ -159,3 +171,4 @@ def get_master_info(self) -> tuple[bool | None, str | None]:

async def close(self) -> None:
self._running = False
# todo set the condition variable or lock.
14 changes: 9 additions & 5 deletions karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
)
from aiokafka.util import create_future, create_task
from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData
from karapace.typing import JsonData, SchemaReaderStoppper
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
from typing import Any, Coroutine, Final, Sequence
Expand Down Expand Up @@ -122,6 +122,7 @@ class SchemaCoordinator:
def __init__(
self,
client: AIOKafkaClient,
schema_reader_stopper: SchemaReaderStoppper,
hostname: str,
port: int,
scheme: str,
Expand All @@ -146,6 +147,7 @@ def __init__(
self.scheme: Final = scheme
self.master_eligibility: Final = master_eligibility
self.master_url: str | None = None
self._schema_reader_stopper = schema_reader_stopper
self._are_we_master: bool | None = False
# a value that its strictly higher than any clock, so we are sure
# we are never going to consider this the leader without explictly passing
Expand Down Expand Up @@ -211,7 +213,7 @@ def are_we_master(self) -> bool | None:
LOG.warning("No new elections performed yet.")
return None

if not self._ready:
if not self._ready or not self._schema_reader_stopper.ready():
return False

if self._are_we_master and self._initial_election_sec is not None:
Expand All @@ -223,7 +225,7 @@ def are_we_master(self) -> bool | None:
self._initial_election_sec = None
# this is the last point in time were we wait till to the end of the log queue for new
# incoming messages.
self._ready = False # todo: wrong, its not the _ready flag we should change, we should change the same
self._schema_reader_stopper.set_not_ready()
# flag that its set at startup, fix this
return False

Expand Down Expand Up @@ -484,7 +486,7 @@ async def _on_join_complete(
# was a master change, the time before acting its always respect
# to which was the previous master (if we were master no need
# to wait more before acting)
self._ready = False # todo: wrong, its not the _ready flag we should change, we should change the same
self._schema_reader_stopper.set_not_ready()
# flag that its set at startup, fix this
# `time.monotonic()` because we don't want the time to go back or forward because of e.g. ntp
self._initial_election_sec = time.monotonic()
Expand All @@ -505,7 +507,7 @@ async def _on_join_complete(
self.master_url = None
self._are_we_master = False
else:
LOG.info("We are not elected as master", member_id)
LOG.info("We are not elected as master")
self.master_url = master_url
self._are_we_master = False
self._ready = True
Expand All @@ -518,13 +520,15 @@ def coordinator_dead(self) -> None:
"""
if self._coordinator_dead_fut is not None and self.coordinator_id is not None:
LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id)
self._are_we_master = False
self.coordinator_id = None
self._coordinator_dead_fut.set_result(None)

def reset_generation(self) -> None:
"""Coordinator did not recognize either generation or member_id. Will
need to re-join the group.
"""
self._are_we_master = False
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.request_rejoin()
Expand Down
55 changes: 40 additions & 15 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, SchemaId, Subject, Version
from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version
from karapace.utils import json_decode, JSONDecodeError
from threading import Event, Thread
from threading import Event, Lock, Thread
from typing import Final, Mapping, Sequence

import json
Expand Down Expand Up @@ -119,7 +119,7 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient:
)


class KafkaSchemaReader(Thread):
class KafkaSchemaReader(Thread, SchemaReaderStoppper):
def __init__(
self,
config: Config,
Expand Down Expand Up @@ -156,7 +156,10 @@ def __init__(
# old stale version that has not been deleted yet.)
self.offset = OFFSET_UNINITIALIZED
self._highest_offset = OFFSET_UNINITIALIZED
self.ready = False
# when a master its elected as master we should read the last arrived messages at least
# once. This lock prevent the concurrent modification of the `ready` flag.
self._ready_lock = Lock()
self._ready = False

# This event controls when the Reader should stop running, it will be
# set by another thread (e.g. `KarapaceSchemaRegistry`)
Expand Down Expand Up @@ -269,9 +272,10 @@ def _get_beginning_offset(self) -> int:
return OFFSET_UNINITIALIZED

def _is_ready(self) -> bool:
if self.ready:
return True

"""
Always call `_is_ready` only if `self._ready` is False.
Removed the check since now with the Lock the lookup it's a costly operation.
"""
assert self.consumer is not None, "Thread must be started"

try:
Expand Down Expand Up @@ -315,6 +319,14 @@ def _is_ready(self) -> bool:
def highest_offset(self) -> int:
return max(self._highest_offset, self._offset_watcher.greatest_offset())

def ready(self) -> bool:
with self._ready_lock:
return self._ready

def set_not_ready(self) -> None:
with self._ready_lock:
self._ready = False

@staticmethod
def _parse_message_value(raw_value: str | bytes) -> JsonObject | None:
value = json_decode(raw_value)
Expand All @@ -326,10 +338,8 @@ def _parse_message_value(raw_value: str | bytes) -> JsonObject | None:

def handle_messages(self) -> None:
assert self.consumer is not None, "Thread must be started"

msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s, num_messages=self.max_messages_to_process)
if self.ready is False:
self.ready = self._is_ready()
self._update_is_ready_flag()

watch_offsets = False
if self.master_coordinator is not None:
Expand Down Expand Up @@ -372,9 +382,10 @@ def handle_messages(self) -> None:
# Default keymode is CANONICAL and preferred unless any data consumed
# has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE
# the subsequent keys are omitted from detection.
if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL:
if msg_keymode == KeyMode.DEPRECATED_KARAPACE:
self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE)
with self._ready_lock:
if not self._ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL:
if msg_keymode == KeyMode.DEPRECATED_KARAPACE:
self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE)

value = None
message_value = msg.value()
Expand All @@ -395,14 +406,28 @@ def handle_messages(self) -> None:
else:
schema_records_processed_keymode_deprecated_karapace += 1

if self.ready and watch_offsets:
self._offset_watcher.offset_seen(self.offset)
with self._ready_lock:
if self._ready and watch_offsets:
self._offset_watcher.offset_seen(self.offset)

self._report_schema_metrics(
schema_records_processed_keymode_canonical,
schema_records_processed_keymode_deprecated_karapace,
)

def _update_is_ready_flag(self) -> None:
update_ready_flag = False

# to keep the lock as few as possible.
with self._ready_lock:
if self._ready is False:
update_ready_flag = True

if update_ready_flag:
new_ready_flag = self._is_ready()
with self._ready_lock:
self._ready = new_ready_flag

def _report_schema_metrics(
self,
schema_records_processed_keymode_canonical: int,
Expand Down
9 changes: 8 additions & 1 deletion karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ def __init__(self, config: Config) -> None:
master_coordinator=self.mc,
database=self.database,
)
# very ugly, left as a placeholder, since we have a bidirectional
# dependency it means that the two objects needs to be one (aka the
# mc should create the KafkaSchemaReader and inject the stopper inside
# the schema_coordinator. Left as it is to reason together to the implementation
# since semantically it's the same, after we agree on the solution proceeding with
# the refactor)
self.mc.set_stoppper(self.schema_reader)

self.schema_lock = asyncio.Lock()
self._master_lock = asyncio.Lock()
Expand Down Expand Up @@ -94,7 +101,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str |
are_we_master, master_url = self.mc.get_master_info()
if are_we_master is None:
LOG.info("No master set: %r, url: %r", are_we_master, master_url)
elif not ignore_readiness and self.schema_reader.ready is False:
elif not ignore_readiness and self.schema_reader.ready() is False:
LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready)
else:
return are_we_master, master_url
Expand Down
5 changes: 3 additions & 2 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class SchemaErrorMessages(Enum):

class KarapaceSchemaRegistryController(KarapaceBase):
def __init__(self, config: Config) -> None:
# the `not_ready_handler` its wrong, its not expecting an async method the receiver.
super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve)

self._auth: HTTPAuthorizer | None = None
Expand All @@ -103,7 +104,7 @@ async def schema_registry_health(self) -> JsonObject:
if self._auth is not None:
resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified
resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready
if self.schema_registry.schema_reader.ready:
if self.schema_registry.schema_reader.ready():
resp["schema_registry_startup_time_sec"] = (
self.schema_registry.schema_reader.last_check - self._process_start_time
)
Expand Down Expand Up @@ -135,7 +136,7 @@ def _check_authorization(self, user: User | None, operation: Operation, resource
self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN)

async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None:
if self.schema_registry.schema_reader.ready:
if self.schema_registry.schema_reader.ready():
pass
else:
# Not ready, still loading the state.
Expand Down
11 changes: 11 additions & 0 deletions karapace/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from enum import Enum, unique
from karapace.errors import InvalidVersion
from typing import Any, ClassVar, Dict, List, Mapping, NewType, Sequence, Union
Expand Down Expand Up @@ -101,3 +102,13 @@ def value(self) -> int:
@property
def is_latest(self) -> bool:
return self.value == self.MINUS_1_VERSION_TAG


class SchemaReaderStoppper(ABC):
@abstractmethod
def ready(self) -> bool:
pass

@abstractmethod
def set_not_ready(self) -> None:
pass
10 changes: 10 additions & 0 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
from karapace.config import set_config_defaults
from karapace.coordinator.master_coordinator import MasterCoordinator
from karapace.typing import SchemaReaderStoppper
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import PortRangeInclusive
from tests.utils import new_random_name
Expand All @@ -16,8 +17,17 @@
import requests


class AlwaysAvailableSchemaReaderStoppper(SchemaReaderStoppper):
def ready(self) -> bool:
return True

def set_not_ready(self) -> None:
pass


async def init_admin(config):
mc = MasterCoordinator(config=config)
mc.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
await mc.start()
return mc

Expand Down
Loading

0 comments on commit 3f84dff

Please sign in to comment.