From 72e9031b81b568ec885dfa0f3ceefbb7ec9381e1 Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Tue, 31 Jan 2023 10:19:55 +0100
Subject: [PATCH 01/12] Tests: Add helper function to wait for CommandHandlers

---
 integration-tests/helpers/writer.py | 32 ++++++++++++++---------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git a/integration-tests/helpers/writer.py b/integration-tests/helpers/writer.py
index ebcb85df6..739ab22aa 100644
--- a/integration-tests/helpers/writer.py
+++ b/integration-tests/helpers/writer.py
@@ -1,6 +1,7 @@
 from file_writer_control import WorkerJobPool
 from file_writer_control.WorkerStatus import WorkerState
 from file_writer_control import JobState
+from file_writer_control import CommandHandler
 from file_writer_control import CommandState
 from file_writer_control import WriteJob
 from file_writer_control import JobHandler
@@ -43,37 +44,34 @@ def wait_no_working_writers(worker_pool: WorkerJobPool, timeout: float):
     raise RuntimeError("Timed out when waiting for workers to finish")
 
 
-def wait_start_job(worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float):
-    job_handler = JobHandler(worker_finder=worker_pool)
-    start_handler = job_handler.start_job(write_job)
+def wait_command_is_done(command_handler: CommandHandler, timeout: float):
     start_time = datetime.now()
     try:
-        while not start_handler.is_done():
+        while not command_handler.is_done():
             if start_time + timedelta(seconds=timeout) < datetime.now():
-                raise RuntimeError("Timed out when waiting for write job to start")
-            elif start_handler.get_state() == CommandState.ERROR:
+                raise RuntimeError("Timed out when waiting for command to finish")
+            elif command_handler.get_state() == CommandState.ERROR:
                 raise RuntimeError(
-                    f"Got error when trying to start job. Message was: {start_handler.get_message()}"
+                    f"Command failed. Message was: {command_handler.get_message()}"
                 )
             time.sleep(0.5)
     except RuntimeError as e:
         raise RuntimeError(
-            e.__str__() + f" The message was: {start_handler.get_message()}"
+            e.__str__() + f" The message was: {command_handler.get_message()}"
         )
+
+
+def wait_start_job(worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float):
+    job_handler = JobHandler(worker_finder=worker_pool)
+    start_handler = job_handler.start_job(write_job)
+    start_time = datetime.now()
+    wait_command_is_done(start_handler, timeout)
     return job_handler
 
 
 def wait_set_stop_now(job: JobHandler, timeout: float):
     stop_handler = job.set_stop_time(datetime.now())
