Skip to content

Commit

Permalink
Avoid deadlock consuming from blocking Iterator (#2204)
Browse files Browse the repository at this point in the history
Motivation:
The blocking streaming APIs allow for specifying a Iterator
as input which is consumed by ServiceTalk. ServiceTalk assumes
it can consume in batches from the same thread in order to make
progress. However if the Iterator `next()` method is dependent
on external input we may deadlock:
- `PublisherAsBlockingIterable.onSubscribe(..)` does a `request(16)`
- `FromIterablePublisher.request(..)` loops calling `hasNext()` and
  `next()` until demand is exhausted. Each loop iteration calls
  `subscriber.onNext(next())`.
- The same thread looping in `FromIterablePublisher.request(..)` is
  also responsible for draining the queue in `PublisherAsBlockingIterable`.
  This means that no data will be sent downstream until
  `FromIterablePublisher` exits the loop. This maybe perceived as
  “dead lock” because no data is sent even if the `Iterable` provides an
  element.

Modifications:
- `FromIterablePublisher` and `FromBlockingIterablePublisher` check to
   unwrap from `PublisherAsBlockingIterable` if possible to directly
   access the underlying `Publisher`. This skips thee consumption
   via thee `Iterable` API and avoids the dead lock loop.

Result:
No more dead lock when using HTTP / gRPC API conversions and providing
`Iterable` that is dependent upon external events to make progress.
  • Loading branch information
Scottmitch authored May 6, 2022
1 parent 291f7aa commit 9aec0e0
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,21 @@ final class FromBlockingIterablePublisher<T> extends AbstractSynchronousPublishe
private final LongSupplier timeoutSupplier;
private final TimeUnit unit;

FromBlockingIterablePublisher(final BlockingIterable<? extends T> iterable,
final LongSupplier timeoutSupplier,
final TimeUnit unit) {
private FromBlockingIterablePublisher(
final BlockingIterable<? extends T> iterable, final LongSupplier timeoutSupplier, final TimeUnit unit) {
this.iterable = requireNonNull(iterable);
this.timeoutSupplier = requireNonNull(timeoutSupplier);
this.unit = requireNonNull(unit);
}

@SuppressWarnings("unchecked")
static <T> Publisher<T> fromBlockingIterable0(
BlockingIterable<? extends T> iterable, LongSupplier timeoutSupplier, TimeUnit unit) {
// Unwrap and grab the Publisher directly if possible to avoid conversion layers.
return iterable instanceof PublisherAsBlockingIterable ? ((PublisherAsBlockingIterable<T>) iterable).original :
new FromBlockingIterablePublisher<>(iterable, timeoutSupplier, unit);
}

@Override
void doSubscribe(final Subscriber<? super T> subscriber) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ final class FromIterablePublisher<T> extends AbstractSynchronousPublisher<T> {

private final Iterable<? extends T> iterable;

FromIterablePublisher(Iterable<? extends T> iterable) {
private FromIterablePublisher(Iterable<? extends T> iterable) {
this.iterable = requireNonNull(iterable);
}

@SuppressWarnings("unchecked")
static <T> Publisher<T> fromIterable0(Iterable<? extends T> iterable) {
// Unwrap and grab the Publisher directly if possible to avoid conversion layers.
return iterable instanceof PublisherAsBlockingIterable ? ((PublisherAsBlockingIterable<T>) iterable).original :
new FromIterablePublisher<>(iterable);
}

@Override
void doSubscribe(final Subscriber<? super T> subscriber) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3827,7 +3827,7 @@ public static <T> Publisher<T> from(T... values) {
* and emit all values to the {@link Subscriber} and then {@link Subscriber#onComplete()}.
*/
public static <T> Publisher<T> fromIterable(Iterable<? extends T> iterable) {
return new FromIterablePublisher<>(iterable);
return FromIterablePublisher.fromIterable0(iterable);
}

/**
Expand Down Expand Up @@ -3855,7 +3855,7 @@ public static <T> Publisher<T> fromIterable(Iterable<? extends T> iterable) {
public static <T> Publisher<T> fromBlockingIterable(BlockingIterable<? extends T> iterable,
LongSupplier timeoutSupplier,
TimeUnit unit) {
return new FromBlockingIterablePublisher<>(iterable, timeoutSupplier, unit);
return FromBlockingIterablePublisher.fromBlockingIterable0(iterable, timeoutSupplier, unit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* @param <T> Type of items emitted by the {@link Publisher} from which this {@link BlockingIterable} is created.
*/
final class PublisherAsBlockingIterable<T> implements BlockingIterable<T> {
private final Publisher<T> original;
final Publisher<T> original;
private final int queueCapacityHint;

PublisherAsBlockingIterable(final Publisher<T> original) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.http.api.BlockingStreamingHttpClient;
import io.servicetalk.http.api.BlockingStreamingHttpResponse;
import io.servicetalk.http.api.HttpPayloadWriter;
import io.servicetalk.transport.api.ServerContext;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

import static io.servicetalk.http.api.HttpSerializers.appSerializerAsciiFixLen;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

final class BlockingStreamingHttpClientTest {
@ParameterizedTest
@ValueSource(ints = {1, 100})
void iterableSequentialConsumptionDoesNotDeadLock(int numItems) throws Exception {
try (ServerContext ctx = HttpServers.forAddress(localAddress(0))
.listenBlockingStreamingAndAwait((ctx1, request, response) -> {
HttpPayloadWriter<String> output = response.sendMetaData(appSerializerAsciiFixLen());
for (String input : request.payloadBody(appSerializerAsciiFixLen())) {
output.write(input);
}
output.close();
});
BlockingStreamingHttpClient client = HttpClients.forResolvedAddress(serverHostAndPort(ctx))
.buildBlockingStreaming()) {
NextSuppliers<String> nextSuppliers = NextSuppliers.stringSuppliers(numItems);
// Allow first item in Iterator to return from next().
nextSuppliers.countDownNextLatch();
BlockingStreamingHttpResponse resp = client.request(client.get("/")
.payloadBody(new TestBlockingIterable<>(nextSuppliers, nextSuppliers), appSerializerAsciiFixLen()));
StringBuilder responseBody = new StringBuilder(numItems);
int i = 0;
for (String respChunk : resp.payloadBody(appSerializerAsciiFixLen())) {
// Goal is to ensure we can write each individual chunk independently without blocking threads or having
// to batch multiple items. As each chunk is echoed back, unblock the next one.
nextSuppliers.countDownNextLatch();
responseBody.append(respChunk);
++i;
}
assertThat("num items: " + i + " responseBody: " + responseBody, i, equalTo(numItems));
}
}

private static final class NextSuppliers<T> implements Supplier<T>, BooleanSupplier {
private final List<NextSupplier<T>> items;
private final AtomicInteger nextIndex = new AtomicInteger();
private final AtomicInteger latchIndex = new AtomicInteger();

NextSuppliers(List<NextSupplier<T>> items) {
this.items = items;
}

static NextSuppliers<String> stringSuppliers(final int numItems) {
List<NextSupplier<String>> items = new ArrayList<>(numItems);
for (int i = 0; i < numItems; ++i) {
items.add(new NextSupplier<>(String.valueOf(i)));
}
return new NextSuppliers<>(items);
}

void countDownNextLatch() {
final int i = latchIndex.getAndIncrement();
if (i < items.size()) {
items.get(i).latch.countDown();
}
}

@Override
public boolean getAsBoolean() {
return nextIndex.get() < items.size();
}

@Override
public T get() {
return items.get(nextIndex.getAndIncrement()).get();
}
}

private static final class NextSupplier<T> implements Supplier<T> {
final CountDownLatch latch = new CountDownLatch(1);
final T next;

private NextSupplier(final T next) {
this.next = next;
}

@Override
public T get() {
try {
latch.await();
} catch (InterruptedException e) {
throwException(e);
}
return next;
}
}

private static final class TestBlockingIterable<T> implements Iterable<T> {
private final BooleanSupplier hasNextSupplier;
private final Supplier<T> nextSupplier;

private TestBlockingIterable(final BooleanSupplier hasNextSupplier, final Supplier<T> nextSupplier) {
this.hasNextSupplier = hasNextSupplier;
this.nextSupplier = nextSupplier;
}

@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@Override
public boolean hasNext() {
return hasNextSupplier.getAsBoolean();
}

@Override
public T next() {
return nextSupplier.get();
}
};
}
}
}

0 comments on commit 9aec0e0

Please sign in to comment.