From 6d2d1810de972c188c1cc3524dc16d52baa984ba Mon Sep 17 00:00:00 2001 From: Usman Masood Date: Thu, 2 Jun 2016 00:18:51 -0700 Subject: [PATCH] Log PID of consumer procs --- pipeline_kafka.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pipeline_kafka.c b/pipeline_kafka.c index b814de06..fd2ee93c 100644 --- a/pipeline_kafka.c +++ b/pipeline_kafka.c @@ -315,8 +315,8 @@ kafka_consume_main_sigterm(SIGNAL_ARGS) int save_errno = errno; got_sigterm = true; - if (MyProc) - SetLatch(&MyProc->procLatch); + if (MyLatch) + SetLatch(MyLatch); errno = save_errno; } @@ -828,7 +828,7 @@ kafka_consume_main(Datum arg) MemoryContext work_ctx; if (!found) - elog(ERROR, "kafka consumer process %d not found", id); + elog(ERROR, "[pipeline_kafka consumer (%d)] consumer process entry %d not found", MyProcPid, id); pqsignal(SIGTERM, kafka_consume_main_sigterm); #define BACKTRACE_SEGFAULTS @@ -858,13 +858,13 @@ kafka_consume_main(Datum arg) * Add all brokers currently in pipeline_kafka.brokers */ if (consumer.brokers == NIL) - elog(ERROR, "no valid brokers were found"); + elog(ERROR, "[pipeline_kafka consumer (%d)] no valid brokers were found", MyProcPid); foreach(lc, consumer.brokers) valid_brokers += rd_kafka_brokers_add(kafka, lfirst(lc)); if (!valid_brokers) - elog(ERROR, "no valid brokers were found"); + elog(ERROR, "[pipeline_kafka consumer (%d)] no valid brokers were found", MyProcPid); /* * Set up our topic to read from @@ -891,8 +891,8 @@ kafka_consume_main(Datum arg) if (partition % consumer.parallelism != proc->partition_group) continue; - elog(LOG, "[kafka consumer] %s <- %s consuming partition %d from offset %ld %d", - consumer.rel->relname, consumer.topic, partition, consumer.offsets[partition], MyProcPid); + elog(LOG, "[pipeline_kafka consumer (%d)] %s <- %s consuming partition %d from offset %ld", + MyProcPid, consumer.rel->relname, consumer.topic, partition, consumer.offsets[partition]); if (rd_kafka_consume_start(topic, partition, consumer.offsets[partition]) == -1) elog(ERROR, "failed to start consuming: %s", rd_kafka_err2str(rd_kafka_errno2err(errno))); @@ -905,8 +905,8 @@ kafka_consume_main(Datum arg) */ if (my_partitions == 0) { - elog(LOG, "[kafka consumer] %s <- %s consumer %d doesn't have any partitions to read from", - consumer.rel->relname, consumer.topic, MyProcPid); + elog(LOG, "[pipeline_kafka consumer (%d)] %s <- %s consumer %d doesn't have any partitions to read from", + MyProcPid, consumer.rel->relname, consumer.topic, MyProcPid); goto done; } @@ -952,8 +952,8 @@ kafka_consume_main(Datum arg) { /* Ignore partition EOF internal error */ if (messages[i]->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) - elog(LOG, "[kafka consumer] %s <- %s consumer error %s", - consumer.rel->relname, consumer.topic, rd_kafka_err2str(messages[i]->err)); + elog(LOG, "[pipeline_kafka consumer (%d)] %s <- %s consumer error %s", + MyProcPid, consumer.rel->relname, consumer.topic, rd_kafka_err2str(messages[i]->err)); } else if (messages[i]->len > 0) { @@ -976,7 +976,7 @@ kafka_consume_main(Datum arg) librdkerrs = error_buf_pop(&my_error_buf); if (librdkerrs) - elog(LOG, "[pipeline_kafka consumer]: %s", librdkerrs); + elog(LOG, "[pipeline_kafka consumer (%d)]: %s", MyProcPid, librdkerrs); if (messages_buffered == 0) { @@ -994,8 +994,8 @@ kafka_consume_main(Datum arg) } PG_CATCH(); { - elog(LOG, "[kafka consumer] %s <- %s failed to process batch, dropped %d message%s:", - consumer.rel->relname, consumer.topic, messages_buffered, (messages_buffered == 1 ? "" : "s")); + elog(LOG, "[pipeline_kafka consumer (%d)] %s <- %s failed to process batch, dropped %d message%s:", + MyProcPid, consumer.rel->relname, consumer.topic, messages_buffered, (messages_buffered == 1 ? "" : "s")); EmitErrorReport(); FlushErrorState();