diff --git a/LICENSE.bin b/LICENSE.bin index 738687a6a2..9ab5edbd6f 100644 --- a/LICENSE.bin +++ b/LICENSE.bin @@ -285,6 +285,7 @@ Apache Hadoop Aliyun connector Apache Hadoop GCS connector Apache Hadoop AWS connector + Apache Hadoop Azure connector Apache Hadoop Annotatations Apache Hadoop Auth Apache Hadoop Client Aggregator diff --git a/build.gradle.kts b/build.gradle.kts index e6c49df406..01f1043e99 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -766,7 +766,7 @@ tasks { !it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name != "trino-connector" && it.name != "integration-test" && it.name != "bundled-catalog" && !it.name.startsWith("flink") && it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") && - it.name != "gcp-bundle" && it.name != "aliyun-bundle" && it.name != "aws-bundle" + it.name != "gcp-bundle" && it.name != "aliyun-bundle" && it.name != "aws-bundle" && it.name != "azure-bundle" ) { from(it.configurations.runtimeClasspath) into("distribution/package/libs") @@ -788,7 +788,7 @@ tasks { !it.name.startsWith("trino-connector") && it.name != "bundled-catalog" && it.name != "hive-metastore-common" && it.name != "gcp-bundle" && - it.name != "aliyun-bundle" && it.name != "aws-bundle" + it.name != "aliyun-bundle" && it.name != "aws-bundle" && it.name != "azure-bundle" ) { dependsOn("${it.name}:build") from("${it.name}/build/libs") diff --git a/bundles/azure-bundle/build.gradle.kts b/bundles/azure-bundle/build.gradle.kts new file mode 100644 index 0000000000..fa6a68d1af --- /dev/null +++ b/bundles/azure-bundle/build.gradle.kts @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar + +plugins { + `maven-publish` + id("java") + alias(libs.plugins.shadow) +} + +dependencies { + compileOnly(project(":api")) + compileOnly(project(":core")) + compileOnly(project(":catalogs:catalog-hadoop")) + + compileOnly(libs.hadoop3.common) + + implementation(libs.commons.lang3) + // runtime used + implementation(libs.commons.logging) + implementation(libs.hadoop3.abs) + implementation(project(":catalogs:catalog-common")) { + exclude("*") + } +} + +tasks.withType(ShadowJar::class.java) { + isZip64 = true + configurations = listOf(project.configurations.runtimeClasspath.get()) + archiveClassifier.set("") + + // Relocate dependencies to avoid conflicts + relocate("org.apache.httpcomponents", "org.apache.gravitino.azure.shaded.org.apache.httpcomponents") + relocate("org.apache.commons", "org.apache.gravitino.azure.shaded.org.apache.commons") + relocate("com.fasterxml", "org.apache.gravitino.azure.shaded.com.fasterxml") + relocate("com.google.guava", "org.apache.gravitino.azure.shaded.com.google.guava") +} + +tasks.jar { + dependsOn(tasks.named("shadowJar")) + archiveClassifier.set("empty") +} + +tasks.compileJava { + dependsOn(":catalogs:catalog-hadoop:runtimeJars") +} diff --git a/bundles/azure-bundle/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java b/bundles/azure-bundle/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java new file mode 100644 index 0000000000..d3f8d8db3a --- /dev/null +++ b/bundles/azure-bundle/src/main/java/org/apache/gravitino/abs/fs/AzureFileSystemProvider.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.abs.fs; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.Map; +import javax.annotation.Nonnull; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider; +import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils; +import org.apache.gravitino.storage.ABSProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class AzureFileSystemProvider implements FileSystemProvider { + + @VisibleForTesting public static final String ABS_PROVIDER_SCHEME = "abfss"; + + @VisibleForTesting public static final String ABS_PROVIDER_NAME = "abfs"; + + private static final String ABFS_IMPL = "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem"; + + private static final String ABFS_IMPL_KEY = "fs.abfss.impl"; + + @Override + public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map config) + throws IOException { + Configuration configuration = new Configuration(); + + Map hadoopConfMap = + FileSystemUtils.toHadoopConfigMap(config, ImmutableMap.of()); + + if (config.containsKey(ABSProperties.GRAVITINO_ABS_ACCOUNT_NAME) + && config.containsKey(ABSProperties.GRAVITINO_ABS_ACCOUNT_KEY)) { + hadoopConfMap.put( + String.format( + "fs.azure.account.key.%s.dfs.core.windows.net", + config.get(ABSProperties.GRAVITINO_ABS_ACCOUNT_NAME)), + config.get(ABSProperties.GRAVITINO_ABS_ACCOUNT_KEY)); + } + + if (!config.containsKey(ABFS_IMPL_KEY)) { + configuration.set(ABFS_IMPL_KEY, ABFS_IMPL); + } + + hadoopConfMap.forEach(configuration::set); + + return FileSystem.get(path.toUri(), configuration); + } + + @Override + public String scheme() { + return ABS_PROVIDER_SCHEME; + } + + @Override + public String name() { + return ABS_PROVIDER_NAME; + } +} diff --git a/bundles/azure-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider b/bundles/azure-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider new file mode 100644 index 0000000000..ab864341cc --- /dev/null +++ b/bundles/azure-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +org.apache.gravitino.abs.fs.AzureFileSystemProvider \ No newline at end of file diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/ABSProperties.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/ABSProperties.java new file mode 100644 index 0000000000..a76ece32ba --- /dev/null +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/ABSProperties.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage; + +public class ABSProperties { + + // The account name of the Azure Blob Storage. + public static final String GRAVITINO_ABS_ACCOUNT_NAME = "abs-account-name"; + + // The account key of the Azure Blob Storage. + public static final String GRAVITINO_ABS_ACCOUNT_KEY = "abs-account-key"; +} diff --git a/catalogs/catalog-hadoop/build.gradle.kts b/catalogs/catalog-hadoop/build.gradle.kts index c925d1b92d..409a87fb10 100644 --- a/catalogs/catalog-hadoop/build.gradle.kts +++ b/catalogs/catalog-hadoop/build.gradle.kts @@ -80,6 +80,7 @@ dependencies { testImplementation(project(":bundles:aws-bundle")) testImplementation(project(":bundles:gcp-bundle")) testImplementation(project(":bundles:aliyun-bundle")) + testImplementation(project(":bundles:azure-bundle")) testImplementation(libs.minikdc) testImplementation(libs.hadoop3.minicluster) diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopABSCatalogIT.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopABSCatalogIT.java new file mode 100644 index 0000000000..b05baa7acf --- /dev/null +++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopABSCatalogIT.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.catalog.hadoop.integration.test; + +import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Schema; +import org.apache.gravitino.abs.fs.AzureFileSystemProvider; +import org.apache.gravitino.file.Fileset; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.gravitino.storage.ABSProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; +import org.junit.platform.commons.util.StringUtils; + +@EnabledIf("absEnabled") +public class HadoopABSCatalogIT extends HadoopCatalogIT { + + public static final String ABS_ACCOUNT_NAME = System.getenv("ADLS_ACCOUNT_NAME"); + public static final String ABS_ACCOUNT_KEY = System.getenv("ADLS_ACCOUNT_KEY"); + public static final String ABS_CONTAINER_NAME = System.getenv("ADLS_CONTAINER_NAME"); + + @Override + public void startIntegrationTest() throws Exception { + // Just overwrite super, do nothing. + } + + @BeforeAll + public void setup() throws IOException { + copyBundleJarsToHadoop("azure-bundle"); + + try { + super.startIntegrationTest(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake"); + catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog"); + schemaName = GravitinoITUtils.genRandomName("CatalogFilesetIT_schema"); + + schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX); + Configuration conf = new Configuration(); + + conf.set( + String.format("fs.azure.account.key.%s.dfs.core.windows.net", ABS_ACCOUNT_NAME), + ABS_ACCOUNT_KEY); + + fileSystem = + FileSystem.get( + URI.create( + String.format( + "abfs://%s@%s.dfs.core.windows.net", ABS_CONTAINER_NAME, ABS_ACCOUNT_NAME)), + conf); + + createMetalake(); + createCatalog(); + createSchema(); + } + + protected String defaultBaseLocation() { + if (defaultBaseLocation == null) { + try { + Path bucket = + new Path( + String.format( + "%s://%s@%s.dfs.core.windows.net/%s", + AzureFileSystemProvider.ABS_PROVIDER_SCHEME, + ABS_CONTAINER_NAME, + ABS_ACCOUNT_NAME, + GravitinoITUtils.genRandomName("CatalogFilesetIT"))); + + if (!fileSystem.exists(bucket)) { + fileSystem.mkdirs(bucket); + } + + defaultBaseLocation = bucket.toString(); + } catch (IOException e) { + throw new RuntimeException("Failed to create default base location", e); + } + } + + return defaultBaseLocation; + } + + protected void createCatalog() { + Map map = Maps.newHashMap(); + map.put(ABSProperties.GRAVITINO_ABS_ACCOUNT_NAME, ABS_ACCOUNT_NAME); + map.put(ABSProperties.GRAVITINO_ABS_ACCOUNT_KEY, ABS_ACCOUNT_KEY); + map.put(FILESYSTEM_PROVIDERS, AzureFileSystemProvider.ABS_PROVIDER_NAME); + metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, "comment", map); + + catalog = metalake.loadCatalog(catalogName); + } + + protected String generateLocation(String filesetName) { + return String.format("%s/%s", defaultBaseLocation, filesetName); + } + + @Test + public void 
testCreateSchemaAndFilesetWithSpecialLocation() { + String localCatalogName = GravitinoITUtils.genRandomName("local_catalog"); + + String ossLocation = + String.format( + "%s://%s@%s.dfs.core.windows.net/%s", + AzureFileSystemProvider.ABS_PROVIDER_SCHEME, + ABS_CONTAINER_NAME, + ABS_ACCOUNT_NAME, + GravitinoITUtils.genRandomName("CatalogCatalogIT")); + Map catalogProps = Maps.newHashMap(); + catalogProps.put("location", ossLocation); + catalogProps.put(ABSProperties.GRAVITINO_ABS_ACCOUNT_NAME, ABS_ACCOUNT_NAME); + catalogProps.put(ABSProperties.GRAVITINO_ABS_ACCOUNT_KEY, ABS_ACCOUNT_KEY); + catalogProps.put(FILESYSTEM_PROVIDERS, AzureFileSystemProvider.ABS_PROVIDER_NAME); + + Catalog localCatalog = + metalake.createCatalog( + localCatalogName, Catalog.Type.FILESET, provider, "comment", catalogProps); + Assertions.assertEquals(ossLocation, localCatalog.properties().get("location")); + + // Create schema without specifying location. + Schema localSchema = + localCatalog + .asSchemas() + .createSchema("local_schema", "comment", ImmutableMap.of("key1", "val1")); + + Fileset localFileset = + localCatalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(localSchema.name(), "local_fileset"), + "fileset comment", + Fileset.Type.MANAGED, + null, + ImmutableMap.of("k1", "v1")); + Assertions.assertEquals( + ossLocation + "/local_schema/local_fileset", localFileset.storageLocation()); + + // Delete schema + localCatalog.asSchemas().dropSchema(localSchema.name(), true); + + // Create schema with specifying location. + Map schemaProps = ImmutableMap.of("location", ossLocation); + Schema localSchema2 = + localCatalog.asSchemas().createSchema("local_schema2", "comment", schemaProps); + Assertions.assertEquals(ossLocation, localSchema2.properties().get("location")); + + Fileset localFileset2 = + localCatalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(localSchema2.name(), "local_fileset2"), + "fileset comment", + Fileset.Type.MANAGED, + null, + ImmutableMap.of("k1", "v1")); + Assertions.assertEquals(ossLocation + "/local_fileset2", localFileset2.storageLocation()); + + // Delete schema + localCatalog.asSchemas().dropSchema(localSchema2.name(), true); + + // Delete catalog + metalake.dropCatalog(localCatalogName, true); + } + + private static boolean absEnabled() { + return StringUtils.isNotBlank(System.getenv("ADLS_ACCOUNT_NAME")) + && StringUtils.isNotBlank(System.getenv("ADLS_ACCOUNT_KEY")) + && StringUtils.isNotBlank(System.getenv("ADLS_CONTAINER_NAME")); + } +} diff --git a/clients/client-python/gravitino/filesystem/gvfs.py b/clients/client-python/gravitino/filesystem/gvfs.py index 8176c325a8..d90be65c3d 100644 --- a/clients/client-python/gravitino/filesystem/gvfs.py +++ b/clients/client-python/gravitino/filesystem/gvfs.py @@ -50,6 +50,7 @@ class StorageType(Enum): GCS = "gs" S3A = "s3a" OSS = "oss" + ABS = "abfss" class FilesetContextPair: @@ -320,6 +321,7 @@ def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): StorageType.GCS, StorageType.S3A, StorageType.OSS, + StorageType.ABS, ]: src_context_pair.filesystem().mv( self._strip_storage_protocol(storage_type, src_actual_path), @@ -577,6 +579,30 @@ def _convert_actual_path( ) actual_prefix = ops["host"] + ops["path"] + elif storage_location.startswith(f"{StorageType.ABS.value}://"): + ops = infer_storage_options(storage_location) + if "username" not in ops or "host" not in ops or "path" not in ops: + raise GravitinoRuntimeException( + f"Storage location:{storage_location} doesn't support now. 
as the username,"
+                    f"host and path are required in the storage location."
+                )
+            actual_prefix = f"{StorageType.ABS.value}://{ops['username']}@{ops['host']}{ops['path']}"
+
+            # For ABS, the actual path may come back without the ':' after the scheme, e.g.
+            # 'abfss//container@account.dfs.core.windows.net/test_gvfs_catalog/test_gvfs_schema/
+            # test_gvfs_fileset/test_cat/test.file',
+            # so we need to add ':' after the 'abfss' scheme.
+            if actual_path.startswith(f"{StorageType.ABS.value}//"):
+                actual_path = actual_path.replace(
+                    f"{StorageType.ABS.value}//", f"{StorageType.ABS.value}://"
+                )
+
+            # The actual path may also be '{container}/{path}'; in that case we need to add
+            # the scheme, the username (container) and the host back in front of it.
+            if not actual_path.startswith(f"{StorageType.ABS.value}"):
+                path_without_username = actual_path[actual_path.index("/") + 1 :]
+                actual_path = f"{StorageType.ABS.value}://{ops['username']}@{ops['host']}/{path_without_username}"
+
         elif storage_location.startswith(f"{StorageType.LOCAL.value}:/"):
             actual_prefix = storage_location[len(f"{StorageType.LOCAL.value}:") :]
         else:
@@ -613,33 +639,22 @@ def _convert_actual_info(
             entry["name"], storage_location, virtual_location
         )

-        # if entry contains 'mtime', then return the entry with 'mtime' else
-        # if entry contains 'LastModified', then return the entry with 'LastModified'
-
+        last_modified = None
         if "mtime" in entry:
             # HDFS and GCS
-            return {
-                "name": path,
-                "size": entry["size"],
-                "type": entry["type"],
-                "mtime": entry["mtime"],
-            }
-
-        if "LastModified" in entry:
+            last_modified = entry["mtime"]
+        elif "LastModified" in entry:
             # S3 and OSS
-            return {
-                "name": path,
-                "size": entry["size"],
-                "type": entry["type"],
-                "mtime": entry["LastModified"],
-            }
-
-        # Unknown
+            last_modified = entry["LastModified"]
+        elif "last_modified" in entry:
+            # Azure Blob Storage
+            last_modified = entry["last_modified"]
+
         return {
             "name": path,
             "size": entry["size"],
             "type": entry["type"],
-            "mtime": None,
+            "mtime": last_modified,
         }

     def _get_fileset_context(self, virtual_path: str, operation: FilesetDataOperation):
@@ -745,6 +760,8 @@ def _recognize_storage_type(path: str):
             return StorageType.S3A
         if path.startswith(f"{StorageType.OSS.value}://"):
             return StorageType.OSS
+        if path.startswith(f"{StorageType.ABS.value}://"):
+            return StorageType.ABS
         raise GravitinoRuntimeException(
             f"Storage type doesn't support now. Path:{path}"
         )
@@ -801,6 +818,12 @@ def _strip_storage_protocol(storage_type: StorageType, path: str):
         if storage_type == StorageType.LOCAL:
             return path[len(f"{StorageType.LOCAL.value}:") :]

+        # We need to remove the protocol and host from the path, for instance from
+        # 'abfss://container@account.dfs.core.windows.net/path' to 'container/path'.
+        if storage_type == StorageType.ABS:
+            ops = infer_storage_options(path)
+            return ops["username"] + ops["path"]
+
         # OSS has different behavior than S3 and GCS, if we do not remove the
         # protocol, it will always return an empty array.
         if storage_type == StorageType.OSS:
@@ -883,6 +906,8 @@ def _get_filesystem(self, actual_file_location: str):
             fs = self._get_s3_filesystem()
         elif storage_type == StorageType.OSS:
             fs = self._get_oss_filesystem()
+        elif storage_type == StorageType.ABS:
+            fs = self._get_abs_filesystem()
         else:
             raise GravitinoRuntimeException(
                 f"Storage type: `{storage_type}` doesn't support now."
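To make the ABS path handling above concrete, here is a small sketch (not part of the patch) of what the new `StorageType.ABS` branch of `_strip_storage_protocol` does with an abfss location. It uses `infer_storage_options` from `fsspec.utils`, which gvfs.py already relies on; the container, account, and file names below are made-up placeholders.

```python
from fsspec.utils import infer_storage_options

# A typical ADLS Gen2 storage location as stored on a fileset (placeholder names).
location = "abfss://container@account.dfs.core.windows.net/schema/fileset/data.txt"

ops = infer_storage_options(location)
# For adlfs, the scheme and host are dropped, keeping "<container>/<path>":
stripped = ops["username"] + ops["path"]
print(stripped)  # expected: container/schema/fileset/data.txt
```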
@@ -965,5 +990,29 @@ def _get_oss_filesystem(self): endpoint=oss_endpoint_url, ) + def _get_abs_filesystem(self): + # get 'abs_account_name' from abs options, if the key is not found, throw an exception + abs_account_name = self._options.get( + GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME + ) + if abs_account_name is None: + raise GravitinoRuntimeException( + "ABS account name is not found in the options." + ) + + # get 'abs_account_key' from abs options, if the key is not found, throw an exception + abs_account_key = self._options.get( + GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY + ) + if abs_account_key is None: + raise GravitinoRuntimeException( + "ABS account key is not found in the options." + ) + + return importlib.import_module("adlfs").AzureBlobFileSystem( + account_name=abs_account_name, + account_key=abs_account_key, + ) + fsspec.register_implementation(PROTOCOL_NAME, GravitinoVirtualFileSystem) diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py b/clients/client-python/gravitino/filesystem/gvfs_config.py index c8e22e95f7..b948233bcf 100644 --- a/clients/client-python/gravitino/filesystem/gvfs_config.py +++ b/clients/client-python/gravitino/filesystem/gvfs_config.py @@ -41,3 +41,6 @@ class GVFSConfig: GVFS_FILESYSTEM_OSS_ACCESS_KEY = "oss_access_key_id" GVFS_FILESYSTEM_OSS_SECRET_KEY = "oss_secret_access_key" GVFS_FILESYSTEM_OSS_ENDPOINT = "oss_endpoint" + + GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME = "azure_storage_account_name" + GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY = "azure_storage_account_key" diff --git a/clients/client-python/requirements.txt b/clients/client-python/requirements.txt index 8eebd57277..217687ea82 100644 --- a/clients/client-python/requirements.txt +++ b/clients/client-python/requirements.txt @@ -25,4 +25,5 @@ pyarrow==15.0.2 cachetools==5.3.3 gcsfs==2024.3.1 s3fs==2024.3.1 -ossfs==2023.12.0 \ No newline at end of file +ossfs==2023.12.0 +adlfs==2023.12.0 \ No newline at end of file diff --git a/clients/client-python/tests/integration/test_gvfs_with_abs.py b/clients/client-python/tests/integration/test_gvfs_with_abs.py new file mode 100644 index 0000000000..f9f3879b5e --- /dev/null +++ b/clients/client-python/tests/integration/test_gvfs_with_abs.py @@ -0,0 +1,373 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import logging +import os +from random import randint +import unittest + + +from adlfs import AzureBlobFileSystem + +from tests.integration.test_gvfs_with_hdfs import TestGvfsWithHDFS +from gravitino import ( + gvfs, + GravitinoClient, + Catalog, + Fileset, +) +from gravitino.exceptions.base import GravitinoRuntimeException +from gravitino.filesystem.gvfs_config import GVFSConfig + + +logger = logging.getLogger(__name__) + + +def azure_abs_is_prepared(): + return ( + os.environ.get("ABS_ACCOUNT_NAME") + and os.environ.get("ABS_ACCOUNT_KEY") + and os.environ.get("ABS_CONTAINER_NAME") + ) + + +@unittest.skipUnless(azure_abs_is_prepared(), "Azure Blob Storage is not prepared.") +class TestGvfsWithABS(TestGvfsWithHDFS): + # Before running this test, please set the make sure aliyun-azure-x.jar has been + # copy to the $GRAVITINO_HOME/catalogs/hadoop/libs/ directory + azure_abs_account_key = os.environ.get("ABS_ACCOUNT_KEY") + azure_abs_account_name = os.environ.get("ABS_ACCOUNT_NAME") + azure_abs_container_name = os.environ.get("ABS_CONTAINER_NAME") + + metalake_name: str = "TestGvfsWithABS_metalake" + str(randint(1, 10000)) + + def setUp(self): + self.options = { + GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_NAME: self.azure_abs_account_name, + GVFSConfig.GVFS_FILESYSTEM_AZURE_ACCOUNT_KEY: self.azure_abs_account_key, + } + + def tearDown(self): + self.options = {} + + @classmethod + def setUpClass(cls): + cls._get_gravitino_home() + + cls.hadoop_conf_path = f"{cls.gravitino_home}/catalogs/hadoop/conf/hadoop.conf" + # restart the server + cls.restart_server() + # create entity + cls._init_test_entities() + + @classmethod + def tearDownClass(cls): + cls._clean_test_data() + # reset server conf in case of other ITs like HDFS has changed it and fail + # to reset it + cls._reset_conf(cls.config, cls.hadoop_conf_path) + # restart server + cls.restart_server() + + # clear all config in the conf_path + @classmethod + def _reset_conf(cls, config, conf_path): + logger.info("Reset %s.", conf_path) + if not os.path.exists(conf_path): + raise GravitinoRuntimeException(f"Conf file is not found at `{conf_path}`.") + filtered_lines = [] + with open(conf_path, mode="r", encoding="utf-8") as file: + origin_lines = file.readlines() + + for line in origin_lines: + line = line.strip() + if line.startswith("#"): + # append annotations directly + filtered_lines.append(line + "\n") + + with open(conf_path, mode="w", encoding="utf-8") as file: + for line in filtered_lines: + file.write(line) + + @classmethod + def _init_test_entities(cls): + cls.gravitino_admin_client.create_metalake( + name=cls.metalake_name, comment="", properties={} + ) + cls.gravitino_client = GravitinoClient( + uri="http://localhost:8090", metalake_name=cls.metalake_name + ) + + cls.config = {} + cls.conf = {} + catalog = cls.gravitino_client.create_catalog( + name=cls.catalog_name, + catalog_type=Catalog.Type.FILESET, + provider=cls.catalog_provider, + comment="", + properties={ + "filesystem-providers": "abfs", + "abs-account-name": cls.azure_abs_account_name, + "abs-account-key": cls.azure_abs_account_key, + }, + ) + catalog.as_schemas().create_schema( + schema_name=cls.schema_name, comment="", properties={} + ) + + cls.fileset_storage_location: str = ( + f"{cls.azure_abs_container_name}/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ) + cls.fileset_gvfs_location = ( + f"gvfs://fileset/{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ) + catalog.as_fileset_catalog().create_fileset( + ident=cls.fileset_ident, + 
fileset_type=Fileset.Type.MANAGED, + comment=cls.fileset_comment, + storage_location=( + f"abfss://{cls.azure_abs_container_name}@{cls.azure_abs_account_name}.dfs.core.windows.net/" + f"{cls.catalog_name}/{cls.schema_name}/{cls.fileset_name}" + ), + properties=cls.fileset_properties, + ) + + cls.fs = AzureBlobFileSystem( + account_name=cls.azure_abs_account_name, + account_key=cls.azure_abs_account_key, + ) + + def test_cat_file(self): + cat_dir = self.fileset_gvfs_location + "/test_cat" + cat_actual_dir = self.fileset_storage_location + "/test_cat" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(cat_dir, cat_actual_dir, fs) + + cat_file = self.fileset_gvfs_location + "/test_cat/test.file" + cat_actual_file = self.fileset_storage_location + "/test_cat/test.file" + self.fs.touch(cat_actual_file) + self.assertTrue(self.fs.exists(cat_actual_file)) + self.assertTrue(fs.exists(cat_file)) + + # test open and write file + with fs.open(cat_file, mode="wb") as f: + f.write(b"test_cat_file") + self.assertTrue(fs.info(cat_file)["size"] > 0) + + # test cat file + content = fs.cat_file(cat_file) + self.assertEqual(b"test_cat_file", content) + + def check_mkdir(self, gvfs_dir, actual_dir, gvfs_instance): + # OSS will not create a directory, so the directory will not exist. + self.fs.mkdir(actual_dir) + self.assertFalse(self.fs.exists(actual_dir)) + self.assertFalse(gvfs_instance.exists(gvfs_dir)) + + def check_makedirs(self, gvfs_dir, actual_dir, gvfs_instance): + self.fs.makedirs(actual_dir) + self.assertFalse(self.fs.exists(actual_dir)) + self.assertFalse(gvfs_instance.exists(gvfs_dir)) + + def test_modified(self): + modified_dir = self.fileset_gvfs_location + "/test_modified" + modified_actual_dir = self.fileset_storage_location + "/test_modified" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(modified_dir, modified_actual_dir, fs) + # S3 only supports getting the `object` modify time, so the modified time will be None + # if it's a directory. + # >>> gcs.mkdir('example_qazwsx/catalog/schema/fileset3') + # >>> r = gcs.modified('example_qazwsx/catalog/schema/fileset3') + # >>> print(r) + # None + # self.assertIsNone(fs.modified(modified_dir)) + + # create a file under the dir 'modified_dir'. 
+ file_path = modified_dir + "/test.txt" + fs.touch(file_path) + self.assertTrue(fs.exists(file_path)) + self.assertIsNotNone(fs.modified(file_path)) + + def test_rm(self): + rm_dir = self.fileset_gvfs_location + "/test_rm" + rm_actual_dir = self.fileset_storage_location + "/test_rm" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rm_dir, rm_actual_dir, fs) + + rm_file = self.fileset_gvfs_location + "/test_rm/test.file" + rm_actual_file = self.fileset_storage_location + "/test_rm/test.file" + fs.touch(rm_file) + self.assertTrue(self.fs.exists(rm_actual_file)) + self.assertTrue(fs.exists(rm_file)) + + # test delete file + fs.rm(rm_file) + self.assertFalse(fs.exists(rm_file)) + + # test delete dir with recursive = false + rm_new_file = self.fileset_gvfs_location + "/test_rm/test_new.file" + rm_new_actual_file = self.fileset_storage_location + "/test_rm/test_new.file" + self.fs.touch(rm_new_actual_file) + self.assertTrue(self.fs.exists(rm_new_actual_file)) + self.assertTrue(fs.exists(rm_new_file)) + + def test_rmdir(self): + rmdir_dir = self.fileset_gvfs_location + "/test_rmdir" + rmdir_actual_dir = self.fileset_storage_location + "/test_rmdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rmdir_dir, rmdir_actual_dir, fs) + + rmdir_file = self.fileset_gvfs_location + "/test_rmdir/test.file" + rmdir_actual_file = self.fileset_storage_location + "/test_rmdir/test.file" + self.fs.touch(rmdir_actual_file) + self.assertTrue(self.fs.exists(rmdir_actual_file)) + self.assertTrue(fs.exists(rmdir_file)) + + # NOT IMPLEMENTED for ABS + # fs.rm_file(rmdir_file) + + def test_mkdir(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. + new_bucket = self.azure_abs_container_name + "2" + mkdir_dir = mkdir_dir.replace(self.azure_abs_container_name, new_bucket) + mkdir_actual_dir = mkdir_actual_dir.replace( + self.azure_abs_container_name, new_bucket + ) + fs.mkdir(mkdir_dir, create_parents=True) + + self.assertFalse(self.fs.exists(mkdir_actual_dir)) + self.assertFalse(fs.exists(mkdir_dir)) + + self.assertFalse(self.fs.exists("wasbs://" + new_bucket)) + + def test_makedirs(self): + mkdir_dir = self.fileset_gvfs_location + "/test_mkdir" + mkdir_actual_dir = self.fileset_storage_location + "/test_mkdir" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + # it actually takes no effect. + self.check_mkdir(mkdir_dir, mkdir_actual_dir, fs) + + # check whether it will automatically create the bucket if 'create_parents' + # is set to True. 
+ new_bucket = self.azure_abs_container_name + "1" + mkdir_dir = mkdir_dir.replace(self.azure_abs_container_name, new_bucket) + mkdir_actual_dir = mkdir_actual_dir.replace( + self.azure_abs_container_name, new_bucket + ) + + fs.makedirs(mkdir_dir) + + self.assertFalse(self.fs.exists(mkdir_actual_dir)) + + self.assertFalse(fs.exists(mkdir_dir)) + self.assertFalse(self.fs.exists("wsabs://" + new_bucket)) + + def test_ls(self): + ls_dir = self.fileset_gvfs_location + "/test_ls" + ls_actual_dir = self.fileset_storage_location + "/test_ls" + + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + + self.check_mkdir(ls_dir, ls_actual_dir, fs) + + ls_file = self.fileset_gvfs_location + "/test_ls/test.file" + ls_actual_file = self.fileset_storage_location + "/test_ls/test.file" + self.fs.touch(ls_actual_file) + self.assertTrue(self.fs.exists(ls_actual_file)) + + # Azure block storage with wasbs protocol does not support listing files. + # test detail = false + file_list_without_detail = fs.ls(ls_dir, detail=False) + self.assertEqual(1, len(file_list_without_detail)) + self.assertEqual(file_list_without_detail[0], ls_file[len("gvfs://") :]) + + # test detail = true + file_list_with_detail = fs.ls(ls_dir, detail=True) + self.assertEqual(1, len(file_list_with_detail)) + self.assertEqual(file_list_with_detail[0]["name"], ls_file[len("gvfs://") :]) + + def test_rm_file(self): + rm_file_dir = self.fileset_gvfs_location + "/test_rm_file" + rm_file_actual_dir = self.fileset_storage_location + "/test_rm_file" + fs = gvfs.GravitinoVirtualFileSystem( + server_uri="http://localhost:8090", + metalake_name=self.metalake_name, + options=self.options, + **self.conf, + ) + self.check_mkdir(rm_file_dir, rm_file_actual_dir, fs) + + rm_file_file = self.fileset_gvfs_location + "/test_rm_file/test.file" + rm_file_actual_file = self.fileset_storage_location + "/test_rm_file/test.file" + self.fs.touch(rm_file_actual_file) + self.assertTrue(self.fs.exists(rm_file_actual_file)) + self.assertTrue(fs.exists(rm_file_file)) + + # test delete file + with self.assertRaises(NotImplementedError): + fs.rm_file(rm_file_file) + self.assertTrue(fs.exists(rm_file_file)) + + # test delete dir + with self.assertRaises(NotImplementedError): + fs.rm_file(rm_file_dir) diff --git a/clients/filesystem-hadoop3/build.gradle.kts b/clients/filesystem-hadoop3/build.gradle.kts index 9836c35147..55c0f59a05 100644 --- a/clients/filesystem-hadoop3/build.gradle.kts +++ b/clients/filesystem-hadoop3/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { testImplementation(project(":bundles:gcp-bundle")) testImplementation(project(":bundles:aliyun-bundle")) testImplementation(project(":bundles:aws-bundle")) + testImplementation(project(":bundles:azure-bundle")) testImplementation(libs.awaitility) testImplementation(libs.bundles.jetty) testImplementation(libs.bundles.jersey) diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSIT.java new file mode 100644 index 0000000000..598656b122 --- /dev/null +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemABSIT.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.abs.fs.AzureFileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.storage.ABSProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.condition.EnabledIf;
+import org.junit.platform.commons.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@EnabledIf("absEnabled")
+public class GravitinoVirtualFileSystemABSIT extends GravitinoVirtualFileSystemIT {
+  private static final Logger LOG = LoggerFactory.getLogger(GravitinoVirtualFileSystemABSIT.class);
+
+  public static final String ABS_ACCOUNT_NAME = System.getenv("ABS_ACCOUNT_NAME");
+  public static final String ABS_ACCOUNT_KEY = System.getenv("ABS_ACCOUNT_KEY");
+  public static final String ABS_CONTAINER_NAME = System.getenv("ABS_CONTAINER_NAME");
+
+  @BeforeAll
+  public void startIntegrationTest() {
+    // Do nothing
+  }
+
+  @BeforeAll
+  public void startUp() throws Exception {
+    // Copy the Azure bundle jar to the Gravitino server if in deploy mode.
+    copyBundleJarsToHadoop("azure-bundle");
+    // Need to download jars to the Gravitino server
+    super.startIntegrationTest();
+
+    // This value can be tuned by the user; please change it accordingly.
+    defaultBockSize = 32 * 1024 * 1024;
+
+    // This value is 1 for ABFS, 3 for GCS, and 1 for S3A.
+ defaultReplication = 1; + + metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); + catalogName = GravitinoITUtils.genRandomName("catalog"); + schemaName = GravitinoITUtils.genRandomName("schema"); + + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); + Assertions.assertTrue(client.metalakeExists(metalakeName)); + + Map properties = Maps.newHashMap(); + + properties.put(ABSProperties.GRAVITINO_ABS_ACCOUNT_NAME, ABS_ACCOUNT_NAME); + properties.put(ABSProperties.GRAVITINO_ABS_ACCOUNT_KEY, ABS_ACCOUNT_KEY); + properties.put(FILESYSTEM_PROVIDERS, AzureFileSystemProvider.ABS_PROVIDER_NAME); + + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + catalog.asSchemas().createSchema(schemaName, "schema comment", properties); + Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName)); + + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl.disable.cache", "true"); + conf.set("fs.gravitino.server.uri", serverUri); + conf.set("fs.gravitino.client.metalake", metalakeName); + + conf.set("fs.gvfs.filesystem.providers", AzureFileSystemProvider.ABS_PROVIDER_NAME); + // Pass this configuration to the real file system + conf.set(ABSProperties.GRAVITINO_ABS_ACCOUNT_NAME, ABS_ACCOUNT_NAME); + conf.set(ABSProperties.GRAVITINO_ABS_ACCOUNT_KEY, ABS_ACCOUNT_KEY); + conf.set("fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem"); + } + + @AfterAll + public void tearDown() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName, true); + client.dropMetalake(metalakeName, true); + + if (client != null) { + client.close(); + client = null; + } + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Exception in closing CloseableGroup", e); + } + } + + /** + * Remove the `gravitino.bypass` prefix from the configuration and pass it to the real file system + * This method corresponds to the method org.apache.gravitino.filesystem.hadoop + * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original code. 
+ */
+  protected Configuration convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) {
+    Configuration absConf = new Configuration();
+    Map<String, String> map = Maps.newHashMap();
+
+    gvfsConf.forEach(entry -> map.put(entry.getKey(), entry.getValue()));
+
+    Map<String, String> hadoopConfMap = FileSystemUtils.toHadoopConfigMap(map, ImmutableMap.of());
+
+    if (gvfsConf.get(ABSProperties.GRAVITINO_ABS_ACCOUNT_NAME) != null
+        && gvfsConf.get(ABSProperties.GRAVITINO_ABS_ACCOUNT_KEY) != null) {
+      hadoopConfMap.put(
+          String.format(
+              "fs.azure.account.key.%s.dfs.core.windows.net",
+              gvfsConf.get(ABSProperties.GRAVITINO_ABS_ACCOUNT_NAME)),
+          gvfsConf.get(ABSProperties.GRAVITINO_ABS_ACCOUNT_KEY));
+    }
+
+    hadoopConfMap.forEach(absConf::set);
+
+    return absConf;
+  }
+
+  protected String genStorageLocation(String fileset) {
+    return String.format(
+        "%s://%s@%s.dfs.core.windows.net/%s",
+        AzureFileSystemProvider.ABS_PROVIDER_SCHEME, ABS_CONTAINER_NAME, ABS_ACCOUNT_NAME, fileset);
+  }
+
+  @Disabled("java.lang.UnsupportedOperationException: Append Support not enabled")
+  public void testAppend() throws IOException {}
+
+  private static boolean absEnabled() {
+    return StringUtils.isNotBlank(System.getenv("ABS_ACCOUNT_NAME"))
+        && StringUtils.isNotBlank(System.getenv("ABS_ACCOUNT_KEY"))
+        && StringUtils.isNotBlank(System.getenv("ABS_CONTAINER_NAME"));
+  }
+}
diff --git a/docs/hadoop-catalog.md b/docs/hadoop-catalog.md
index 0622574d4d..70201509ec 100644
--- a/docs/hadoop-catalog.md
+++ b/docs/hadoop-catalog.md
@@ -76,6 +76,18 @@ In the meantime, you need to place the corresponding bundle jar [`gravitino-gcp-
 In the meantime, you need to place the corresponding bundle jar [`gravitino-aliyun-bundle-${version}.jar`](https://repo1.maven.org/maven2/org/apache/gravitino/aliyun-bundle/) in the directory `${GRAVITINO_HOME}/catalogs/hadoop/libs`.
+
+#### Azure Blob Storage fileset
+
+| Configuration item            | Description                                                                                                                                                                                                                                         | Default value   | Required                                   | Since version    |
+|-------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------|--------------------------------------------|------------------|
+| `filesystem-providers`        | The file system providers to add. Set it to `abfs` if it's an Azure Blob Storage fileset, or a comma-separated string that contains `abfs` like `oss,abfs,s3` to support multiple kinds of fileset including `abfs`.                               | (none)          | Yes                                        | 0.8.0-incubating |
+| `default-filesystem-provider` | The default filesystem provider of this Hadoop catalog if users do not specify the scheme in the URI. Default value is `builtin-local`. For Azure Blob Storage, if we set this value to `abfs`, we can omit the prefix `abfss://` in the location. | `builtin-local` | No                                         | 0.8.0-incubating |
+| `abs-account-name`            | The account name of Azure Blob Storage.                                                                                                                                                                                                             | (none)          | Yes if it's an Azure Blob Storage fileset. | 0.8.0-incubating |
+| `abs-account-key`             | The account key of Azure Blob Storage.                                                                                                                                                                                                              | (none)          | Yes if it's an Azure Blob Storage fileset. | 0.8.0-incubating |
+
+Similar to the above, you need to place the corresponding bundle jar [`gravitino-azure-bundle-${version}.jar`](https://repo1.maven.org/maven2/org/apache/gravitino/azure-bundle/) in the directory `${GRAVITINO_HOME}/catalogs/hadoop/libs`.
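As an aside (not part of the patch): with the catalog properties above, creating an ADLS-backed fileset catalog from the Python client might look like the following sketch. The server URI, metalake name, catalog name, and credential values are placeholders.

```python
from gravitino import GravitinoClient, Catalog

# Assumes a running Gravitino server and an existing metalake named "demo".
client = GravitinoClient(uri="http://localhost:8090", metalake_name="demo")

catalog = client.create_catalog(
    name="adls_catalog",
    catalog_type=Catalog.Type.FILESET,
    provider="hadoop",
    comment="fileset catalog backed by Azure Blob Storage",
    properties={
        "filesystem-providers": "abfs",       # provider name registered by the azure-bundle
        "abs-account-name": "my-account",     # placeholder
        "abs-account-key": "my-account-key",  # placeholder
    },
)
```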
+
 :::note
 - Gravitino contains builtin file system providers for local file system(`builtin-local`) and HDFS(`builtin-hdfs`), that is to say if `filesystem-providers` is not set, Gravitino will still support local file system and HDFS. Apart from that, you can set the `filesystem-providers` to support other file systems like S3, GCS, OSS or custom file system.
 - `default-filesystem-provider` is used to set the default file system provider for the Hadoop catalog. If the user does not specify the scheme in the URI, Gravitino will use the default file system provider to access the fileset. For example, if the default file system provider is set to `builtin-local`, the user can omit the prefix `file://` in the location.
diff --git a/docs/how-to-use-gvfs.md b/docs/how-to-use-gvfs.md
index 7a3373092c..09d73be6e4 100644
--- a/docs/how-to-use-gvfs.md
+++ b/docs/how-to-use-gvfs.md
@@ -102,6 +102,17 @@ In the meantime, you need to place the corresponding bundle jar [`gravitino-gcp-
 In the meantime, you need to place the corresponding bundle jar [`gravitino-aliyun-bundle-${version}.jar`](https://repo1.maven.org/maven2/org/apache/gravitino/aliyun-bundle/) in the Hadoop environment(typically located in `${HADOOP_HOME}/share/hadoop/common/lib/`).
+#### Azure Blob Storage fileset
+
+| Configuration item             | Description                                                                                                                                                                                                           | Default value | Required                                   | Since version    |
+|--------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|--------------------------------------------|------------------|
+| `fs.gvfs.filesystem.providers` | The file system providers to add. Set it to `abfs` if it's an Azure Blob Storage fileset, or a comma-separated string that contains `abfs` like `oss,abfs,s3` to support multiple kinds of fileset including `abfs`. | (none)        | Yes                                        | 0.8.0-incubating |
+| `abs-account-name`             | The account name of Azure Blob Storage.                                                                                                                                                                               | (none)        | Yes if it's an Azure Blob Storage fileset. | 0.8.0-incubating |
+| `abs-account-key`              | The account key of Azure Blob Storage.                                                                                                                                                                                | (none)        | Yes if it's an Azure Blob Storage fileset. | 0.8.0-incubating |
+
+Similar to the above, you need to place the corresponding bundle jar [`gravitino-azure-bundle-${version}.jar`](https://repo1.maven.org/maven2/org/apache/gravitino/azure-bundle/) in the Hadoop environment (typically located in `${HADOOP_HOME}/share/hadoop/common/lib/`).
+
+
 #### Custom fileset
 
 Since 0.7.0-incubating, users can define their own fileset type and configure the corresponding properties, for more, please refer to [Custom Fileset](./hadoop-catalog.md#how-to-custom-your-own-hcfs-file-system-fileset). So, if you want to access the custom fileset through GVFS, you need to configure the corresponding properties.
@@ -449,6 +460,14 @@ The following properties are required if you want to access the OSS fileset via
 | `oss_access_key_id`     | The access key of the Aliyun OSS.  | (none)        | Yes if it's a OSS fileset. | 0.7.0-incubating |
 | `oss_secret_access_key` | The secret key of the Aliyun OSS.  | (none)        | Yes if it's a OSS fileset. | 0.7.0-incubating |
+For an Azure Blob Storage fileset, you need to configure the following properties:
+
+| Configuration item           | Description                             | Default value | Required                                   | Since version    |
+|------------------------------|-----------------------------------------|---------------|--------------------------------------------|------------------|
+| `azure_storage_account_name` | The account name of Azure Blob Storage. | (none)        | Yes if it's an Azure Blob Storage fileset. | 0.8.0-incubating |
+| `azure_storage_account_key`  | The account key of Azure Blob Storage.  | (none)        | Yes if it's an Azure Blob Storage fileset. | 0.8.0-incubating |
+
+
 You can configure these properties when obtaining the `Gravitino Virtual FileSystem` in Python like this:
 
 ```python
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 3241a48375..a217b20bc1 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -35,6 +35,7 @@ hadoop2 = "2.10.2"
 hadoop3 = "3.1.0"
 hadoop3-gcs = "1.9.4-hadoop3"
 hadoop3-aliyun = "3.1.0"
+hadoop3-abs = "3.2.1"
 hadoop-minikdc = "3.3.6"
 htrace-core4 = "4.1.0-incubating"
 httpclient5 = "5.2.1"
@@ -169,6 +170,7 @@ hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client", version.
 hadoop3-minicluster = { group = "org.apache.hadoop", name = "hadoop-minicluster", version.ref = "hadoop-minikdc"}
 hadoop3-gcs = { group = "com.google.cloud.bigdataoss", name = "gcs-connector", version.ref = "hadoop3-gcs"}
 hadoop3-oss = { group = "org.apache.hadoop", name = "hadoop-aliyun", version.ref = "hadoop3-aliyun"}
+hadoop3-abs = { group = "org.apache.hadoop", name = "hadoop-azure", version.ref = "hadoop3-abs"}
 htrace-core4 = { group = "org.apache.htrace", name = "htrace-core4", version.ref = "htrace-core4" }
 airlift-json = { group = "io.airlift", name = "json", version.ref = "airlift-json"}
 airlift-resolver = { group = "io.airlift.resolver", name = "resolver", version.ref = "airlift-resolver"}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1f3efb4954..3b0d963659 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -74,3 +74,5 @@ include("integration-test-common")
 include(":bundles:aws-bundle")
 include(":bundles:gcp-bundle")
 include(":bundles:aliyun-bundle")
+include("bundles:azure-bundle")
+findProject(":bundles:azure-bundle")?.name = "azure-bundle"