Skip to content

Commit

Permalink
[close #639] reduce lock granularity of RegionStoreClient and PDClient (
Browse files Browse the repository at this point in the history
#638) (#642)

Co-authored-by: iosmanthus <[email protected]>
  • Loading branch information
ti-srebot and iosmanthus authored Jul 31, 2022
1 parent 9a127af commit 6c0f6ad
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 46 deletions.
8 changes: 3 additions & 5 deletions src/main/java/org/tikv/common/AbstractGRPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public long getTimeout() {

private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
while (true) {
backOffer.checkTimeout();

try {
ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping);
HealthGrpc.HealthBlockingStub stub =
Expand All @@ -198,10 +200,6 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin
}

protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
try {
return doCheckHealth(backOffer, addressStr, hostMapping);
} catch (Exception e) {
return false;
}
return doCheckHealth(backOffer, addressStr, hostMapping);
}
}
34 changes: 28 additions & 6 deletions src/main/java/org/tikv/common/PDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -122,6 +125,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
private ConcurrentMap<Long, Double> tiflashReplicaMap;
private HostMapping hostMapping;
private long lastUpdateLeaderTime;
private final ExecutorService updateLeaderService = Executors.newSingleThreadExecutor();
private final AtomicBoolean updateLeaderNotify = new AtomicBoolean();

public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY =
HistogramUtils.buildDuration()
Expand Down Expand Up @@ -426,6 +431,8 @@ public void close() throws InterruptedException {
if (channelFactory != null) {
channelFactory.close();
}

updateLeaderService.shutdownNow();
}

@VisibleForTesting
Expand Down Expand Up @@ -462,11 +469,7 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) {
}

private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
try {
return doGetMembers(backOffer, uri);
} catch (Exception e) {
return null;
}
return doGetMembers(backOffer, uri);
}

// return whether the leader has changed to target address `leaderUrlStr`.
Expand Down Expand Up @@ -518,7 +521,26 @@ synchronized boolean createFollowerClientWrapper(
return true;
}

public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
public void tryUpdateLeaderOrForwardFollower() {
if (updateLeaderNotify.compareAndSet(false, true)) {
try {
BackOffer backOffer = defaultBackOffer();
updateLeaderService.submit(
() -> {
try {
updateLeaderOrForwardFollower(backOffer);
} finally {
updateLeaderNotify.set(false);
}
});
} catch (RejectedExecutionException e) {
logger.error("PDClient is shutdown", e);
updateLeaderNotify.set(false);
}
}
}

private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
return;
}
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/org/tikv/common/operation/PDErrorHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.tikv.common.PDClient;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.pd.PDError;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
Expand Down Expand Up @@ -59,7 +60,12 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
case PD_ERROR:
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
client.updateLeaderOrForwardFollower(backOffer);
SlowLogSpan tryUpdateLeaderSpan = backOffer.getSlowLog().start("try_update_leader");
try {
client.tryUpdateLeaderOrForwardFollower();
} finally {
tryUpdateLeaderSpan.end();
}
return true;
case REGION_PEER_NOT_ELECTED:
logger.debug(error.getMessage());
Expand All @@ -80,7 +86,12 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) {
return false;
}
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e);
client.updateLeaderOrForwardFollower(backOffer);
SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("try_update_leader");
try {
client.tryUpdateLeaderOrForwardFollower();
} finally {
updateLeaderSpan.end();
}
return true;
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/common/region/RegionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public Pair<TiRegion, TiStore> getRegionStorePairByKey(ByteString key, TiStoreTy
public Pair<TiRegion, TiStore> getRegionStorePairByKey(
ByteString key, TiStoreType storeType, BackOffer backOffer) {
TiRegion region = getRegionByKey(key, backOffer);
if (!region.isValid()) {
if (region == null || !region.isValid()) {
throw new TiClientInternalException("Region invalid: " + region);
}

Expand Down
20 changes: 8 additions & 12 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1408,38 +1408,34 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store
this);
}

public synchronized RegionStoreClient build(TiRegion region, TiStore store)
throws GrpcException {
public RegionStoreClient build(TiRegion region, TiStore store) throws GrpcException {
return build(region, store, TiStoreType.TiKV);
}

public synchronized RegionStoreClient build(ByteString key) throws GrpcException {
public RegionStoreClient build(ByteString key) throws GrpcException {
return build(key, TiStoreType.TiKV);
}

public synchronized RegionStoreClient build(ByteString key, BackOffer backOffer)
throws GrpcException {
public RegionStoreClient build(ByteString key, BackOffer backOffer) throws GrpcException {
return build(key, TiStoreType.TiKV, backOffer);
}

public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType)
throws GrpcException {
public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException {
return build(key, storeType, defaultBackOff());
}

public synchronized RegionStoreClient build(
ByteString key, TiStoreType storeType, BackOffer backOffer) throws GrpcException {
public RegionStoreClient build(ByteString key, TiStoreType storeType, BackOffer backOffer)
throws GrpcException {
Pair<TiRegion, TiStore> pair =
regionManager.getRegionStorePairByKey(key, storeType, backOffer);
return build(pair.first, pair.second, storeType);
}

public synchronized RegionStoreClient build(TiRegion region) throws GrpcException {
public RegionStoreClient build(TiRegion region) throws GrpcException {
return build(region, defaultBackOff());
}

public synchronized RegionStoreClient build(TiRegion region, BackOffer backOffer)
throws GrpcException {
public RegionStoreClient build(TiRegion region, BackOffer backOffer) throws GrpcException {
TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer);
return build(region, store, TiStoreType.TiKV);
}
Expand Down
33 changes: 15 additions & 18 deletions src/test/java/org/tikv/common/PDClientV2MockTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,16 @@ public void testGetRegionById() throws Exception {
String start = "getRegionById";
String end = "getRegionByIdEnd";
leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, end));
try (PDClient client = createClient()) {
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
Assert.assertEquals(end, r.getEndKey().toStringUtf8());
}
PDClient client = createClient();
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
Assert.assertEquals(end, r.getEndKey().toStringUtf8());

leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, ""));
try (PDClient client = createClient()) {
Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
Assert.assertEquals("", r.getEndKey().toStringUtf8());
}

r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first;
Assert.assertEquals(start, r.getStartKey().toStringUtf8());
Assert.assertEquals("", r.getEndKey().toStringUtf8());
}

@Test
Expand All @@ -113,15 +111,14 @@ public void testScanRegions() throws Exception {
.addRegions(Pdpb.Region.newBuilder().setRegion(makeRegion(start, end)).build())
.build());

try (PDClient client = createClient()) {
List<Region> regions =
client.scanRegions(
ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1);
PDClient client = createClient();
List<Region> regions =
client.scanRegions(
ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1);

for (Region r : regions) {
Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8());
Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8());
}
for (Region r : regions) {
Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8());
Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8());
}
}
}
7 changes: 5 additions & 2 deletions src/test/java/org/tikv/common/TimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ public void testTimeoutInTime() {
try (RawKVClient client = createClient()) {
pdServers.get(0).stop();
long start = System.currentTimeMillis();
client.get(ByteString.copyFromUtf8("key"));
try {
client.get(ByteString.copyFromUtf8("key"));
} catch (Exception ignore) {
}
long end = System.currentTimeMillis();
Assert.assertTrue(end - start < session.getConf().getRawKVReadTimeoutInMS() * 2L);
Assert.assertTrue(end - start < (session.getConf().getRawKVReadTimeoutInMS() * 1.5));
}
}
}

0 comments on commit 6c0f6ad

Please sign in to comment.