consumer hang fix (Closes #1065) #1066

Closed

swapnilgawade16
@swapnilgawade16 swapnilgawade16 commented Sep 23, 2022

After a few days of experiments, I found the root cause.

Closes #1065

A few details on the fix:

The "Consumer Expiration Thread" in KafkaConsumerManager acquires the lock on the KafkaConsumerManager.this instance and then calls the synchronized expired method of KafkaConsumerState. The thread can wait a long time for that second lock, because a consumer that is currently reading records holds the lock on the same KafkaConsumerState instance and releases it only after the I/O that fetches records from the Kafka broker completes. While the expiration thread waits, it keeps holding the KafkaConsumerManager lock, so request threads calling getConsumerInstance or createConsumer are blocked as well (see the thread dump below). The situation gets worse as more active consumers fetch records.

The two locks for expired and updateExpiration are not required.
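
To illustrate why the expiration check does not need the instance lock, here is a minimal sketch. It is not the code from this PR: the class `ExpirationSketch`, the nested `ConsumerState`, the method `slowRead`, and the use of a `volatile long` timestamp are all assumptions made for the example. It shows that an unsynchronized `expired` returns immediately even while another thread holds the monitor of the same instance during a slow read.

```java
import java.util.concurrent.CountDownLatch;

public class ExpirationSketch {

  // Hypothetical stand-in for KafkaConsumerState; names and fields are assumptions.
  static final class ConsumerState {
    private final long timeoutMs;
    // volatile makes writes visible to other threads without taking a lock.
    private volatile long expirationMs;

    ConsumerState(long timeoutMs, long nowMs) {
      this.timeoutMs = timeoutMs;
      this.expirationMs = nowMs + timeoutMs;
    }

    // Not synchronized: a single volatile read is enough for this check.
    boolean expired(long nowMs) {
      return expirationMs <= nowMs;
    }

    // Not synchronized: a single volatile write.
    void updateExpiration(long nowMs) {
      expirationMs = nowMs + timeoutMs;
    }

    // Simulates a read that holds the instance monitor while waiting on I/O.
    synchronized void slowRead(CountDownLatch started, CountDownLatch release)
        throws InterruptedException {
      started.countDown();
      release.await();
    }
  }

  // Calls expired() while another thread holds the monitor of the same state.
  // With a synchronized expired() this method would never return.
  static boolean expiredWhileReadInProgress() {
    ConsumerState state = new ConsumerState(1000, 0);
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    Thread reader = new Thread(() -> {
      try {
        state.slowRead(started, release);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    reader.start();
    try {
      started.await();                      // reader now holds the monitor
      boolean result = state.expired(2000); // does not need the monitor
      release.countDown();                  // let the simulated read finish
      reader.join();
      return result;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(expiredWhileReadInProgress()); // prints "true"
  }
}
```

In this sketch the expiration thread's check never queues behind a reader, so the manager-level lock it holds is released promptly.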

Thread dump snippets

"qtp1551629761-29" prio=5 Id=29 BLOCKED on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 owned by "Consumer Expiration Thread" Id=47
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.getConsumerInstance(KafkaConsumerManager.java:669)
 -  blocked on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.getConsumerInstance(KafkaConsumerManager.java:679)

"qtp1551629761-35" prio=5 Id=35 BLOCKED on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 owned by "Consumer Expiration Thread" Id=47
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.createConsumer(KafkaConsumerManager.java:180)
 -  blocked on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 

"Consumer Expiration Thread" daemon prio=5 Id=47 BLOCKED on io.confluent.kafkarest.v2.JsonKafkaConsumerState@4d2bbeda owned by "pool-4-thread-13" Id=153
 at app//io.confluent.kafkarest.v2.KafkaConsumerState.expired(KafkaConsumerState.java:345)
 -  blocked on io.confluent.kafkarest.v2.JsonKafkaConsumerState@4d2bbeda
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager$ExpirationThread.run(KafkaConsumerManager.java:761)
 -  locked io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4

"pool-4-thread-13" prio=5 Id=153 RUNNABLE (in native)
 at [email protected]/sun.nio.ch.EPoll.wait(Native Method)
 at [email protected]/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:120)
 at [email protected]/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:124)
 -  locked sun.nio.ch.Util$2@1458190c
 -  locked sun.nio.ch.EPollSelectorImpl@3ebca1e6
 at [email protected]/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:136)
 at app//org.apache.kafka.common.network.Selector.select(Selector.java:869)
 at app//org.apache.kafka.common.network.Selector.poll(Selector.java:465)
 at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
 at app//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
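
For anyone who wants to look for the same pattern in their own deployment, the following is a minimal sketch that lists blocked threads and their lock owners from inside a JVM using the standard `java.lang.management` API. The class name `BlockedThreadReport` and the output format are assumptions for the example; the dump above may have been captured with a different tool.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class BlockedThreadReport {

  // Returns one line per thread that is currently BLOCKED on a monitor,
  // naming the contended lock and the thread that owns it.
  static List<String> blockedThreads() {
    ThreadMXBean mx = ManagementFactory.getThreadMXBean();
    List<String> lines = new ArrayList<>();
    // true, true: also collect locked monitors and ownable synchronizers.
    for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
      if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
        lines.add(String.format(
            "\"%s\" Id=%d BLOCKED on %s owned by \"%s\" Id=%d",
            info.getThreadName(),
            info.getThreadId(),
            info.getLockName(),
            info.getLockOwnerName(),
            info.getLockOwnerId()));
      }
    }
    return lines;
  }

  public static void main(String[] args) {
    // Prints nothing when no thread is blocked at the moment of the call.
    blockedThreads().forEach(System.out::println);
  }
}
```

A chain in which the owner of one lock is itself blocked on another lock, as with the "Consumer Expiration Thread" above, is the signal to look for.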

@swapnilgawade16 swapnilgawade16 requested a review from a team as a code owner September 23, 2022 04:31
@CLAassistant
CLAassistant commented Sep 23, 2022

CLA assistant check
All committers have signed the CLA.

@swapnilgawade16 swapnilgawade16 deleted the issue#1065 branch September 23, 2022 11:15