Skip to content

Commit

Permalink
Log PID of consumer procs
Browse files Browse the repository at this point in the history
  • Loading branch information
usmanm committed Jun 2, 2016
1 parent c278767 commit 6d2d181
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions pipeline_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ kafka_consume_main_sigterm(SIGNAL_ARGS)
int save_errno = errno;

got_sigterm = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
if (MyLatch)
SetLatch(MyLatch);

errno = save_errno;
}
Expand Down Expand Up @@ -828,7 +828,7 @@ kafka_consume_main(Datum arg)
MemoryContext work_ctx;

if (!found)
elog(ERROR, "kafka consumer process %d not found", id);
elog(ERROR, "[pipeline_kafka consumer (%d)] consumer process entry %d not found", MyProcPid, id);

pqsignal(SIGTERM, kafka_consume_main_sigterm);
#define BACKTRACE_SEGFAULTS
Expand Down Expand Up @@ -858,13 +858,13 @@ kafka_consume_main(Datum arg)
* Add all brokers currently in pipeline_kafka.brokers
*/
if (consumer.brokers == NIL)
elog(ERROR, "no valid brokers were found");
elog(ERROR, "[pipeline_kafka consumer (%d)] no valid brokers were found", MyProcPid);

foreach(lc, consumer.brokers)
valid_brokers += rd_kafka_brokers_add(kafka, lfirst(lc));

if (!valid_brokers)
elog(ERROR, "no valid brokers were found");
elog(ERROR, "[pipeline_kafka consumer (%d)] no valid brokers were found", MyProcPid);

/*
* Set up our topic to read from
Expand All @@ -891,8 +891,8 @@ kafka_consume_main(Datum arg)
if (partition % consumer.parallelism != proc->partition_group)
continue;

elog(LOG, "[kafka consumer] %s <- %s consuming partition %d from offset %ld %d",
consumer.rel->relname, consumer.topic, partition, consumer.offsets[partition], MyProcPid);
elog(LOG, "[pipeline_kafka consumer (%d)] %s <- %s consuming partition %d from offset %ld",
MyProcPid, consumer.rel->relname, consumer.topic, partition, consumer.offsets[partition]);

if (rd_kafka_consume_start(topic, partition, consumer.offsets[partition]) == -1)
elog(ERROR, "failed to start consuming: %s", rd_kafka_err2str(rd_kafka_errno2err(errno)));
Expand All @@ -905,8 +905,8 @@ kafka_consume_main(Datum arg)
*/
if (my_partitions == 0)
{
elog(LOG, "[kafka consumer] %s <- %s consumer %d doesn't have any partitions to read from",
consumer.rel->relname, consumer.topic, MyProcPid);
elog(LOG, "[pipeline_kafka consumer (%d)] %s <- %s consumer %d doesn't have any partitions to read from",
MyProcPid, consumer.rel->relname, consumer.topic, MyProcPid);
goto done;
}

Expand Down Expand Up @@ -952,8 +952,8 @@ kafka_consume_main(Datum arg)
{
/* Ignore partition EOF internal error */
if (messages[i]->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
elog(LOG, "[kafka consumer] %s <- %s consumer error %s",
consumer.rel->relname, consumer.topic, rd_kafka_err2str(messages[i]->err));
elog(LOG, "[pipeline_kafka consumer (%d)] %s <- %s consumer error %s",
MyProcPid, consumer.rel->relname, consumer.topic, rd_kafka_err2str(messages[i]->err));
}
else if (messages[i]->len > 0)
{
Expand All @@ -976,7 +976,7 @@ kafka_consume_main(Datum arg)

librdkerrs = error_buf_pop(&my_error_buf);
if (librdkerrs)
elog(LOG, "[pipeline_kafka consumer]: %s", librdkerrs);
elog(LOG, "[pipeline_kafka consumer (%d)]: %s", MyProcPid, librdkerrs);

if (messages_buffered == 0)
{
Expand All @@ -994,8 +994,8 @@ kafka_consume_main(Datum arg)
}
PG_CATCH();
{
elog(LOG, "[kafka consumer] %s <- %s failed to process batch, dropped %d message%s:",
consumer.rel->relname, consumer.topic, messages_buffered, (messages_buffered == 1 ? "" : "s"));
elog(LOG, "[pipeline_kafka consumer (%d)] %s <- %s failed to process batch, dropped %d message%s:",
MyProcPid, consumer.rel->relname, consumer.topic, messages_buffered, (messages_buffered == 1 ? "" : "s"));
EmitErrorReport();
FlushErrorState();

Expand Down

0 comments on commit 6d2d181

Please sign in to comment.