Skip to content

Commit

Permalink
[close #634] fix rawBatchPut hang while batch size is larger than MAX…
Browse files Browse the repository at this point in the history
…_RAW_BATCH_LIMIT (#640) (#641)

Co-authored-by: iosmanthus <[email protected]>
  • Loading branch information
ti-srebot and iosmanthus authored Jul 29, 2022
1 parent 6e8c0fc commit 9a127af
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
19 changes: 10 additions & 9 deletions src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -749,17 +749,18 @@ private void doSendBatchPut(
while (!taskQueue.isEmpty()) {
List<Batch> task = taskQueue.poll();
for (Batch batch : task) {
completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl));
futureList.add(
completionService.submit(
() -> doSendBatchPutInBatchesWithRetry(batch.getBackOffer(), batch, ttl)));
}

try {
getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
} catch (Exception e) {
for (Future<List<Batch>> future : futureList) {
future.cancel(true);
}
throw e;
try {
getTasks(completionService, taskQueue, task, deadline - System.currentTimeMillis());
} catch (Exception e) {
for (Future<List<Batch>> future : futureList) {
future.cancel(true);
}
throw e;
}
}
}
Expand Down
18 changes: 15 additions & 3 deletions src/test/java/org/tikv/raw/RawKVClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.tikv.raw.RawKVClientBase.MAX_RAW_BATCH_LIMIT;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -360,7 +361,7 @@ public void scan0test() {
}

int i = 0;
Iterator<KvPair> iter = client.scan0(prefix, ByteString.EMPTY, cnt);
Iterator<KvPair> iter = client.scanPrefix0(prefix, cnt, false);
while (iter.hasNext()) {
i++;
KvPair pair = iter.next();
Expand All @@ -369,7 +370,7 @@ public void scan0test() {
assertEquals(cnt, i);

i = 0;
iter = client.scan0(prefix, ByteString.EMPTY, true);
iter = client.scanPrefix0(prefix, true);
while (iter.hasNext()) {
i++;
KvPair pair = iter.next();
Expand Down Expand Up @@ -397,7 +398,7 @@ public void ingestTest() {
});
client.ingest(kvs);

assertEquals(client.scan(prefix, ByteString.EMPTY).size(), cnt);
assertEquals(client.scanPrefix(prefix).size(), cnt);
}

@Test
Expand Down Expand Up @@ -1084,4 +1085,15 @@ public int compare(ByteString startKey, ByteString endKey) {
return FastByteComparisons.compareTo(startKey.toByteArray(), endKey.toByteArray());
}
}

@Test
public void testBatchPutForIssue634() {
ByteString prefix = ByteString.copyFromUtf8("testBatchPutForIssue634");
client.deletePrefix(prefix);
HashMap<ByteString, ByteString> kvs = new HashMap<>();
for (int i = 0; i < MAX_RAW_BATCH_LIMIT * 4; i++) {
kvs.put(prefix.concat(ByteString.copyFromUtf8("key@" + i)), rawValue("value@" + i));
}
client.batchPut(kvs);
}
}

0 comments on commit 9a127af

Please sign in to comment.