Expose preferred locations config #2762

Merged: 13 commits, Oct 9, 2023
2 changes: 2 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala
@@ -103,4 +103,6 @@ object TiConfigConst {
// health check timeout
val GRPC_HEALTH_CHECK_TIMEOUT = "spark.tispark.grpc.health_check_timeout_in_ms"
val GPRC_HEALTH_CHECK_PERIOD = "spark.tispark.grpc.health_check_period_in_ms"
// preferred locations
val PREFERRED_LOCATIONS = "spark.tispark.preferred_locations"
}
5 changes: 5 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/utils/TiUtil.scala
@@ -267,6 +267,11 @@ object TiUtil {
tiConf.setHealthCheckPeriod(conf
.get(TiConfigConst.GPRC_HEALTH_CHECK_PERIOD, TiConfiguration.DEFHealthCheckPeriod.toString)
.toInt)

if (conf.contains(TiConfigConst.PREFERRED_LOCATIONS)) {
tiConf.setPreferredLocations(conf.get(TiConfigConst.PREFERRED_LOCATIONS))
}

tiConf
}

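For context, the new branch above only takes effect when the property is present in the incoming Spark configuration; otherwise TiConfiguration keeps its default of an empty string. A minimal sketch of supplying the property programmatically (the SparkConf setup below is illustrative and not part of this change):

```scala
import org.apache.spark.SparkConf

object PreferredLocationsConfSketch {
  // Illustrative only: with this property set,
  // conf.contains(TiConfigConst.PREFERRED_LOCATIONS) is true in TiUtil
  // and the value ("host") is copied into TiConfiguration.
  val sparkConf: SparkConf = new SparkConf()
    .set("spark.tispark.preferred_locations", "host")
}
```
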
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala
@@ -43,6 +43,7 @@ abstract class TiRDD(
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

private lazy val partitionPerSplit = tiConf.getPartitionPerSplit
private lazy val preferredLocations = tiConf.getPreferredLocations

protected def checkTimezone(): Unit = {
if (!tiConf.getLocalTimeZone.equals(Converter.getLocalTimezone)) {
@@ -80,6 +81,10 @@ abstract class TiRDD(
     result.toArray
   }

-  override protected def getPreferredLocations(split: Partition): Seq[String] =
-    split.asInstanceOf[TiPartition].tasks.head.getHost :: Nil
+  override protected def getPreferredLocations(split: Partition): Seq[String] = {
+    if (preferredLocations.equalsIgnoreCase("host")) {
+      return split.asInstanceOf[TiPartition].tasks.head.getHost :: Nil
+    }
+    Nil
+  }
 }
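
For context, Spark's scheduler consults `RDD.getPreferredLocations` when it assigns tasks, so returning the task's host hints Spark to run the partition on that node, while `Nil` leaves placement entirely to Spark. A self-contained sketch of the same hook on a hypothetical RDD (the names below are illustrative, not TiSpark's):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition that remembers which host serves its data.
case class HostPartition(index: Int, host: String) extends Partition

// Minimal sketch of an RDD that reports that host as its preferred
// location, so Spark tries to run the corresponding task on that node.
class HostAwareRDD(sc: SparkContext, hosts: Seq[String])
    extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => HostPartition(i, h) }.toArray

  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(split.asInstanceOf[HostPartition].host)
}
```
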
1 change: 1 addition & 0 deletions docs/userguide_3.0.md
@@ -282,6 +282,7 @@ spark.sql("select t1.id,t2.id from spark_catalog.default.t t1 left join tidb_cat
| `spark.tispark.load_tables` | true | (experimental) Whether to load all tables when the catalog cache is reloaded. Disabling it may cause tables not to be found in scenarios where tables change frequently. |
| `spark.tispark.grpc.health_check_timeout_in_ms` | 2000 | The timeout of the health check for TiKV and TiFlash. |
| `spark.tispark.grpc.health_check_period_in_ms` | 3000 | The period of the health check. |
| `spark.tispark.preferred_locations` | "" | The preferred locations of TiRDD partitions in TiSpark. Only `host` is supported for now, which uses each task's host as the preferred location. This configuration is kept for compatibility with the previous behavior; see the example below. |
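
A minimal usage sketch, assuming a spark-shell or application setup where the TiSpark extensions are already configured (the application name is a placeholder):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: setting the option to "host" asks Spark to prefer
// running each TiRDD task on the host returned for that partition,
// matching the behavior prior to this change.
val spark = SparkSession
  .builder()
  .appName("tispark-preferred-locations-example")
  .config("spark.tispark.preferred_locations", "host")
  .getOrCreate()
```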

### TLS Configuration

@@ -192,6 +192,7 @@ public TiConfiguration setCertReloadIntervalInSeconds(String interval) {
public static final int DEFHealthCheckPeriod = 3000;
private int healthCheckTimeout = DEFHealthCheckTimeout;
private int healthCheckPeriod = DEFHealthCheckPeriod;
private String preferredLocations = "";

private static Long getTimeAsSeconds(String key) {
return Utils.timeStringAsSec(key);
@@ -264,6 +264,13 @@ private SelectResponse process(RegionTask regionTask) {
}

client.addResolvedLocks(startTs, resolvedLocks);
logger.info(
String.format(
"start coprocess request to %s in region %d with timeout %s",
task.getHost(),
region.getId(),
client.getTimeout()));

Collection<RegionTask> tasks =
client.coprocess(backOffer, dagRequest, ranges, responseQueue, startTs);
if (tasks != null) {