-    start_time = datetime.now()
-    while not stop_handler.is_done():
-        if start_time + timedelta(seconds=timeout) < datetime.now():
-            raise RuntimeError("Timed out when setting new stop time for job.")
-        elif stop_handler.get_state() == CommandState.ERROR:
-            raise RuntimeError(
-                f"Got error when trying to stop job. Message was: {stop_handler.get_message()}"
-            )
-        time.sleep(0.5)
+    wait_command_is_done(stop_handler, timeout)
 
 
 def wait_fail_start_job(

From 3b681a81bcc67a5e9dc6a2207e8e02e944754631 Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Wed, 8 Feb 2023 11:07:23 +0100
Subject: [PATCH 02/12] Add support to process command topic from a specific
 timestamp

---
 changes.md                                     | 20 ++++--
 docker_launch.sh                               |  2 +-
 integration-tests/conftest.py                  |  2 +-
 integration-tests/docker-compose.yml           |  2 +-
 ...st_stop_msg_immediately_after_start_msg.py | 47 ++++++++++++++
 src/CommandSystem/CommandListener.cpp          | 18 +++++-
 src/CommandSystem/CommandListener.h            | 20 ++++--
 src/CommandSystem/Handler.cpp                  |  7 ++-
 src/Kafka/Consumer.cpp                         | 62 ++++++++++++++++++-
 src/Kafka/Consumer.h                           | 39 ++++++++++--
 src/Kafka/MetaDataQuery.cpp                    |  6 ++
 src/Kafka/MetaDataQuery.h                      |  4 ++
 src/Kafka/MetaDataQueryImpl.h                  |  8 +--
 src/tests/helpers/KafkaMocks.h                 |  2 +
 14 files changed, 212 insertions(+), 27 deletions(-)
 create mode 100644 integration-tests/test_stop_msg_immediately_after_start_msg.py

diff --git a/changes.md b/changes.md
index 23a2ccdbc..bbbf6c691 100644
--- a/changes.md
+++ b/changes.md
@@ -2,17 +2,27 @@
 
 ## Next version
 
-- Redact Kafka SASL password in logs.
-- Ignore deprecated warnings on macOS (can be removed when https://github.com/chriskohlhoff/asio/issues/1183 is addressed.
+-
+
+## Version 6.0.0
+
+- Breaking: Run start messages (pl72 schema) now require a `start_time` field.
 - Breaking: Ignore Kafka IP addresses sent in `StartJob` messages, experiment
   data is now fetched from the broker configured in `job-pool-uri`.
-- The case when messages are not received from a specific Kafka topic does not make the file writer unsubscribe from the topic anymore. Instead a warning is provided in the file writer log.
-- Updated Conan package dependencies:
-  - librdkakfa (1.9.2)
+- Commands received without an explicit kafka-to-nexus `service_id` are no
+  longer skipped. The command is processed by all workers and accepted by the
+  worker with a matching `job_id`.
+- Fix: The case when messages are not received from a specific Kafka topic does not
+  make the file writer unsubscribe from the topic anymore. Instead a warning is
+  provided in the file writer log.
+- Fix: Stop commands sent immediately after a start command were not always processed.
+- Fix: Redact Kafka SASL password in logs.
 - Adding _f144_, _al00_ and _ep01_ writer modules. For more information on the
   schemas mentioned, see ([schema definitions
   here](https://github.com/ess-dmsc/streaming-data-types)).
 - Adding _se00_ writer module (see [schema definitions here](https://github.com/ess-dmsc/streaming-data-types)).
 - Adding _ev44_ writer module (see [schema definitions here](https://github.com/ess-dmsc/streaming-data-types)).
+- Ignore deprecated warnings on macOS (can be removed when https://github.com/chriskohlhoff/asio/issues/1183 is addressed.
+- Updated Conan package dependencies: librdkakfa (1.9.2)
 
 ## Version 5.2.0: Kafka improvements and other fixes
 
diff --git a/docker_launch.sh b/docker_launch.sh
index 033be4960..e15ad79d3 100755
--- a/docker_launch.sh
+++ b/docker_launch.sh
@@ -8,7 +8,7 @@ FOUND=false
 
 for I in {1..10}
 do
-  if $KAFKA_CMD | grep -q "TEST_writer_commands" && $KAFKA_CMD | grep -q "TEST_writer_jobs"; then
+  if $KAFKA_CMD | grep -q "TEST_writer_commands_alternative" && $KAFKA_CMD | grep -q "TEST_writer_commands" && $KAFKA_CMD | grep -q "TEST_writer_jobs"; then
     FOUND=true
     break
   else
diff --git a/integration-tests/conftest.py b/integration-tests/conftest.py
index f90e34242..d1e6d9304 100644
--- a/integration-tests/conftest.py
+++ b/integration-tests/conftest.py
@@ -95,7 +95,7 @@ def delivery_callback(err, msg):
     n_polls = 0
     while n_polls < 10 and not topic_ready:
         all_topics = client.list_topics().topics.keys()
-        if "TEST_writer_jobs" in all_topics and "TEST_writer_commands" in all_topics:
+        if "TEST_writer_jobs" in all_topics and "TEST_writer_commands" in all_topics and "TEST_writer_commands_alternative" in all_topics:
             topic_ready = True
             print("Topic is ready!", flush=True)
             break
diff --git a/integration-tests/docker-compose.yml b/integration-tests/docker-compose.yml
index 80be5082f..335762249 100644
--- a/integration-tests/docker-compose.yml
+++ b/integration-tests/docker-compose.yml
@@ -16,7 +16,7 @@ services:
       KAFKA_REPLICA_FETCH_MAX_BYTES: 300000000
       KAFKA_BROKER_ID: 0
       KAFKA_LOG_RETENTION_MS: -1 # keep data forever, required for tests involving fake "historical" data
-      KAFKA_CREATE_TOPICS: "TEST_epicsConnectionStatus:1:1,TEST_sampleEnv:1:1,TEST_writer_jobs:1:1,TEST_writer_commands:1:1,TEST_forwarderConfig:1:1,TEST_forwarderStatusLR:1:1,TEST_forwarderDataLR:1:1"
+      KAFKA_CREATE_TOPICS: "TEST_epicsConnectionStatus:1:1,TEST_sampleEnv:1:1,TEST_writer_jobs:1:1,TEST_writer_commands:1:1,TEST_writer_commands_alternative:1:1,TEST_forwarderConfig:1:1,TEST_forwarderStatusLR:1:1,TEST_forwarderDataLR:1:1"
     depends_on:
       - wait_for_zookeeper
diff --git a/integration-tests/test_stop_msg_immediately_after_start_msg.py b/integration-tests/test_stop_msg_immediately_after_start_msg.py
new file mode 100644
index 000000000..bb31ed209
--- /dev/null
+++ b/integration-tests/test_stop_msg_immediately_after_start_msg.py
@@ -0,0 +1,47 @@
+from datetime import datetime, timedelta
+
+from file_writer_control.WorkerJobPool import WorkerJobPool
+from file_writer_control.WriteJob import WriteJob
+from file_writer_control.JobHandler import JobHandler
+from helpers.nexushelpers import OpenNexusFile
+from helpers import full_file_path
+from helpers.writer import (
+    wait_command_is_done,
+    wait_writers_available,
+    wait_no_working_writers,
+)
+
+
+def test_stop_msg_immediately_after_start_msg_on_alternative_command_topic(worker_pool, kafka_address):
+    file_path = full_file_path(f"output_file_stop_msg_immediately_after_start_msg_on_alternative_command_topic.nxs")
+    wait_writers_available(worker_pool, nr_of=1, timeout=10)
+    now = datetime.now()
+    with open("commands/nexus_structure_static.json", "r") as f:
+        structure = f.read()
+
+    write_job = WriteJob(
+        nexus_structure=structure,
+        file_name=file_path,
+        broker=kafka_address,
+        start_time=now,
+        control_topic="TEST_writer_commands_alternative",
+    )
+    # ECDC-3333 file-writer-control does not correctly handle jobs with
+    # alternative command topics. For now we create a secondary WorkerJobPool
+    # to do so.
+    worker_pool_alternative = WorkerJobPool(
+        job_topic_url=f"{kafka_address}/TEST_writer_jobs",
+        command_topic_url=f"{kafka_address}/TEST_writer_commands_alternative",
+        max_message_size=1048576 * 500,
+    )
+    job_handler = JobHandler(worker_finder=worker_pool)
+    job_handler_alternative = JobHandler(worker_finder=worker_pool_alternative, job_id=write_job.job_id)
+    start_handler = job_handler.start_job(write_job)
+    stop_handler = job_handler_alternative.set_stop_time(now)
+
+    # wait_command_is_done(start_handler, 10) # disabled: file-writer-control does not support topic switching
+    wait_command_is_done(stop_handler, 15)
+    wait_no_working_writers(worker_pool, timeout=10)
+    with OpenNexusFile(file_path) as file:
+        assert not file.swmr_mode
+        assert file["entry/start_time"][()][0].decode("utf-8") == "2016-04-12T02:58:52"
diff --git a/src/CommandSystem/CommandListener.cpp b/src/CommandSystem/CommandListener.cpp
index 7d5b38365..531008b9f 100644
--- a/src/CommandSystem/CommandListener.cpp
+++ b/src/CommandSystem/CommandListener.cpp
@@ -24,6 +24,13 @@ CommandListener::CommandListener(uri::URI CommandTopicUri,
   KafkaSettings.Address = CommandTopicUri.HostPort;
 }
 
+CommandListener::CommandListener(uri::URI CommandTopicUri,
+                                 Kafka::BrokerSettings Settings,
+                                 time_point StartTimestamp)
+    : KafkaAddress(CommandTopicUri.HostPort),
+      CommandTopic(CommandTopicUri.Topic), KafkaSettings(Settings),
+      StartTimestamp(StartTimestamp) {}
+
 std::pair CommandListener::pollForCommand() {
   if (not KafkaAddress.empty() and not CommandTopic.empty()) {
     if (Consumer == nullptr) {
@@ -36,8 +43,15 @@ std::pair CommandListener::pollForCommand() {
 }
 
 void CommandListener::setUpConsumer() {
-  Consumer = Kafka::createConsumer(KafkaSettings);
-  Consumer->addTopic(CommandTopic);
+  if (StartTimestamp < time_point::max()) {
+    // The CommandListener instance was created to process commands since a
+    // specific timestamp
+    Consumer = Kafka::createConsumer(KafkaSettings, KafkaAddress);
+    Consumer->assignAllPartitions(CommandTopic, StartTimestamp);
+  } else {
+    Consumer = Kafka::createConsumer(KafkaSettings, KafkaAddress);
+    Consumer->addTopic(CommandTopic);
+  }
 }
 
 } // namespace Command
diff --git a/src/CommandSystem/CommandListener.h b/src/CommandSystem/CommandListener.h
index fb46a47ab..8962c6dc2 100644
--- a/src/CommandSystem/CommandListener.h
+++ b/src/CommandSystem/CommandListener.h
@@ -12,6 +12,7 @@
 #include "Kafka/Consumer.h"
 #include "MainOpt.h"
 #include "Msg.h"
+#include "TimeUtility.h"
 #include "logger.h"
 
 namespace Command {
@@ -21,22 +22,30 @@ using FileWriter::Msg;
 /// \brief Check for new commands on a command listener topic.
 class CommandListener {
 public:
-  /// \brief Constructor.
+  /// \brief Constructor without specific StartTimestamp.
   ///
   /// \param CommandTopicUri The URI/URL of the Kafka broker + topic to connect
   /// to for new commands. \param Settings Kafka (consumer) settings.
   CommandListener(uri::URI CommandTopicUri, Kafka::BrokerSettings Settings);
 
+  /// \brief Constructor with specific StartTimestamp.
+  ///
+  /// \param CommandTopicUri The URI/URL of the Kafka broker + topic to connect
+  /// to for new commands. \param Settings Kafka (consumer) settings. \param
+  /// StartTimestamp Point in time to start listening for commands.
+  CommandListener(uri::URI CommandTopicUri, Kafka::BrokerSettings Settings,
+                  time_point StartTimestamp);
+
   /// \brief Destructor.
   virtual ~CommandListener() = default;
 
   /// \brief Poll the Kafka topic for a new command.
   ///
-  /// \note Will timeout on its first call. Will continue to timeout as long as
-  /// it fails to retrieve metadata from the Kafka broker
+  /// \note Will timeout on its first call. Will continue to timeout as long
+  /// as it fails to retrieve metadata from the Kafka broker
   ///
-  /// \return Get a std::pair<> that contains the outcome of the message poll
-  /// and the message if one was successfully received.
+  /// \return Get a std::pair<> that contains the outcome of the message
+  /// poll and the message if one was successfully received.
   virtual std::pair pollForCommand();
 
 protected:
@@ -45,5 +54,6 @@ class CommandListener {
   Kafka::BrokerSettings KafkaSettings;
   void setUpConsumer();
   std::unique_ptr Consumer;
+  time_point StartTimestamp = time_point::max();
 };
 } // namespace Command
diff --git a/src/CommandSystem/Handler.cpp b/src/CommandSystem/Handler.cpp
index 05a381f6d..dc55f59da 100644
--- a/src/CommandSystem/Handler.cpp
+++ b/src/CommandSystem/Handler.cpp
@@ -204,11 +204,12 @@ void Handler::handleStartCommand(FileWriter::Msg CommandMsg,
       {[&]() {
          if (not StartJob.ControlTopic.empty()) {
            if (IsJobPoolCommand) {
-             LOG_INFO(R"(Connecting to an alternative command topic: "{}")",
-                      StartJob.ControlTopic);
+             LOG_INFO(
+                 R"(Connecting to an alternative command topic "{}" with starting offset "{}")",
+                 StartJob.ControlTopic, StartJob.StartTime);
              CommandSource = std::make_unique(
                  uri::URI{CommandTopicAddress, StartJob.ControlTopic},
-                 KafkaSettings);
+                 KafkaSettings, StartJob.StartTime);
              AltCommandResponse = std::make_unique(
                  ServiceId,
                  uri::URI{CommandTopicAddress, StartJob.ControlTopic},
diff --git a/src/Kafka/Consumer.cpp b/src/Kafka/Consumer.cpp
index c327bb589..8e98876ec 100644
--- a/src/Kafka/Consumer.cpp
+++ b/src/Kafka/Consumer.cpp
@@ -8,6 +8,7 @@
 // Screaming Udder!                       https://esss.se
 
 #include "Consumer.h"
+#include "MetaDataQuery.h"
 #include "MetadataException.h"
 #include 
 #include 
@@ -80,11 +81,70 @@ void Consumer::addTopic(std::string const &Topic) {
     LOG_ERROR(R"(Unable to add topic "{}" to list of subscribed topics.)",
               Topic);
     throw std::runtime_error(fmt::format(
-        R"(Unable to add topic "{}" to list of subscribed topics. RdKafka error: "{}")",
+        R"(Unable to add topic "{}" to list of subscribed topics. RdKafka
+        error: "{}")",
         Topic, err2str(ErrorCode)));
   }
 }
 
+void Consumer::assignAllPartitions(std::string const &Topic,
+                                   time_point const &StartTimestamp) {
+  LOG_INFO("Consumer::assignAllPartitions() Topic: {} StartTimestamp: {}",
+           Topic, StartTimestamp);
+  // Obtain partitions
+  RdKafka::Metadata *MetadataPtr{nullptr};
+  const RdKafka::TopicMetadata *TopicMetadata =
+      getTopicMetadata(Topic, MetadataPtr);
+  std::vector TopicPartitions;
+  for (auto const &Partition : *TopicMetadata->partitions()) {
+    LOG_DEBUG(
+        R"(Obtaining offset for topic "{}" partition "{}" for the provided timestamp "{}")",
+        Topic, Partition->id(), StartTimestamp);
+    TopicPartitions.push_back(RdKafka::TopicPartition::create(
+        Topic, Partition->id(), toMilliSeconds(StartTimestamp)));
+  }
+  // Obtain offsets
+  auto ErrorCode = KafkaConsumer->offsetsForTimes(
+      TopicPartitions,
+      toMilliSeconds(ConsumerBrokerSettings.MaxMetadataTimeout));
+  if (ErrorCode != RdKafka::ERR_NO_ERROR) {
+    LOG_ERROR(
+        R"(Could not get offsets in topic "{}" for the provided timestamp "{}". RdKafka error: "{}")",
+        Topic, StartTimestamp, err2str(ErrorCode));
+    throw std::runtime_error(fmt::format(
+        R"(Could not get offsets in topic "{}" for the provided timestamp "{}". RdKafka error: "{}")",
+        Topic, StartTimestamp, err2str(ErrorCode)));
+  }
+  // Assign partitions at offset
+  ErrorCode = KafkaConsumer->assign(TopicPartitions);
+  if (ErrorCode != RdKafka::ERR_NO_ERROR) {
+    LOG_ERROR(R"(Could not assign topic-partitions. RdKafka error: "{}")",
+              err2str(ErrorCode));
+    throw std::runtime_error(
+        fmt::format(R"(Could not assign topic-partitions. RdKafka error: "{}")",
+                    err2str(ErrorCode)));
+  }
+  RdKafka::TopicPartition::destroy(TopicPartitions);
+  delete MetadataPtr;
+}
+
+const RdKafka::TopicMetadata *
+Consumer::getTopicMetadata(const std::string &Topic,
+                           RdKafka::Metadata *MetadataPtr) {
+  std::string ErrorStr;
+  auto TopicObj = std::unique_ptr(
+      RdKafka::Topic::create(KafkaConsumer.get(), Topic, nullptr, ErrorStr));
+  auto ReturnCode = KafkaConsumer->metadata(
+      true, TopicObj.get(), &MetadataPtr,
+      toMilliSeconds(ConsumerBrokerSettings.MaxMetadataTimeout));
+  if (ReturnCode != RdKafka::ERR_NO_ERROR) {
+    throw MetadataException(fmt::format(
+        R"(Failed to query broker for available partitions on topic "{}". Error was: {})",
+        Topic, RdKafka::err2str(ReturnCode)));
+  }
+  return findTopicMetadata(Topic, MetadataPtr);
+}
+
 std::pair Consumer::poll() {
   auto KafkaMsg = std::unique_ptr(KafkaConsumer->consume(
       toMilliSeconds(ConsumerBrokerSettings.PollTimeout)));
diff --git a/src/Kafka/Consumer.h b/src/Kafka/Consumer.h
index af2c484a8..fb1f1dc81 100644
--- a/src/Kafka/Consumer.h
+++ b/src/Kafka/Consumer.h
@@ -14,7 +14,7 @@
 #include "KafkaEventCb.h"
 #include "Msg.h"
 #include "PollStatus.h"
-#include 
+#include "TimeUtility.h"
 #include 
 #include 
 
@@ -32,6 +32,11 @@ class ConsumerInterface {
   virtual void addPartitionAtOffset(std::string const &Topic, int PartitionId,
                                     int64_t Offset) = 0;
   virtual void addTopic(std::string const &Topic) = 0;
+  virtual void assignAllPartitions(std::string const &Topic,
+                                   time_point const &StartTimestamp) = 0;
+  virtual const RdKafka::TopicMetadata *
+  getTopicMetadata(const std::string &Topic,
+                   RdKafka::Metadata *MetadataPtr) = 0;
 };
 
 class Consumer : public ConsumerInterface {
@@ -48,21 +53,33 @@ class Consumer : public ConsumerInterface {
   Consumer(Consumer const &) = delete;
   ~Consumer() override;
 
-  /// Set a topic partition at a specified offset to consume from.
+  /// Add a topic partition at a specified offset to consume from.
   ///
-  /// Replaces any existing topics + partitions that are currently being
-  /// consumed.
+  /// Previously assigned topic partitions are preserved.
   /// \note This is a non blocking call.
   void addPartitionAtOffset(std::string const &Topic, int PartitionId,
                             int64_t Offset) override;
 
   void addTopic(std::string const &Topic) override;
 
+  /// Assign all topic's partitions using the offsets defined by the
+  /// provided timestamp.
+  ///
+  /// Previous partition assignments are NOT preserved.
+  void assignAllPartitions(std::string const &Topic,
+                           time_point const &StartTimestamp) override;
+
   /// \brief Polls for any new messages.
   /// \note Is a blocking call with a timeout that is hard coded in the broker
   /// settings. \return Any new messages consumed.
   std::pair poll() override;
 
+  /// Obtain metadata for given topic.
+  /// \param Topic Topic. \param MetadataPtr Pointer to store the metadata.
+  const RdKafka::TopicMetadata *
+  getTopicMetadata(const std::string &Topic,
+                   RdKafka::Metadata *MetadataPtr) override;
+
 private:
   std::unique_ptr Conf;
   BrokerSettings const ConsumerBrokerSettings;
@@ -87,5 +104,19 @@ class StubConsumer : public ConsumerInterface {
   };
 
   void addTopic(std::string const &Topic) override { UNUSED_ARG(Topic); }
+
+  void assignAllPartitions(std::string const &Topic,
+                           time_point const &StartTimestamp) override {
+    UNUSED_ARG(Topic);
+    UNUSED_ARG(StartTimestamp);
+  }
+
+  const RdKafka::TopicMetadata *
+  getTopicMetadata(const std::string &Topic,
+                   RdKafka::Metadata *MetadataPtr) override {
+    UNUSED_ARG(Topic);
+    UNUSED_ARG(MetadataPtr);
+    return nullptr;
+  }
 };
 } // namespace Kafka
diff --git a/src/Kafka/MetaDataQuery.cpp b/src/Kafka/MetaDataQuery.cpp
index 4ee0a97d8..89831aa6b 100644
--- a/src/Kafka/MetaDataQuery.cpp
+++ b/src/Kafka/MetaDataQuery.cpp
@@ -13,6 +13,12 @@
 
 namespace Kafka {
 
+const RdKafka::TopicMetadata *
+findTopicMetadata(const std::string &Topic,
+                  const RdKafka::Metadata *KafkaMetadata) {
+  return findKafkaTopic(Topic, KafkaMetadata);
+}
+
 std::vector>
 getOffsetForTime(std::string const &Broker, std::string const &Topic,
                  std::vector const &Partitions, time_point Time,
diff --git a/src/Kafka/MetaDataQuery.h b/src/Kafka/MetaDataQuery.h
index a9a7e54e4..9d8675272 100644
--- a/src/Kafka/MetaDataQuery.h
+++ b/src/Kafka/MetaDataQuery.h
@@ -18,6 +18,10 @@
 
 namespace Kafka {
 
+const RdKafka::TopicMetadata *
+findTopicMetadata(const std::string &Topic,
+                  const RdKafka::Metadata *KafkaMetadata);
+
 std::vector>
 getOffsetForTime(std::string const &Broker, std::string const &Topic,
                  std::vector const &Partitions, time_point Time,
diff --git a/src/Kafka/MetaDataQueryImpl.h b/src/Kafka/MetaDataQueryImpl.h
index 86bbb24e0..c5822f155 100644
--- a/src/Kafka/MetaDataQueryImpl.h
+++ b/src/Kafka/MetaDataQueryImpl.h
@@ -117,10 +117,10 @@ getPartitionsForTopicImpl(std::string const &Broker, std::string const &Topic,
   auto ReturnCode =
       Handle->metadata(true, TopicObj.get(), &MetadataPtr, TimeOutInMs);
   if (ReturnCode != RdKafka::ERR_NO_ERROR) {
-    throw MetadataException(
-        fmt::format("Failed to query broker for available partitions on topic "
-                    "{}. Error code was: {}",
-                    Topic, ReturnCode));
+    throw MetadataException(fmt::format(
+        "Failed to query broker {} for available partitions on topic "
+        "{}. Error was: {}",
+        Broker, Topic, RdKafka::err2str(ReturnCode)));
   }
   auto TopicMetaData = findKafkaTopic(Topic, MetadataPtr);
   auto ReturnVector = extractPartitinIDs(TopicMetaData);
diff --git a/src/tests/helpers/KafkaMocks.h b/src/tests/helpers/KafkaMocks.h
index 9327bd330..3920a7367 100644
--- a/src/tests/helpers/KafkaMocks.h
+++ b/src/tests/helpers/KafkaMocks.h
@@ -22,6 +22,8 @@ class MockConsumer
   IMPLEMENT_MOCK0(poll);
   IMPLEMENT_MOCK3(addPartitionAtOffset);
   IMPLEMENT_MOCK1(addTopic);
+  IMPLEMENT_MOCK2(assignAllPartitions);
+  IMPLEMENT_MOCK2(getTopicMetadata);
 };
 } // namespace Kafka

From 4538b218d9907bf7c85fce0c001216446de52ec9 Mon Sep 17 00:00:00 2001
From: cow-bot
Date: Thu, 9 Feb 2023 08:17:27 +0000
Subject: [PATCH 03/12] GO FORMAT YOURSELF (black)

---
 integration-tests/conftest.py                     |  6 +++++-
 .../test_stop_msg_immediately_after_start_msg.py  | 16 +++++++++++-----
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/integration-tests/conftest.py b/integration-tests/conftest.py
index d1e6d9304..495f2eb12 100644
--- a/integration-tests/conftest.py
+++ b/integration-tests/conftest.py
@@ -95,7 +95,11 @@ def delivery_callback(err, msg):
     n_polls = 0
     while n_polls < 10 and not topic_ready:
         all_topics = client.list_topics().topics.keys()
-        if "TEST_writer_jobs" in all_topics and "TEST_writer_commands" in all_topics and "TEST_writer_commands_alternative" in all_topics:
+        if (
+            "TEST_writer_jobs" in all_topics
+            and "TEST_writer_commands" in all_topics
+            and "TEST_writer_commands_alternative" in all_topics
+        ):
             topic_ready = True
             print("Topic is ready!", flush=True)
             break
diff --git a/integration-tests/test_stop_msg_immediately_after_start_msg.py b/integration-tests/test_stop_msg_immediately_after_start_msg.py
index bb31ed209..3f64bf946 100644
--- a/integration-tests/test_stop_msg_immediately_after_start_msg.py
+++ b/integration-tests/test_stop_msg_immediately_after_start_msg.py
@@ -12,8 +12,12 @@
 )
 
 
-def test_stop_msg_immediately_after_start_msg_on_alternative_command_topic(worker_pool, kafka_address):
-    file_path = full_file_path(f"output_file_stop_msg_immediately_after_start_msg_on_alternative_command_topic.nxs")
+def test_stop_msg_immediately_after_start_msg_on_alternative_command_topic(
+    worker_pool, kafka_address
+):
+    file_path = full_file_path(
+        f"output_file_stop_msg_immediately_after_start_msg_on_alternative_command_topic.nxs"
+    )
     wait_writers_available(worker_pool, nr_of=1, timeout=10)
     now = datetime.now()
     with open("commands/nexus_structure_static.json", "r") as f:
@@ -26,8 +30,8 @@ def test_stop_msg_immediately_after_start_msg_on_alternative_command_topic(worke
         start_time=now,
         control_topic="TEST_writer_commands_alternative",
     )
-    # ECDC-3333 file-writer-control does not correctly handle jobs with
-    # alternative command topics. For now we create a secondary WorkerJobPool
+    # ECDC-3333 file-writer-control does not correctly handle jobs with
+    # alternative command topics. For now we create a secondary WorkerJobPool
     # to do so.
     worker_pool_alternative = WorkerJobPool(
         job_topic_url=f"{kafka_address}/TEST_writer_jobs",
         command_topic_url=f"{kafka_address}/TEST_writer_commands_alternative",
         max_message_size=1048576 * 500,
     )
     job_handler = JobHandler(worker_finder=worker_pool)
-    job_handler_alternative = JobHandler(worker_finder=worker_pool_alternative, job_id=write_job.job_id)
+    job_handler_alternative = JobHandler(
+        worker_finder=worker_pool_alternative, job_id=write_job.job_id
+    )
     start_handler = job_handler.start_job(write_job)
     stop_handler = job_handler_alternative.set_stop_time(now)

From 8bd1c2994184cb76255a0a282485ec7ed7d5439e Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Mon, 13 Feb 2023 14:46:24 +0100
Subject: [PATCH 04/12] Chore: Remove duplicated line in changes.md

---
 changes.md | 1 -
 1 file changed, 1 deletion(-)

diff --git a/changes.md b/changes.md
index 4b42d7769..4a8313f98 100644
--- a/changes.md
+++ b/changes.md
@@ -22,7 +22,6 @@
 - Adding _se00_ writer module (see [schema definitions here](https://github.com/ess-dmsc/streaming-data-types)).
 - Adding _ev44_ writer module (see [schema definitions here](https://github.com/ess-dmsc/streaming-data-types)).
 - Ignore deprecated warnings on macOS (can be removed when https://github.com/chriskohlhoff/asio/issues/1183 is addressed.
-- Updated Conan package dependencies: librdkakfa (1.9.2)
 - Enable idempotence setting in the Kafka producer.
 - Updated librdkakfa Conan package version to 2.0.2

From 1e2916398455394e114f373933547ba5f189237c Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Tue, 14 Feb 2023 13:04:32 +0100
Subject: [PATCH 05/12] Refact: Remove unused variable in tests

---
 integration-tests/helpers/writer.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/integration-tests/helpers/writer.py b/integration-tests/helpers/writer.py
index 739ab22aa..00fdbaedb 100644
--- a/integration-tests/helpers/writer.py
+++ b/integration-tests/helpers/writer.py
@@ -64,7 +64,6 @@ def wait_command_is_done(command_handler: CommandHandler, timeout: float):
 
 def wait_start_job(worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float):
     job_handler = JobHandler(worker_finder=worker_pool)
     start_handler = job_handler.start_job(write_job)
-    start_time = datetime.now()
     wait_command_is_done(start_handler, timeout)
     return job_handler

From d1f946907148bc99fc67080e3d5137bec055f930 Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Tue, 14 Feb 2023 13:50:02 +0100
Subject: [PATCH 06/12] Tests: Use new version of file-writer-control in
 integration tests

---
 integration-tests/requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration-tests/requirements.txt b/integration-tests/requirements.txt
index 86c5f47af..4c2ee8d1e 100644
--- a/integration-tests/requirements.txt
+++ b/integration-tests/requirements.txt
@@ -6,4 +6,4 @@ h5py>=3.1.0
 flatbuffers>=1.12
 black==19.3b0
 ess-streaming_data_types>=0.21.0
-file-writer-control>=1.2.3
+file-writer-control>=1.2.4

From b33cbafbc54a4b860f4ed37b3c879a4bc5081e25 Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Tue, 14 Feb 2023 14:37:54 +0100
Subject: [PATCH 07/12] Fix: Do not require service_id on run_stop messages

The field is optional in the 6s4t schema.

In the context of this PR, it is necessary to fix this issue because when
stopping a job immediately after start, whoever started the job will probably
not have information about which filewriter has picked up the job, and may
therefore send the 6s4t message with an empty service_id.
---
 src/CommandSystem/Handler.cpp | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/CommandSystem/Handler.cpp b/src/CommandSystem/Handler.cpp
index dc55f59da..b38369a44 100644
--- a/src/CommandSystem/Handler.cpp
+++ b/src/CommandSystem/Handler.cpp
@@ -308,7 +308,9 @@ void Handler::handleStopCommand(FileWriter::Msg CommandMsg) {
        }}});
 
   CommandSteps.push_back(
-      {[&]() { return ServiceId == StopCmd.ServiceID; },
+      {[&]() {
+         return StopCmd.ServiceID.empty() || ServiceId == StopCmd.ServiceID;
+       },
        {LogLevel::Debug, 0, false, [&]() {
           return fmt::format(
               "Rejected stop command as the service id was wrong. It "

From 1a00b970db5c3a296a2ae3696bcd3fe3188be644 Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Thu, 16 Feb 2023 15:41:11 +0100
Subject: [PATCH 08/12] Tests: Adapt integration tests to support empty
 service_id in run_stop

---
 integration-tests/helpers/writer.py           |   2 +-
 .../test_filewriter_ignores_commands.py       | 104 +++++++++++++++---
 2 files changed, 88 insertions(+), 18 deletions(-)

diff --git a/integration-tests/helpers/writer.py b/integration-tests/helpers/writer.py
index 00fdbaedb..8b1f71648 100644
--- a/integration-tests/helpers/writer.py
+++ b/integration-tests/helpers/writer.py
@@ -61,7 +61,7 @@ def wait_command_is_done(command_handler: CommandHandler, timeout: float):
         )
 
 
-def wait_start_job(worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float):
+def wait_start_job(worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float) -> JobHandler:
     job_handler = JobHandler(worker_finder=worker_pool)
     start_handler = job_handler.start_job(write_job)
     wait_command_is_done(start_handler, timeout)
     return job_handler
diff --git a/integration-tests/test_filewriter_ignores_commands.py b/integration-tests/test_filewriter_ignores_commands.py
index 5be6b82c5..ab07aa365 100644
--- a/integration-tests/test_filewriter_ignores_commands.py
+++ b/integration-tests/test_filewriter_ignores_commands.py
@@ -2,6 +2,7 @@
 from datetime import datetime, timedelta
 from pathlib import Path
 from file_writer_control.CommandStatus import CommandState
+from file_writer_control.JobStatus import JobState
 from file_writer_control.WriteJob import WriteJob
 from helpers import full_file_path
 from helpers.writer import (
@@ -13,13 +14,13 @@
 )
 
 
-def test_ignores_commands_with_incorrect_id(
+def test_ignores_stop_command_with_incorrect_service_id(
+    request,
     worker_pool,
     kafka_address,
     multiple_writers,
-    hdf_file_name="output_file_stop_id.nxs",
 ):
-    file_path = full_file_path(hdf_file_name)
+    file_path = full_file_path(f"{request.node.name}.nxs")
     wait_writers_available(worker_pool, nr_of=2, timeout=20)
     now = datetime.now()
 
@@ -32,37 +33,106 @@
         structure = f.read()
     write_job = WriteJob(
         nexus_structure=structure,
         file_name=file_path,
         broker=kafka_address,
         start_time=now,
         stop_time=now + timedelta(days=30),
     )
-    wait_start_job(worker_pool, write_job, timeout=20)
+    start_cmd_handler = wait_start_job(worker_pool, write_job, timeout=20)
 
-    cmd_handler = worker_pool.try_send_stop_now(
+    stop_cmd_handler = worker_pool.try_send_stop_now(
         "incorrect service id", write_job.job_id
     )
 
     used_timeout = timedelta(seconds=5)
-    cmd_handler.set_timeout(used_timeout)
+    stop_cmd_handler.set_timeout(used_timeout)
 
-    time.sleep(used_timeout.total_seconds() + 2)
+    time.sleep(used_timeout.total_seconds() + 5)
+    assert (
+        stop_cmd_handler.get_state() == CommandState.TIMEOUT_RESPONSE
+    ), f"Stop command not ignored. State was {stop_cmd_handler.get_state()} (cmd id: f{stop_cmd_handler.command_id})"
     assert (
-        cmd_handler.get_state() == CommandState.TIMEOUT_RESPONSE
-    ), f"State was {cmd_handler.get_state()} (cmd id: f{cmd_handler.command_id})"
+        start_cmd_handler.get_state() in [JobState.WRITING]
+    ), f"Start job may have been affected by Stop command. State was {start_cmd_handler.get_state()} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
+
+    stop_all_jobs(worker_pool)
+    wait_no_working_writers(worker_pool, timeout=15)
+    assert Path(file_path).is_file()
+
+
+def test_ignores_stop_command_with_incorrect_job_id(
+    request,
+    worker_pool,
+    kafka_address,
+    multiple_writers,
+):
+    file_path = full_file_path(f"{request.node.name}.nxs")
+    wait_writers_available(worker_pool, nr_of=2, timeout=20)
+    now = datetime.now()
+
+    with open("commands/nexus_structure.json", "r") as f:
+        structure = f.read()
+    write_job = WriteJob(
+        nexus_structure=structure,
+        file_name=file_path,
+        broker=kafka_address,
+        start_time=now,
+        stop_time=now + timedelta(days=30),
+    )
+    start_cmd_handler = wait_start_job(worker_pool, write_job, timeout=20)
 
     cmd_handler = worker_pool.try_send_stop_now(write_job.service_id, "wrong job id")
+
     used_timeout = timedelta(seconds=5)
     cmd_handler.set_timeout(used_timeout)
 
-    time.sleep(used_timeout.total_seconds() + 2)
+    time.sleep(used_timeout.total_seconds() + 5)
     assert (
-        cmd_handler.get_state() == CommandState.TIMEOUT_RESPONSE
-    ), f"State was {cmd_handler.get_state()} (cmd id: f{cmd_handler.command_id})"
+        start_cmd_handler.get_state() in [JobState.WRITING]
+    ), f"Start job may have been affected by Stop command. State was {start_cmd_handler.get_state()} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
 
     stop_all_jobs(worker_pool)
     wait_no_working_writers(worker_pool, timeout=0)
     assert Path(file_path).is_file()
 
 
-def test_ignores_commands_with_incorrect_job_id(
-    worker_pool, kafka_address, hdf_file_name="output_file_job_id.nxs"
+def test_accepts_stop_command_with_empty_service_id(
+    request,
+    worker_pool,
+    kafka_address,
+    multiple_writers,
+):
+    file_path = full_file_path(f"{request.node.name}.nxs")
+    wait_writers_available(worker_pool, nr_of=2, timeout=20)
+    now = datetime.now()
+
+    with open("commands/nexus_structure.json", "r") as f:
+        structure = f.read()
+    write_job = WriteJob(
+        nexus_structure=structure,
+        file_name=file_path,
+        broker=kafka_address,
+        start_time=now,
+        stop_time=now + timedelta(days=30),
+    )
+    start_cmd_handler = wait_start_job(worker_pool, write_job, timeout=20)
+
+    stop_cmd_handler = worker_pool.try_send_stop_now(
+        None, write_job.job_id
+    )
+
+    used_timeout = timedelta(seconds=5)
+    stop_cmd_handler.set_timeout(used_timeout)
+
+    time.sleep(used_timeout.total_seconds() + 5)
+    start_job_state = start_cmd_handler.get_state()
+    assert (
+        start_job_state in [JobState.DONE]
+    ), f"Start job was not stopped after Stop command. State was {start_job_state} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
+
+    stop_all_jobs(worker_pool)
+    wait_no_working_writers(worker_pool, timeout=5)
+    assert Path(file_path).is_file()
+
+
+def test_ignores_start_command_with_incorrect_job_id(
+    request, worker_pool, kafka_address
 ):
-    file_path = full_file_path(hdf_file_name)
+    file_path = full_file_path(f"{request.node.name}.nxs")
     wait_writers_available(worker_pool, nr_of=1, timeout=10)
     now = datetime.now()
     with open("commands/nexus_structure.json", "r") as f:
@@ -82,9 +152,9 @@
 
 
 def test_reject_bad_json(
-    worker_pool, kafka_address, hdf_file_name="rejected_start_command.nxs"
+    request, worker_pool, kafka_address
 ):
-    file_path = full_file_path(hdf_file_name)
+    file_path = full_file_path(f"{request.node.name}.nxs")
     wait_writers_available(worker_pool, nr_of=1, timeout=10)
     now = datetime.now()
     start_time = now - timedelta(seconds=10)

From 327663b631d659a5cf2a73bd9a87ccd8aa22e944 Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Thu, 16 Feb 2023 15:53:29 +0100
Subject: [PATCH 09/12] Tests: Stop filewriters before docker containers

---
 integration-tests/conftest.py                        | 12 ++++++------
 ...nores_commands.py => test_filewriter_commands.py} |  0
 2 files changed, 6 insertions(+), 6 deletions(-)
 rename integration-tests/{test_filewriter_ignores_commands.py => test_filewriter_commands.py} (100%)

diff --git a/integration-tests/conftest.py b/integration-tests/conftest.py
index 495f2eb12..2f1ce14aa 100644
--- a/integration-tests/conftest.py
+++ b/integration-tests/conftest.py
@@ -176,18 +176,18 @@ def build_and_run(
         time.sleep(10)
 
     def fin():
-        # Stop the containers then remove them and their volumes (--volumes option)
-        if not custom_kafka_broker:
-            print("Stopping docker containers", flush=True)
-            options["--timeout"] = 30
-            cmd.down(options)
-            print("Containers stopped", flush=True)
         print("Stopping file-writers")
         for fw in list_of_writers:
            fw.terminate()
        for fw in list_of_writers:
            fw.wait()
        print("File-writers stopped")
+        # Stop the containers then remove them and their volumes (--volumes option)
+        if not custom_kafka_broker:
+            print("Stopping docker containers", flush=True)
+            options["--timeout"] = 10
+            cmd.down(options)
+            print("Containers stopped", flush=True)
 
     # Using a finalizer rather than yield in the fixture means
     # that the containers will be brought down even if tests fail
diff --git a/integration-tests/test_filewriter_ignores_commands.py b/integration-tests/test_filewriter_commands.py
similarity index 100%
rename from integration-tests/test_filewriter_ignores_commands.py
rename to integration-tests/test_filewriter_commands.py

From ebe50afb26a60b799461baac99da2e1405356200 Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Thu, 16 Feb 2023 15:56:48 +0100
Subject: [PATCH 10/12] Tests: Fix references to renamed tests

---
 integration-tests/stress.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/integration-tests/stress.py b/integration-tests/stress.py
index 92e2e7f52..6c94a7122 100644
--- a/integration-tests/stress.py
+++ b/integration-tests/stress.py
@@ -7,7 +7,7 @@
     test_two_different_writer_modules_with_same_flatbuffer_id,
 )
 from test_filewriter_stop_time import test_start_and_stop_time_are_in_the_past
-from test_filewriter_ignores_commands import test_ignores_commands_with_incorrect_job_id
+from test_filewriter_commands import test_ignores_stop_command_with_incorrect_job_id
 from test_f142_meta_data import test_f142_meta_data
 from file_writer_control import WorkerJobPool
@@ -26,7 +26,7 @@ def main():
         test_end_message_metadata,
         test_two_different_writer_modules_with_same_flatbuffer_id,
         test_start_and_stop_time_are_in_the_past,
-        test_ignores_commands_with_incorrect_job_id,
+        test_ignores_stop_command_with_incorrect_job_id,
         test_f142_meta_data,
     ]
     for func in list_of_tests:

From 94ccf8565fa6112607a00c68f1cf56b98dbcf259 Mon Sep 17 00:00:00 2001
From: Daniel Cacabelos
Date: Thu, 16 Feb 2023 16:11:55 +0100
Subject: [PATCH 11/12] Fix: Add null pointer test

---
 src/Kafka/Consumer.cpp | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/Kafka/Consumer.cpp b/src/Kafka/Consumer.cpp
index 8e98876ec..7a27d676d 100644
--- a/src/Kafka/Consumer.cpp
+++ b/src/Kafka/Consumer.cpp
@@ -95,6 +95,11 @@ void Consumer::assignAllPartitions(std::string const &Topic,
   RdKafka::Metadata *MetadataPtr{nullptr};
   const RdKafka::TopicMetadata *TopicMetadata =
       getTopicMetadata(Topic, MetadataPtr);
+  if (TopicMetadata == nullptr) {
+    throw std::runtime_error(fmt::format(
+        R"(Could not assign partitions in topic "{}" for the provided timestamp "{}". Topic metadata not found)",
+        Topic, StartTimestamp));
+  }
   std::vector TopicPartitions;
   for (auto const &Partition : *TopicMetadata->partitions()) {
     LOG_DEBUG(

From bcf0ebc5e6af7e66b948772f1eaf848f03b246d6 Mon Sep 17 00:00:00 2001
From: cow-bot
Date: Mon, 20 Feb 2023 07:38:53 +0000
Subject: [PATCH 12/12] GO FORMAT YOURSELF (black)

---
 integration-tests/helpers/writer.py           |  4 ++-
 integration-tests/test_filewriter_commands.py | 26 ++++++++-----------
 2 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/integration-tests/helpers/writer.py b/integration-tests/helpers/writer.py
index 8b1f71648..eab2c6605 100644
--- a/integration-tests/helpers/writer.py
+++ b/integration-tests/helpers/writer.py
@@ -61,7 +61,9 @@ def wait_command_is_done(command_handler: CommandHandler, timeout: float):
         )
 
 
-def wait_start_job(worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float) -> JobHandler:
+def wait_start_job(
+    worker_pool: WorkerJobPool, write_job: WriteJob, timeout: float
+) -> JobHandler:
     job_handler = JobHandler(worker_finder=worker_pool)
     start_handler = job_handler.start_job(write_job)
     wait_command_is_done(start_handler, timeout)
diff --git a/integration-tests/test_filewriter_commands.py b/integration-tests/test_filewriter_commands.py
index ab07aa365..78a46cb8b 100644
--- a/integration-tests/test_filewriter_commands.py
+++ b/integration-tests/test_filewriter_commands.py
@@ -46,9 +46,9 @@
     assert (
         stop_cmd_handler.get_state() == CommandState.TIMEOUT_RESPONSE
     ), f"Stop command not ignored. State was {stop_cmd_handler.get_state()} (cmd id: f{stop_cmd_handler.command_id})"
-    assert (
-        start_cmd_handler.get_state() in [JobState.WRITING]
-    ), f"Start job may have been affected by Stop command. State was {start_cmd_handler.get_state()} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
+    assert start_cmd_handler.get_state() in [
+        JobState.WRITING
+    ], f"Start job may have been affected by Stop command. State was {start_cmd_handler.get_state()} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
 
     stop_all_jobs(worker_pool)
     wait_no_working_writers(worker_pool, timeout=15)
@@ -81,9 +81,9 @@
     cmd_handler.set_timeout(used_timeout)
 
     time.sleep(used_timeout.total_seconds() + 5)
-    assert (
-        start_cmd_handler.get_state() in [JobState.WRITING]
-    ), f"Start job may have been affected by Stop command. State was {start_cmd_handler.get_state()} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
+    assert start_cmd_handler.get_state() in [
+        JobState.WRITING
+    ], f"Start job may have been affected by Stop command. State was {start_cmd_handler.get_state()} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
 
     stop_all_jobs(worker_pool)
     wait_no_working_writers(worker_pool, timeout=0)
@@ -111,18 +111,16 @@
     )
     start_cmd_handler = wait_start_job(worker_pool, write_job, timeout=20)
 
-    stop_cmd_handler = worker_pool.try_send_stop_now(
-        None, write_job.job_id
-    )
+    stop_cmd_handler = worker_pool.try_send_stop_now(None, write_job.job_id)
 
     used_timeout = timedelta(seconds=5)
     stop_cmd_handler.set_timeout(used_timeout)
 
     time.sleep(used_timeout.total_seconds() + 5)
     start_job_state = start_cmd_handler.get_state()
-    assert (
-        start_job_state in [JobState.DONE]
-    ), f"Start job was not stopped after Stop command. State was {start_job_state} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
+    assert start_job_state in [
+        JobState.DONE
+    ], f"Start job was not stopped after Stop command. State was {start_job_state} (job id: {start_cmd_handler.job_id}): {start_cmd_handler.get_message()}"
 
     stop_all_jobs(worker_pool)
     wait_no_working_writers(worker_pool, timeout=5)
@@ -149,9 +147,7 @@
     assert not Path(file_path).is_file()
 
 
-def test_reject_bad_json(
-    request, worker_pool, kafka_address
-):
+def test_reject_bad_json(request, worker_pool, kafka_address):
     file_path = full_file_path(f"{request.node.name}.nxs")
     wait_writers_available(worker_pool, nr_of=1, timeout=10)
     now = datetime.now()