Skip to content

Commit

Permalink
fs close check throw io exception (#77)
Browse files Browse the repository at this point in the history
Co-authored-by: alantong(佟明达) <[email protected]>
  • Loading branch information
vintmd and vintmd authored Sep 9, 2022
1 parent 06de7fe commit 8ae8414
Showing 1 changed file with 28 additions and 0 deletions.
28 changes: 28 additions & 0 deletions src/main/java/org/apache/hadoop/fs/CosFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class CosFileSystem extends FileSystem {
private NativeFileSystemStore nativeStore;
private boolean isPosixFSStore;
private boolean isDefaultNativeStore;
private volatile boolean healthyFlag = false;
private boolean isPosixUseOFSRanger;
private boolean isPosixImpl = false;
private FileSystem actualImplFS = null;
Expand Down Expand Up @@ -143,6 +144,8 @@ public void initialize(URI uri, Configuration conf) throws IOException {


this.actualImplFS.initialize(uri, conf);
// init status
this.healthyFlag = true;
}

// load class to get relate file system
Expand Down Expand Up @@ -171,13 +174,15 @@ public Path getHomeDirectory() {
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
LOG.debug("append file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.append(f, bufferSize, progress);
}

@Override
public boolean truncate(Path f, long newLength) throws IOException {
LOG.debug("truncate file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.truncate(f, newLength);
}
Expand All @@ -189,6 +194,7 @@ public FSDataOutputStream create(Path f, FsPermission permission,
long blockSize, Progressable progress)
throws IOException {
LOG.debug("Creating a new file [{}] in COS.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.create(f, permission, overwrite, bufferSize,
replication, blockSize, progress);
Expand All @@ -198,13 +204,15 @@ public FSDataOutputStream create(Path f, FsPermission permission,
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
LOG.debug("Ready to delete path: {}. recursive: {}.", f, recursive);
healthyCheck();
checkPermission(f, RangerAccessType.DELETE);
return this.actualImplFS.delete(f, recursive);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
LOG.debug("Get file status: {}.", f);
healthyCheck();
// keep same not change ranger permission here
return this.actualImplFS.getFileStatus(f);
}
Expand All @@ -227,6 +235,7 @@ public URI getUri() {
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
LOG.debug("list status:" + f);
healthyCheck();
checkPermission(f, RangerAccessType.LIST);
return this.actualImplFS.listStatus(f);
}
Expand All @@ -235,20 +244,23 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException
public boolean mkdirs(Path f, FsPermission permission)
throws IOException {
LOG.debug("mkdirs path: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
return this.actualImplFS.mkdirs(f, permission);
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
LOG.debug("Open file [{}] to read, buffer [{}]", f, bufferSize);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.open(f, bufferSize);
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
LOG.debug("Rename the source path [{}] to the dest path [{}].", src, dst);
healthyCheck();
checkPermission(src, RangerAccessType.DELETE);
checkPermission(dst, RangerAccessType.WRITE);
return this.actualImplFS.rename(src, dst);
Expand Down Expand Up @@ -276,6 +288,7 @@ public Path getWorkingDirectory() {
@Override
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
LOG.debug("call the checksum for the path: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
Preconditions.checkArgument(length >= 0);
return this.actualImplFS.getFileChecksum(f, length);
Expand All @@ -294,6 +307,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException {
@Override
public void setXAttr(Path f, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
LOG.debug("set XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
this.actualImplFS.setXAttr(f, name, value, flag);
}
Expand All @@ -309,6 +323,7 @@ public void setXAttr(Path f, String name, byte[] value, EnumSet<XAttrSetFlag> fl
@Override
public byte[] getXAttr(Path f, String name) throws IOException {
LOG.debug("get XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttr(f, name);
}
Expand All @@ -324,13 +339,15 @@ public byte[] getXAttr(Path f, String name) throws IOException {
@Override
public Map<String, byte[]> getXAttrs(Path f, List<String> names) throws IOException {
LOG.debug("get XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttrs(f, names);
}

@Override
public Map<String, byte[]> getXAttrs(Path f) throws IOException {
LOG.debug("get XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.getXAttrs(f);
}
Expand All @@ -345,13 +362,15 @@ public Map<String, byte[]> getXAttrs(Path f) throws IOException {
@Override
public void removeXAttr(Path f, String name) throws IOException {
LOG.debug("remove XAttr: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.WRITE);
this.actualImplFS.removeXAttr(f, name);
}

@Override
public List<String> listXAttrs(Path f) throws IOException {
LOG.debug("list XAttrs: {}.", f);
healthyCheck();
checkPermission(f, RangerAccessType.READ);
return this.actualImplFS.listXAttrs(f);
}
Expand Down Expand Up @@ -429,6 +448,7 @@ private void transferOfsConfig() {
// CHDFS Support Only
public void releaseFileLock(Path f) throws IOException {
LOG.debug("Release the file lock: {}.", f);
healthyCheck();
if (this.actualImplFS instanceof CHDFSHadoopFileSystemAdapter) {
((CHDFSHadoopFileSystemAdapter) this.actualImplFS).releaseFileLock(f);
} else {
Expand Down Expand Up @@ -464,6 +484,7 @@ private boolean useOFSRanger() {
*/
private void checkCustomAuth(Configuration conf) throws IOException {
// todo: need get token first
healthyCheck();
this.rangerCredentialsClient.doCheckCustomAuth(conf);
}

Expand All @@ -488,6 +509,12 @@ private String getOwnerId() {
return shortUserName;
}

private void healthyCheck() throws IOException {
if (!this.healthyFlag) {
throw new IOException("fileSystem has been closed or not init");
}
}

@Override
public void close() throws IOException {
LOG.info("begin to close cos file system");
Expand All @@ -496,5 +523,6 @@ public void close() throws IOException {
// close range client later, inner native store
this.nativeStore.close();
}
this.healthyFlag = false;
}
}

0 comments on commit 8ae8414

Please sign in to comment.