Skip to content

Commit

Permalink
Merge pull request oceanbase#204 from oceanbase/retry_batchops_merge_…
Browse files Browse the repository at this point in the history
…master

Enhance Client Support for Partition Splitting
  • Loading branch information
maochongxin authored Oct 25, 2024
2 parents 29c367a + 64926a4 commit e5e8687
Show file tree
Hide file tree
Showing 19 changed files with 1,297 additions and 370 deletions.
270 changes: 189 additions & 81 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -122,31 +122,32 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
ObRpcResultCode resultCode = new ObRpcResultCode();
resultCode.decode(buf);
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
throw new ObTableNeedFetchAllException(errMessage);
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
} else if (needFetchPartial(resultCode.getRcode())) {
throw new ObTableRoutingWrongException(errMessage);
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
} else {
// Encountered an unexpected RoutingWrong error code,
// possibly due to the client error code version being behind the observer's version.
// Attempting a full refresh here
// and delegating to the upper-level call to determine whether to throw the exception to the user based on the retry result.
logger.warn("get unexpected error code: {}", response.getMessage());
throw new ObTableNeedFetchAllException(errMessage);
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
}
}
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
if (resultCode.getRcode() != 0
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
if (needFetchAll(resultCode.getRcode(), resultCode.getPcode())) {
throw new ObTableNeedFetchAllException(errMessage);
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
} else if (needFetchPartial(resultCode.getRcode())) {
throw new ObTableRoutingWrongException(errMessage);
throw new ObTableRoutingWrongException(errMessage, resultCode.getRcode());
} else {
ExceptionUtil.throwObTableException(conn.getObTable().getIp(), conn
.getObTable().getPort(), response.getHeader().getTraceId1(), response
Expand Down Expand Up @@ -193,6 +194,8 @@ private boolean needFetchAll(int errorCode, int pcode) {
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
}

Expand Down
399 changes: 283 additions & 116 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

import static com.google.common.base.Preconditions.checkArgument;

Expand Down Expand Up @@ -53,7 +54,9 @@ public class TableEntry {
// partition location
private TableEntryKey tableEntryKey = null;
private volatile ObPartitionEntry partitionEntry = null;


public ConcurrentHashMap<Long, Lock> refreshLockMap = new ConcurrentHashMap<>();

/*
* Is valid.
*/
Expand Down Expand Up @@ -218,8 +221,6 @@ public void prepare() throws IllegalArgumentException {
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
+ tableEntryKey);
partitionInfo.prepare();
checkArgument(partitionEntry != null,
"partition table partition entry is not ready. key" + tableEntryKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class ObPartitionEntry {
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();

// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
private Map<Long, Long> tabletLsIdMap = new HashMap<>();

// tabelt id -> (PartitionLocation, LsId)
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();


public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
}

public Map<Long, ObPartitionLocation> getPartitionLocation() {
return partitionLocation;
}
Expand All @@ -39,6 +49,16 @@ public void setPartitionLocation(Map<Long, ObPartitionLocation> partitionLocatio
this.partitionLocation = partitionLocation;
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }

/*
* Get partition location with part id.
*/
Expand Down Expand Up @@ -86,14 +106,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
public String toString() {
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2024 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.location.model.partition;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;

public class ObPartitionLocationInfo {
private ObPartitionLocation partitionLocation = null;
private Long tabletLsId = OB_INVALID_ID;
private Long lastUpdateTime = 0L;
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public AtomicBoolean initialized = new AtomicBoolean(false);
public final CountDownLatch initializationLatch = new CountDownLatch(1);

public ObPartitionLocation getPartitionLocation() {
rwLock.readLock().lock();
try {
return partitionLocation;
} finally {
rwLock.readLock().unlock();
}
}

public void updateLocation(ObPartitionLocation newLocation) {
this.partitionLocation = newLocation;
this.lastUpdateTime = System.currentTimeMillis();
}

public Long getTabletLsId() {
return tabletLsId;
}

public void setTabletLsId(Long tabletLsId) {
this.tabletLsId = tabletLsId;
}

public Long getLastUpdateTime() {
rwLock.readLock().lock();
try {
return lastUpdateTime;
} finally {
rwLock.readLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public ObRangePartDesc() {
public List<ObObjType> getOrderedCompareColumnTypes() {
return orderedCompareColumnTypes;
}
private List<Long> completeWorks;

private List<Long> completeWorks;

/*
* Set ordered compare column types.
Expand Down Expand Up @@ -299,20 +300,20 @@ public int getBoundsIdx(boolean isScan, Row rowKey) {
try {
List<Object> evalParams = evalRowKeyValues(rowKey);
List<Comparable> comparableElement = super.initComparableElementByTypes(evalParams,
this.orderedCompareColumns);
this.orderedCompareColumns);
ObPartitionKey searchKey = ObPartitionKey.getInstance(orderedCompareColumns,
comparableElement);
comparableElement);

int pos = upperBound(this.bounds, new ObComparableKV<ObPartitionKey, Long>(searchKey,
(long) -1));
(long) -1));
if (pos >= this.bounds.size()) {
if (isScan) {
// if range is bigger than rangeMax while scanning
// we just scan until last range
return this.bounds.size() - 1;
}
throw new ArrayIndexOutOfBoundsException("Table has no partition for value in "
+ this.getPartExpr());
+ this.getPartExpr());
} else {
return pos;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public enum Property {
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),

// [ObTable][OTHERS]
SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),
SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),

/*
* other config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ public enum ResultCodes {
OB_CLUSTER_NO_MATCH(-4666), //
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
OB_ERR_ZONE_NOT_EMPTY(-4668), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), //
OB_LS_NOT_EXIST(-4719), //
OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST(-4723), //
OB_TABLET_NOT_EXIST(-4725), //
OB_ERR_PARSER_INIT(-5000), //
OB_ERR_PARSE_SQL(-5001), //
Expand Down
Loading

0 comments on commit e5e8687

Please sign in to comment.