From 8ae84147910038f81aefdc9ca113439bcdf15aeb Mon Sep 17 00:00:00 2001 From: vintmd <61688729+vintmd@users.noreply.github.com> Date: Fri, 9 Sep 2022 20:10:51 +0800 Subject: [PATCH] fs close check throw io exception (#77) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: alantong(佟明达) --- .../org/apache/hadoop/fs/CosFileSystem.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index 08868717..1574d5b6 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -43,6 +43,7 @@ public class CosFileSystem extends FileSystem { private NativeFileSystemStore nativeStore; private boolean isPosixFSStore; private boolean isDefaultNativeStore; + private volatile boolean healthyFlag = false; private boolean isPosixUseOFSRanger; private boolean isPosixImpl = false; private FileSystem actualImplFS = null; @@ -143,6 +144,8 @@ public void initialize(URI uri, Configuration conf) throws IOException { this.actualImplFS.initialize(uri, conf); + // init status + this.healthyFlag = true; } // load class to get relate file system @@ -171,6 +174,7 @@ public Path getHomeDirectory() { public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { LOG.debug("append file [{}] in COS.", f); + healthyCheck(); checkPermission(f, RangerAccessType.WRITE); return this.actualImplFS.append(f, bufferSize, progress); } @@ -178,6 +182,7 @@ public FSDataOutputStream append(Path f, int bufferSize, @Override public boolean truncate(Path f, long newLength) throws IOException { LOG.debug("truncate file [{}] in COS.", f); + healthyCheck(); checkPermission(f, RangerAccessType.WRITE); return this.actualImplFS.truncate(f, newLength); } @@ -189,6 +194,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, long blockSize, Progressable progress) throws IOException { LOG.debug("Creating a new file [{}] in COS.", f); + healthyCheck(); checkPermission(f, RangerAccessType.WRITE); return this.actualImplFS.create(f, permission, overwrite, bufferSize, replication, blockSize, progress); @@ -198,6 +204,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, @Override public boolean delete(Path f, boolean recursive) throws IOException { LOG.debug("Ready to delete path: {}. recursive: {}.", f, recursive); + healthyCheck(); checkPermission(f, RangerAccessType.DELETE); return this.actualImplFS.delete(f, recursive); } @@ -205,6 +212,7 @@ public boolean delete(Path f, boolean recursive) throws IOException { @Override public FileStatus getFileStatus(Path f) throws IOException { LOG.debug("Get file status: {}.", f); + healthyCheck(); // keep same not change ranger permission here return this.actualImplFS.getFileStatus(f); } @@ -227,6 +235,7 @@ public URI getUri() { @Override public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { LOG.debug("list status:" + f); + healthyCheck(); checkPermission(f, RangerAccessType.LIST); return this.actualImplFS.listStatus(f); } @@ -235,6 +244,7 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException public boolean mkdirs(Path f, FsPermission permission) throws IOException { LOG.debug("mkdirs path: {}.", f); + healthyCheck(); checkPermission(f, RangerAccessType.WRITE); return this.actualImplFS.mkdirs(f, permission); } @@ -242,6 +252,7 @@ public boolean mkdirs(Path f, FsPermission permission) @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { LOG.debug("Open file [{}] to read, buffer [{}]", f, bufferSize); + healthyCheck(); checkPermission(f, RangerAccessType.READ); return this.actualImplFS.open(f, bufferSize); } @@ -249,6 +260,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { @Override public boolean rename(Path src, Path dst) throws IOException { LOG.debug("Rename the source path [{}] to the dest path [{}].", src, dst); + healthyCheck(); checkPermission(src, RangerAccessType.DELETE); checkPermission(dst, RangerAccessType.WRITE); return this.actualImplFS.rename(src, dst); @@ -276,6 +288,7 @@ public Path getWorkingDirectory() { @Override public FileChecksum getFileChecksum(Path f, long length) throws IOException { LOG.debug("call the checksum for the path: {}.", f); + healthyCheck(); checkPermission(f, RangerAccessType.READ); Preconditions.checkArgument(length >= 0); return this.actualImplFS.getFileChecksum(f, length); @@ -294,6 +307,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { @Override public void setXAttr(Path f, String name, byte[] value, EnumSet flag) throws IOException { LOG.debug("set XAttr: {}.", f); + healthyCheck(); checkPermission(f, RangerAccessType.WRITE); this.actualImplFS.setXAttr(f, name, value, flag); } @@ -309,6 +323,7 @@ public void setXAttr(Path f, String name, byte[] value, EnumSet fl @Override public byte[] getXAttr(Path f, String name) throws IOException { LOG.debug("get XAttr: {}.", f); + healthyCheck(); checkPermission(f, RangerAccessType.READ); return this.actualImplFS.getXAttr(f, name); } @@ -324,6 +339,7 @@ public byte[] getXAttr(Path f, String name) throws IOException { @Override public Map getXAttrs(Path f, List names) throws IOException { LOG.debug("get XAttrs: {}.", f); + healthyCheck(); checkPermission(f, RangerAccessType.READ); return this.actualImplFS.getXAttrs(f, names); } @@ -331,6 +347,7 @@ public Map getXAttrs(Path f, List names) throws IOExcept @Override public Map getXAttrs(Path f) throws IOException { LOG.debug("get XAttrs: {}.", f); + healthyCheck(); checkPermission(f, RangerAccessType.READ); return this.actualImplFS.getXAttrs(f); } @@ -345,6 +362,7 @@ public Map getXAttrs(Path f) throws IOException { @Override public void removeXAttr(Path f, String name) throws IOException { LOG.debug("remove XAttr: {}.", f); + healthyCheck(); checkPermission(f, RangerAccessType.WRITE); this.actualImplFS.removeXAttr(f, name); } @@ -352,6 +370,7 @@ public void removeXAttr(Path f, String name) throws IOException { @Override public List listXAttrs(Path f) throws IOException { LOG.debug("list XAttrs: {}.", f); + healthyCheck(); checkPermission(f, RangerAccessType.READ); return this.actualImplFS.listXAttrs(f); } @@ -429,6 +448,7 @@ private void transferOfsConfig() { // CHDFS Support Only public void releaseFileLock(Path f) throws IOException { LOG.debug("Release the file lock: {}.", f); + healthyCheck(); if (this.actualImplFS instanceof CHDFSHadoopFileSystemAdapter) { ((CHDFSHadoopFileSystemAdapter) this.actualImplFS).releaseFileLock(f); } else { @@ -464,6 +484,7 @@ private boolean useOFSRanger() { */ private void checkCustomAuth(Configuration conf) throws IOException { // todo: need get token first + healthyCheck(); this.rangerCredentialsClient.doCheckCustomAuth(conf); } @@ -488,6 +509,12 @@ private String getOwnerId() { return shortUserName; } + private void healthyCheck() throws IOException { + if (!this.healthyFlag) { + throw new IOException("fileSystem has been closed or not init"); + } + } + @Override public void close() throws IOException { LOG.info("begin to close cos file system"); @@ -496,5 +523,6 @@ public void close() throws IOException { // close range client later, inner native store this.nativeStore.close(); } + this.healthyFlag = false; } }