Add Python logging with same format as Java (#360)
cbornet authored Sep 6, 2023
1 parent 796b3d6 commit f1a1a22
Showing 5 changed files with 47 additions and 35 deletions.
File 1 of 5

@@ -15,13 +15,21 @@
 # limitations under the License.
 #
 
+import logging
 import sys
 
 import yaml
 
 from . import runtime
 
 if __name__ == "__main__":
+    logging.addLevelName(logging.WARNING, "WARN")
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s.%(msecs)03d [%(threadName)s] %(levelname)-5s %(name).36s -- %(message)s",  # noqa: E501
+        datefmt="%H:%M:%S",
+    )
+
     print(sys.argv)
     if len(sys.argv) != 2:
         print("Missing pod configuration file argument")
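For reference, a minimal standalone sketch of what this new configuration prints; the logger name and message below are invented for illustration, and the layout presumably mirrors a Logback-style `%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36}` pattern on the Java side:

```python
import logging

# Same setup the commit adds under `if __name__ == "__main__":`.
logging.addLevelName(logging.WARNING, "WARN")  # 4-letter label, as in Java
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d [%(threadName)s] %(levelname)-5s "
    "%(name).36s -- %(message)s",
    datefmt="%H:%M:%S",
)

logging.getLogger("com.example.agent").info("agent started")
# Prints something like:
# 12:34:56.789 [MainThread] INFO  com.example.agent -- agent started
```

Note that `%(name).36s` truncates logger names to 36 characters and `%(levelname)-5s` left-aligns the level in a 5-character field.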
File 2 of 5

@@ -36,6 +36,7 @@
 )
 from .topic_connector import TopicConsumer, TopicProducer
 
+LOG = logging.getLogger(__name__)
 STRING_DESERIALIZER = StringDeserializer()
 
 SERIALIZERS = {
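A note on the pattern introduced here: a module-level `LOG = logging.getLogger(__name__)` attributes each record to its defining module and still inherits the root handler configured in the entry point, which also enables per-namespace level tuning. A small sketch, with a hypothetical package name:

```python
import logging

LOG = logging.getLogger(__name__)  # e.g. "somepackage.kafka" when imported

# Raise verbosity for one subtree without touching any call sites;
# "somepackage" is a made-up namespace for illustration.
logging.getLogger("somepackage").setLevel(logging.DEBUG)

LOG.info("this record is attributed to %s", __name__)
```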
@@ -102,7 +103,7 @@ def create_dlq_producer(agent_id, streaming_cluster, configuration):
     dlq_conf = configuration.get("deadLetterTopicProducer")
     if not dlq_conf:
         return None
-    logging.info(
+    LOG.info(
         f"Creating dead-letter topic producer for agent {agent_id} using configuration "
         f"{configuration}"
     )
@@ -155,16 +156,16 @@ def __init__(self, configs):
         self.commit_ever_called = False
 
     def start(self):
-        self.consumer = Consumer(self.configs)
-        logging.info(f"Subscribing consumer to {self.topic}")
+        self.consumer = Consumer(self.configs, logger=LOG)
+        LOG.info(f"Subscribing consumer to {self.topic}")
         self.consumer.subscribe(
             [self.topic], on_assign=self.on_assign, on_revoke=self.on_revoke
         )
 
     def close(self):
         with self.lock:
             if self.consumer:
-                logging.info(
+                LOG.info(
                     f"Closing consumer to {self.topic} with {self.pending_commits} "
                     f"pending commits and {len(self.uncommitted)} uncommitted "
                     f"offsets: {self.uncommitted} "
@@ -188,9 +189,9 @@ def read(self) -> List[KafkaRecord]:
         if message is None:
             return []
         if message.error():
-            logging.error(f"Consumer error: {message.error()}")
+            LOG.error(f"Consumer error: {message.error()}")
             return []
-        logging.debug(
+        LOG.debug(
             f"Received message from Kafka topics {self.consumer.assignment()}:"
             f" {message}"
         )
@@ -223,7 +224,7 @@ def commit(self, records: List[KafkaRecord]):
             if committed_tp_offset.error:
                 raise KafkaException(committed_tp_offset.error)
             current_offset = committed_tp_offset.offset
-            logging.info(
+            LOG.info(
                 f"Current position on partition {topic_partition} is "
                 f"{current_offset}"
             )
@@ -269,34 +270,34 @@ def on_commit(self, error, partitions):
         with self.lock:
             self.pending_commits -= 1
             if error:
-                logging.error(f"Error committing offsets on topic {self.topic}: {error}")
+                LOG.error(f"Error committing offsets on topic {self.topic}: {error}")
                 if not self.commit_failure:
                     self.commit_failure = KafkaException(error)
             else:
-                logging.debug(f"Offsets committed: {partitions}")
+                LOG.debug(f"Offsets committed: {partitions}")
 
     def on_assign(self, consumer: Consumer, partitions: List[TopicPartition]):
         with self.lock:
-            logging.info(f"Partitions assigned: {partitions}")
+            LOG.info(f"Partitions assigned: {partitions}")
             for partition in partitions:
                 offset = consumer.committed([partition])[0].offset
-                logging.info(f"Last committed offset for {partition} is {offset}")
+                LOG.info(f"Last committed offset for {partition} is {offset}")
                 if offset >= 0:
                     self.committed[partition] = offset
 
     def on_revoke(self, _, partitions: List[TopicPartition]):
         with self.lock:
-            logging.info(f"Partitions revoked: {partitions}")
+            LOG.info(f"Partitions revoked: {partitions}")
             for partition in partitions:
                 if partition in self.committed:
                     offset = self.committed.pop(partition)
-                    logging.info(
+                    LOG.info(
                         f"Current offset {offset} on partition {partition} (revoked)"
                     )
                 if partition in self.uncommitted:
                     offsets = self.uncommitted.pop(partition)
                     if len(offsets) > 0:
-                        logging.warning(
+                        LOG.warning(
                             f"There are uncommitted offsets {offsets} on partition "
                             f"{partition} (revoked), these messages will be "
                             f"re-delivered"
@@ -330,13 +331,13 @@ def __init__(self, configs):
         self.delivery_failure: Optional[Exception] = None
 
     def start(self):
-        self.producer = Producer(self.configs)
+        self.producer = Producer(self.configs, logger=LOG)
 
     def write(self, records: List[Record]):
         for record in records:
             if self.delivery_failure:
                 raise self.delivery_failure
-            logging.info(f"Sending record {record}")
+            LOG.info(f"Sending record {record}")
             headers = []
             if record.headers():
                 for key, value in record.headers():
File 3 of 5

@@ -44,6 +44,8 @@
     TopicConsumerWithDLQSource,
 )
 
+LOG = logging.getLogger(__name__)
+
 
 def current_time_millis():
     return int(time.time_ns() / 1000_000)
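An aside on the unchanged helper above: `time.time_ns()` returns nanoseconds, and `1000_000` is simply `1_000_000` with unusual digit grouping, so the division yields milliseconds. An equivalent sketch using floor division to stay in integers:

```python
import time

def current_time_millis() -> int:
    # 1 ms == 1_000_000 ns; // avoids the intermediate float
    return time.time_ns() // 1_000_000
```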
@@ -64,7 +66,7 @@ def __init__(self, configuration):
 
     def handle_errors(self, source_record: Record, error) -> ErrorsProcessingOutcome:
         self.failures += 1
-        logging.info(
+        LOG.info(
             f"Handling error {error} for source record {source_record}, "
             f"errors count {self.failures} (max retries {self.retries})"
         )
@@ -264,7 +266,7 @@ def run_with_server(
 
 
 def run(configuration, agent=None, agent_info: AgentInfo = AgentInfo(), max_loops=-1):
-    logging.info(f"Pod Configuration {configuration}")
+    LOG.info(f"Pod Configuration {configuration}")
 
     if "streamingCluster" not in configuration:
         raise ValueError("streamingCluster cannot be null")
@@ -405,7 +407,7 @@ def run_main_loop(
                     source,
                 )
             except Exception as e:
-                logging.exception("Error while processing records")
+                LOG.exception("Error while processing records")
                 # raise the error
                 # this way the consumer will not commit the records
                 raise e
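The pattern in this hunk relies on `Logger.exception()`, which logs at ERROR level and appends the traceback of the exception currently being handled; logging and then re-raising keeps the stack trace in the log while the re-raise still prevents the records from being committed. A minimal sketch:

```python
import logging

LOG = logging.getLogger(__name__)

def process_records():
    raise RuntimeError("boom")  # stand-in for a processing failure

try:
    process_records()
except Exception:
    # Emits the message at ERROR level plus the full traceback.
    LOG.exception("Error while processing records")
    raise  # equivalent effect to `raise e` above
```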
@@ -426,7 +428,7 @@ def run_processor_agent(
     trial_number = 0
     while len(records_to_process) > 0:
         trial_number += 1
-        logging.info(
+        LOG.info(
             f"run processor on {len(records_to_process)} records "
             f"(trial #{trial_number})"
         )
@@ -439,26 +441,26 @@
         if isinstance(processor_result, Exception):
             action = errors_handler.handle_errors(source_record, processor_result)
             if action == ErrorsProcessingOutcome.SKIP:
-                logging.error(
+                LOG.error(
                     f"Unrecoverable error {processor_result} while processing the "
                     f"records, skipping"
                 )
                 results_by_record[source_record] = (source_record, processor_result)
             elif action == ErrorsProcessingOutcome.RETRY:
-                logging.error(
+                LOG.error(
                     f"Retryable error {processor_result} while processing the "
                     f"records, retrying"
                 )
                 records_to_process.append(source_record)
             elif action == ErrorsProcessingOutcome.FAIL:
-                logging.error(
+                LOG.error(
                     f"Unrecoverable error {processor_result} while processing some "
                     f"records, failing"
                 )
                 # TODO: replace with custom exception ?
                 source.permanent_failure(source_record, processor_result)
                 if errors_handler.fail_processing_on_permanent_errors():
-                    logging.error("Failing processing on permanent error")
+                    LOG.error("Failing processing on permanent error")
                     raise processor_result
                 # in case the source does not throw an exception we mark the record
                 # as "skipped"
@@ -496,26 +498,26 @@ def write_records_to_the_sink(
         action = errors_handler.handle_errors(source_record, error)
         if action == ErrorsProcessingOutcome.SKIP:
             # skip (the whole batch)
-            logging.error(
+            LOG.error(
                 f"Unrecoverable error {error} while processing the records, "
                 f"skipping"
             )
             source_record_tracker.commit(for_the_sink)
             return
         elif action == ErrorsProcessingOutcome.RETRY:
             # retry (the whole batch)
-            logging.error(
+            LOG.error(
                 f"Retryable error {error} while processing the records, retrying"
             )
         elif action == ErrorsProcessingOutcome.FAIL:
-            logging.error(
+            LOG.error(
                 f"Unrecoverable error {error} while processing some records, "
                 f"failing"
             )
             # TODO: replace with custom exception ?
             source.permanent_failure(source_record, error)
             if errors_handler.fail_processing_on_permanent_errors():
-                logging.error("Failing processing on permanent error")
+                LOG.error("Failing processing on permanent error")
                 raise error
             # in case the source does not throw an exception we mark the record as
             # "skipped"
@@ -526,7 +528,7 @@
 
 class NoopTopicConsumer(TopicConsumer):
     def read(self):
-        logging.info("Sleeping for 1 second, no records...")
+        LOG.info("Sleeping for 1 second, no records...")
         time.sleep(1)
         return []
 
File 4 of 5

@@ -50,7 +50,6 @@ def test_simple_agent(_):
     agent = runtime.init_agent(config)
     agent_info = runtime.AgentInfo()
     runtime.run(config, agent=agent, agent_info=agent_info, max_loops=2)
-    print(json.dumps(agent_info.worker_status(), indent=2))
     assert (
         json.dumps(agent_info.worker_status(), indent=2)
         == """[
File 5 of 5

@@ -26,6 +26,8 @@
     TopicConsumer,
 )
 
+LOG = logging.getLogger(__name__)
+
 
 class TopicConsumerSource(Source):
    def __init__(self, consumer: TopicConsumer):
@@ -38,11 +40,11 @@ def commit(self, records: List[Record]):
         self.consumer.commit(records)
 
     def start(self):
-        logging.info(f"Starting consumer {self.consumer}")
+        LOG.info(f"Starting consumer {self.consumer}")
         self.consumer.start()
 
     def close(self):
-        logging.info(f"Closing consumer {self.consumer}")
+        LOG.info(f"Closing consumer {self.consumer}")
         self.consumer.close()
 
     def agent_info(self) -> Dict[str, Any]:
@@ -66,7 +68,7 @@ def close(self):
         self.dlq_producer.close()
 
     def permanent_failure(self, record: Record, error: Exception):
-        logging.error(f"Sending record to DLQ: {record}")
+        LOG.error(f"Sending record to DLQ: {record}")
         self.dlq_producer.write([record])
 
 
@@ -76,11 +78,11 @@ def __init__(self, producer: TopicProducer):
         self.commit_callback = None
 
     def start(self):
-        logging.info(f"Starting producer {self.producer}")
+        LOG.info(f"Starting producer {self.producer}")
         self.producer.start()
 
     def close(self):
-        logging.info(f"Closing producer {self.producer}")
+        LOG.info(f"Closing producer {self.producer}")
         self.producer.close()
 
     def write(self, records: List[Record]):
