Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6302] refactor:Optimize Flink connector properties converter #6303

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class IcebergPropertiesUtils {
// will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will
// change it to `catalogType` automatically and pass it to Iceberg.
public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG;
public static final Map<String, String> ICEBERG_CATALOG_CONFIG_TO_GRAVITINO;

static {
Map<String, String> map = new HashMap();
Expand Down Expand Up @@ -65,6 +66,14 @@ public class IcebergPropertiesUtils {
AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
IcebergConstants.ICEBERG_ADLS_STORAGE_ACCOUNT_KEY);
GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map);

Map<String, String> icebergCatalogConfigToGravitino = new HashMap<>();
map.forEach(
(key, value) -> {
icebergCatalogConfigToGravitino.put(value, key);
});
ICEBERG_CATALOG_CONFIG_TO_GRAVITINO =
Collections.unmodifiableMap(icebergCatalogConfigToGravitino);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.apache.gravitino.flink.connector;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;

/**
* PropertiesConverter is used to convert properties between Flink properties and Apache Gravitino
Expand All @@ -38,7 +40,18 @@ public interface PropertiesConverter {
* @return properties for the Gravitino catalog.
*/
default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
return flinkConf.toMap();
Map<String, String> gravitinoProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
String gravitinoKey = transformPropertiesToGravitinoCatalog(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
}
return gravitinoProperties;
}

/**
Expand All @@ -48,7 +61,24 @@ default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf
* @return properties for the Flink connector.
*/
default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties;
Map<String, String> all = Maps.newHashMap();
gravitinoProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
all.put(flinkConfigKey, value);
});
Map<String, String> allProperties = transformPropertiesToFlinkCatalog(all);
allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(), getFlinkCatalogType());
return allProperties;
}

String transformPropertiesToGravitinoCatalog(String configKey);

default Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
return allProperties;
}

/**
Expand Down Expand Up @@ -90,4 +120,6 @@ default Map<String, String> toFlinkTableProperties(Map<String, String> gravitino
default Map<String, String> toGravitinoTableProperties(Map<String, String> flinkProperties) {
return flinkProperties;
}

String getFlinkCatalogType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.hive.HiveConstants;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -34,46 +32,24 @@ public class HivePropertiesConverter implements PropertiesConverter {
private HivePropertiesConverter() {}

public static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();

private static final Map<String, String> HIVE_CATALOG_CONFIG_TO_GRAVITINO =
ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, HiveConstants.METASTORE_URIS);
private static final Map<String, String> GRAVITINO_CONFIG_TO_HIVE =
ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname);

@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties = Maps.newHashMap();

for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
String gravitinoKey = HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
}

return gravitinoProperties;
public String transformPropertiesToGravitinoCatalog(String configKey) {
return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> flinkCatalogProperties = Maps.newHashMap();
flinkCatalogProperties.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoHiveCatalogFactoryOptions.IDENTIFIER);

gravitinoProperties.forEach(
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> all = Maps.newHashMap();
allProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
flinkCatalogProperties.put(
GRAVITINO_CONFIG_TO_HIVE.getOrDefault(flinkConfigKey, flinkConfigKey), value);
all.put(GRAVITINO_CONFIG_TO_HIVE.getOrDefault(key, key), value);
});
return flinkCatalogProperties;
return all;
}

@Override
Expand All @@ -95,4 +71,9 @@ public Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoP
properties.put("connector", "hive");
return properties;
}

@Override
public String getFlinkCatalogType() {
return GravitinoHiveCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package org.apache.gravitino.flink.connector.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
Expand All @@ -38,36 +37,23 @@ private IcebergPropertiesConverter() {}
IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Preconditions.checkArgument(
gravitinoProperties != null, "Iceberg Catalog properties should not be null.");

Map<String, String> all = new HashMap<>();
if (gravitinoProperties != null) {
gravitinoProperties.forEach(
(k, v) -> {
if (k.startsWith(FLINK_PROPERTY_PREFIX)) {
String newKey = k.substring(FLINK_PROPERTY_PREFIX.length());
all.put(newKey, v);
}
});
}
Map<String, String> transformedProperties =
IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
public String transformPropertiesToGravitinoCatalog(String configKey) {
return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

if (transformedProperties != null) {
all.putAll(transformedProperties);
}
all.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
// Map "catalog-backend" to "catalog-type".
// TODO If catalog backend is CUSTOM, we need special compatibility logic.
GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> all = Maps.newHashMap();
allProperties.forEach(
(key, value) -> {
if (all.containsKey(key)) {
String config = all.remove(key);
all.put(value, config);
String icebergConfigKey = key;
if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) {
icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(key);
}
if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(key)) {
icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(key);
}
all.put(icebergConfigKey, value);
});
return all;
}
Expand All @@ -78,7 +64,7 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
}

@Override
public Map<String, String> toFlinkTableProperties(Map<String, String> properties) {
return new HashMap<>(properties);
public String getFlinkCatalogType() {
return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

package org.apache.gravitino.flink.connector.paimon;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
Expand All @@ -37,44 +34,38 @@ private PaimonPropertiesConverter() {}

@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties = Maps.newHashMap();
Map<String, String> gravitinoProperties =
PropertiesConverter.super.toGravitinoCatalogProperties(flinkConf);
Map<String, String> flinkConfMap = flinkConf.toMap();
for (Map.Entry<String, String> entry : flinkConfMap.entrySet()) {
String gravitinoKey =
PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
}
gravitinoProperties.put(
PaimonConstants.CATALOG_BACKEND,
flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER));
return gravitinoProperties;
}

@Override
public String transformPropertiesToGravitinoCatalog(String configKey) {
return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> all = new HashMap<>();
gravitinoProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
all.put(flinkConfigKey, value);
});
Map<String, String> paimonCatalogProperties =
PaimonPropertiesUtils.toPaimonCatalogProperties(all);
PropertiesConverter.super.toFlinkCatalogProperties(gravitinoProperties);
paimonCatalogProperties.put(
PaimonConstants.METASTORE,
paimonCatalogProperties.getOrDefault(
PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER));
paimonCatalogProperties.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER);
return paimonCatalogProperties;
}

@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
return PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties);
}

@Override
public String getFlinkCatalogType() {
return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void testCatalogPropertiesWithHiveBackend() {
"hive-uri",
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
"hive-warehouse",
"key1",
"flink.bypass.key1",
"value1"));
Assertions.assertEquals(
ImmutableMap.of(
Expand All @@ -50,7 +50,9 @@ void testCatalogPropertiesWithHiveBackend() {
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"hive-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
"hive-warehouse"),
"hive-warehouse",
"key1",
"value1"),
properties);
}

Expand All @@ -65,7 +67,7 @@ void testCatalogPropertiesWithRestBackend() {
"rest-uri",
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
"rest-warehouse",
"key1",
"flink.bypass.key1",
"value1"));
Assertions.assertEquals(
ImmutableMap.of(
Expand All @@ -76,7 +78,9 @@ void testCatalogPropertiesWithRestBackend() {
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"rest-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
"rest-warehouse"),
"rest-warehouse",
"key1",
"value1"),
properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testCreateGravitinoIcebergUsingSQL() {

Assertions.assertEquals(
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
properties.get(CommonCatalogOptions.CATALOG_TYPE.key()));
properties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));

// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog = tableEnv.getCatalog(catalogName);
Expand Down
Loading