diff --git a/src/main/java/org/tikv/common/AbstractGRPCClient.java b/src/main/java/org/tikv/common/AbstractGRPCClient.java index d2675f94385..ac68552f7e2 100644 --- a/src/main/java/org/tikv/common/AbstractGRPCClient.java +++ b/src/main/java/org/tikv/common/AbstractGRPCClient.java @@ -182,6 +182,8 @@ public long getTimeout() { private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { while (true) { + backOffer.checkTimeout(); + try { ManagedChannel channel = channelFactory.getChannel(addressStr, hostMapping); HealthGrpc.HealthBlockingStub stub = @@ -198,10 +200,6 @@ private boolean doCheckHealth(BackOffer backOffer, String addressStr, HostMappin } protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) { - try { - return doCheckHealth(backOffer, addressStr, hostMapping); - } catch (Exception e) { - return false; - } + return doCheckHealth(backOffer, addressStr, hostMapping); } } diff --git a/src/main/java/org/tikv/common/PDClient.java b/src/main/java/org/tikv/common/PDClient.java index 695a1565b1c..1568a78e0ef 100644 --- a/src/main/java/org/tikv/common/PDClient.java +++ b/src/main/java/org/tikv/common/PDClient.java @@ -50,9 +50,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.http.client.methods.CloseableHttpResponse; @@ -122,6 +125,8 @@ public class PDClient extends AbstractGRPCClient private ConcurrentMap tiflashReplicaMap; private HostMapping hostMapping; private long lastUpdateLeaderTime; + private final ExecutorService updateLeaderService = Executors.newSingleThreadExecutor(); + private final AtomicBoolean updateLeaderNotify = new AtomicBoolean(); public static final Histogram PD_GET_REGION_BY_KEY_REQUEST_LATENCY = HistogramUtils.buildDuration() @@ -426,6 +431,8 @@ public void close() throws InterruptedException { if (channelFactory != null) { channelFactory.close(); } + + updateLeaderService.shutdownNow(); } @VisibleForTesting @@ -462,11 +469,7 @@ private GetMembersResponse doGetMembers(BackOffer backOffer, URI uri) { } private GetMembersResponse getMembers(BackOffer backOffer, URI uri) { - try { - return doGetMembers(backOffer, uri); - } catch (Exception e) { - return null; - } + return doGetMembers(backOffer, uri); } // return whether the leader has changed to target address `leaderUrlStr`. @@ -518,7 +521,26 @@ synchronized boolean createFollowerClientWrapper( return true; } - public synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { + public void tryUpdateLeaderOrForwardFollower() { + if (updateLeaderNotify.compareAndSet(false, true)) { + try { + BackOffer backOffer = defaultBackOffer(); + updateLeaderService.submit( + () -> { + try { + updateLeaderOrForwardFollower(backOffer); + } finally { + updateLeaderNotify.set(false); + } + }); + } catch (RejectedExecutionException e) { + logger.error("PDClient is shutdown", e); + updateLeaderNotify.set(false); + } + } + } + + private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) { if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) { return; } diff --git a/src/main/java/org/tikv/common/operation/PDErrorHandler.java b/src/main/java/org/tikv/common/operation/PDErrorHandler.java index 4f9cc7fbabb..76c6a768644 100644 --- a/src/main/java/org/tikv/common/operation/PDErrorHandler.java +++ b/src/main/java/org/tikv/common/operation/PDErrorHandler.java @@ -26,6 +26,7 @@ import org.tikv.common.PDClient; import org.tikv.common.exception.GrpcException; import org.tikv.common.exception.TiClientInternalException; +import org.tikv.common.log.SlowLogSpan; import org.tikv.common.pd.PDError; import org.tikv.common.util.BackOffFunction; import org.tikv.common.util.BackOffer; @@ -59,7 +60,12 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) { case PD_ERROR: backOffer.doBackOff( BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString())); - client.updateLeaderOrForwardFollower(backOffer); + SlowLogSpan tryUpdateLeaderSpan = backOffer.getSlowLog().start("try_update_leader"); + try { + client.tryUpdateLeaderOrForwardFollower(); + } finally { + tryUpdateLeaderSpan.end(); + } return true; case REGION_PEER_NOT_ELECTED: logger.debug(error.getMessage()); @@ -80,7 +86,12 @@ public boolean handleRequestError(BackOffer backOffer, Exception e) { return false; } backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoPDRPC, e); - client.updateLeaderOrForwardFollower(backOffer); + SlowLogSpan updateLeaderSpan = backOffer.getSlowLog().start("try_update_leader"); + try { + client.tryUpdateLeaderOrForwardFollower(); + } finally { + updateLeaderSpan.end(); + } return true; } } diff --git a/src/main/java/org/tikv/common/region/RegionManager.java b/src/main/java/org/tikv/common/region/RegionManager.java index 8c1624e42e0..44c81375107 100644 --- a/src/main/java/org/tikv/common/region/RegionManager.java +++ b/src/main/java/org/tikv/common/region/RegionManager.java @@ -182,7 +182,7 @@ public Pair getRegionStorePairByKey(ByteString key, TiStoreTy public Pair getRegionStorePairByKey( ByteString key, TiStoreType storeType, BackOffer backOffer) { TiRegion region = getRegionByKey(key, backOffer); - if (!region.isValid()) { + if (region == null || !region.isValid()) { throw new TiClientInternalException("Region invalid: " + region); } diff --git a/src/main/java/org/tikv/common/region/RegionStoreClient.java b/src/main/java/org/tikv/common/region/RegionStoreClient.java index ccc820891f1..ba742c872b0 100644 --- a/src/main/java/org/tikv/common/region/RegionStoreClient.java +++ b/src/main/java/org/tikv/common/region/RegionStoreClient.java @@ -1408,38 +1408,34 @@ public RegionStoreClient build(TiRegion region, TiStore store, TiStoreType store this); } - public synchronized RegionStoreClient build(TiRegion region, TiStore store) - throws GrpcException { + public RegionStoreClient build(TiRegion region, TiStore store) throws GrpcException { return build(region, store, TiStoreType.TiKV); } - public synchronized RegionStoreClient build(ByteString key) throws GrpcException { + public RegionStoreClient build(ByteString key) throws GrpcException { return build(key, TiStoreType.TiKV); } - public synchronized RegionStoreClient build(ByteString key, BackOffer backOffer) - throws GrpcException { + public RegionStoreClient build(ByteString key, BackOffer backOffer) throws GrpcException { return build(key, TiStoreType.TiKV, backOffer); } - public synchronized RegionStoreClient build(ByteString key, TiStoreType storeType) - throws GrpcException { + public RegionStoreClient build(ByteString key, TiStoreType storeType) throws GrpcException { return build(key, storeType, defaultBackOff()); } - public synchronized RegionStoreClient build( - ByteString key, TiStoreType storeType, BackOffer backOffer) throws GrpcException { + public RegionStoreClient build(ByteString key, TiStoreType storeType, BackOffer backOffer) + throws GrpcException { Pair pair = regionManager.getRegionStorePairByKey(key, storeType, backOffer); return build(pair.first, pair.second, storeType); } - public synchronized RegionStoreClient build(TiRegion region) throws GrpcException { + public RegionStoreClient build(TiRegion region) throws GrpcException { return build(region, defaultBackOff()); } - public synchronized RegionStoreClient build(TiRegion region, BackOffer backOffer) - throws GrpcException { + public RegionStoreClient build(TiRegion region, BackOffer backOffer) throws GrpcException { TiStore store = regionManager.getStoreById(region.getLeader().getStoreId(), backOffer); return build(region, store, TiStoreType.TiKV); } diff --git a/src/test/java/org/tikv/common/PDClientV2MockTest.java b/src/test/java/org/tikv/common/PDClientV2MockTest.java index 305cd101c38..af9884b415b 100644 --- a/src/test/java/org/tikv/common/PDClientV2MockTest.java +++ b/src/test/java/org/tikv/common/PDClientV2MockTest.java @@ -88,18 +88,16 @@ public void testGetRegionById() throws Exception { String start = "getRegionById"; String end = "getRegionByIdEnd"; leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, end)); - try (PDClient client = createClient()) { - Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; - Assert.assertEquals(start, r.getStartKey().toStringUtf8()); - Assert.assertEquals(end, r.getEndKey().toStringUtf8()); - } + PDClient client = createClient(); + Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; + Assert.assertEquals(start, r.getStartKey().toStringUtf8()); + Assert.assertEquals(end, r.getEndKey().toStringUtf8()); leader.addGetRegionByIDListener(request -> makeGetRegionResponse(start, "")); - try (PDClient client = createClient()) { - Metapb.Region r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; - Assert.assertEquals(start, r.getStartKey().toStringUtf8()); - Assert.assertEquals("", r.getEndKey().toStringUtf8()); - } + + r = client.getRegionByID(ConcreteBackOffer.newRawKVBackOff(), 1).first; + Assert.assertEquals(start, r.getStartKey().toStringUtf8()); + Assert.assertEquals("", r.getEndKey().toStringUtf8()); } @Test @@ -113,15 +111,14 @@ public void testScanRegions() throws Exception { .addRegions(Pdpb.Region.newBuilder().setRegion(makeRegion(start, end)).build()) .build()); - try (PDClient client = createClient()) { - List regions = - client.scanRegions( - ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1); + PDClient client = createClient(); + List regions = + client.scanRegions( + ConcreteBackOffer.newRawKVBackOff(), ByteString.EMPTY, ByteString.EMPTY, 1); - for (Region r : regions) { - Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8()); - Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8()); - } + for (Region r : regions) { + Assert.assertEquals(start, r.getRegion().getStartKey().toStringUtf8()); + Assert.assertEquals(end, r.getRegion().getEndKey().toStringUtf8()); } } } diff --git a/src/test/java/org/tikv/common/TimeoutTest.java b/src/test/java/org/tikv/common/TimeoutTest.java index e0f3d01d1ce..33642c2ccba 100644 --- a/src/test/java/org/tikv/common/TimeoutTest.java +++ b/src/test/java/org/tikv/common/TimeoutTest.java @@ -52,9 +52,12 @@ public void testTimeoutInTime() { try (RawKVClient client = createClient()) { pdServers.get(0).stop(); long start = System.currentTimeMillis(); - client.get(ByteString.copyFromUtf8("key")); + try { + client.get(ByteString.copyFromUtf8("key")); + } catch (Exception ignore) { + } long end = System.currentTimeMillis(); - Assert.assertTrue(end - start < session.getConf().getRawKVReadTimeoutInMS() * 2L); + Assert.assertTrue(end - start < (session.getConf().getRawKVReadTimeoutInMS() * 1.5)); } } }