Skip to content

Commit

Permalink
perf: add the enabled option for the upload part checksum.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang733 committed May 11, 2022
1 parent 714c19b commit a331030
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 22 deletions.
6 changes: 4 additions & 2 deletions src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String DEFAULT_TMP_DIR = "/tmp/hadoop_cos";

// Upload checks; turned on by default.
public static final String COSN_UPLOAD_CHECKS_ENABLE_KEY = "fs.cosn.upload.checks.enable";
public static final String COSN_UPLOAD_CHECKS_ENABLED_KEY = "fs.cosn.upload.checks.enabled";
public static final boolean DEFAULT_COSN_UPLOAD_CHECKS_ENABLE = true;

public static final String COSN_UPLOAD_PART_SIZE_KEY = "fs.cosn.upload.part.size";
Expand All @@ -56,6 +56,9 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String COSN_UPLOAD_BUFFER_TYPE_KEY = "fs.cosn.upload.buffer";
public static final String DEFAULT_UPLOAD_BUFFER_TYPE = "mapped_disk";

public static final String COSN_UPLOAD_PART_CHECKSUM_ENABLED_KEY = "fs.cosn.upload.part.checksum.enabled";
public static final boolean DEFAULT_UPLOAD_PART_CHECKSUM_ENABLED = true;

public static final String COSN_UPLOAD_BUFFER_SIZE_KEY = "fs.cosn.upload.buffer.size";
public static final String COSN_UPLOAD_BUFFER_SIZE_PREV_KEY = "fs.cosn.buffer.size";
public static final int DEFAULT_UPLOAD_BUFFER_SIZE = -1;
Expand Down Expand Up @@ -125,7 +128,6 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final int DEFAULT_HTTP_PROXY_PORT = -1;
public static final String HTTP_PROXY_USERNAME = "fs.cosn.http.proxy.username";
public static final String HTTP_PROXY_PASSWORD = "fs.cosn.http.proxy.password";

public static final String COSN_RANGER_TEMP_TOKEN_REFRESH_INTERVAL = "fs.cosn.ranger.temp.token.refresh.interval";
public static final int DEFAULT_COSN_RANGER_TEMP_TOKEN_REFRESH_INTERVAL = 20;

Expand Down
61 changes: 41 additions & 20 deletions src/main/java/org/apache/hadoop/fs/CosNFSDataOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,22 @@ public CosNFSDataOutputStream(
this.committed = false;
this.closed = false;

try {
this.currentPartMessageDigest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
LOG.warn("Failed to MD5 digest, the upload will not check.");
if (conf.getBoolean(CosNConfigKeys.COSN_UPLOAD_PART_CHECKSUM_ENABLED_KEY,
CosNConfigKeys.DEFAULT_COSN_UPLOAD_CHECKS_ENABLE)) {
LOG.info("The MPU-UploadPart checksum is enabled, and the message digest algorithm is {}.", "MD5");
try {
this.currentPartMessageDigest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
LOG.warn("Failed to MD5 digest, the upload will not check.");
this.currentPartMessageDigest = null;
}
} else {
// Checksum disabled: leave the current part message digest unset (null).
LOG.warn("The MPU-UploadPart checksum is disabled.");
this.currentPartMessageDigest = null;
}

boolean uploadChecksEnabled = conf.getBoolean(CosNConfigKeys.COSN_UPLOAD_CHECKS_ENABLE_KEY,
boolean uploadChecksEnabled = conf.getBoolean(CosNConfigKeys.COSN_UPLOAD_CHECKS_ENABLED_KEY,
CosNConfigKeys.DEFAULT_COSN_UPLOAD_CHECKS_ENABLE);
if (uploadChecksEnabled) {
LOG.info("The consistency checker is enabled.");
Expand Down Expand Up @@ -183,7 +191,7 @@ public synchronized void close() throws IOException {
return;
}

LOG.info("Close the output stream [{}].", this);
LOG.info("Closing the output stream [{}].", this);
try {
this.flush();
this.commit();
Expand All @@ -200,7 +208,7 @@ public synchronized void abort() throws IOException {
return;
}

LOG.info("Abort the output stream [{}].", this);
LOG.info("Aborting the output stream [{}].", this);
try {
if (null != this.multipartUpload) {
this.multipartUpload.abort();
Expand Down Expand Up @@ -273,8 +281,12 @@ protected void resetContext() throws IOException {
this.dirty = true;
this.committed = false;

this.currentPartMessageDigest.reset();
this.consistencyChecker.reset();
if (this.currentPartMessageDigest != null) {
this.currentPartMessageDigest.reset();
}
if (this.consistencyChecker != null) {
this.consistencyChecker.reset();
}
}

protected void checkOpened() throws IOException {
Expand Down Expand Up @@ -318,9 +330,13 @@ protected void initNewCurrentPartResource() throws IOException {
throw new IOException(exceptionMsg);
}
// init the stream
this.currentPartMessageDigest.reset();
this.currentPartOutputStream = new DigestOutputStream(
new BufferOutputStream(this.currentPartBuffer), this.currentPartMessageDigest);
if (null != this.currentPartMessageDigest) {
this.currentPartMessageDigest.reset();
this.currentPartOutputStream = new DigestOutputStream(
new BufferOutputStream(this.currentPartBuffer), this.currentPartMessageDigest);
} else {
this.currentPartOutputStream = new BufferOutputStream(this.currentPartBuffer);
}
}

protected void releaseCurrentPartResource() throws IOException {
Expand All @@ -333,7 +349,9 @@ protected void releaseCurrentPartResource() throws IOException {
this.currentPartOutputStream = null;
}

this.currentPartMessageDigest.reset();
if (null != this.currentPartMessageDigest) {
this.currentPartMessageDigest.reset();
}

if (null != this.currentPartBuffer) {
BufferPool.getInstance().returnBuffer(this.currentPartBuffer);
Expand Down Expand Up @@ -514,7 +532,7 @@ protected List<PartETag> waitForFinishPartUploads() throws IOException {
}

protected void complete() throws IOException {
LOG.info("Completing the MPU [{}].", this);
LOG.info("Completing the MPU [{}].", this.getUploadId());
if (this.isCompleted() || this.isAborted()) {
throw new IOException(String.format("fail to complete the MPU [%s]. "
+ "It has been completed or aborted.", this));
Expand All @@ -527,18 +545,18 @@ protected void complete() throws IOException {
CompleteMultipartUploadResult completeResult =
nativeStore.completeMultipartUpload(cosKey, this.uploadId, new LinkedList<>(futurePartETagList));
this.completed = true;
LOG.info("The MPU [{}] has been completed.", this);
LOG.info("The MPU [{}] has been completed.", this.getUploadId());
}

protected void abort() throws IOException {
LOG.info("Aborting the MPU [{}].", this);
LOG.info("Aborting the MPU [{}].", this.getUploadId());
if (this.isCompleted() || this.isAborted()) {
throw new IOException(String.format("fail to abort the MPU [%s]. "
+ "It has been completed or aborted.", this));
+ "It has been completed or aborted.", this.getUploadId()));
}
nativeStore.abortMultipartUpload(cosKey, this.uploadId);
this.aborted = true;
LOG.info("The MPU [{}] has been aborted.", this);
LOG.info("The MPU [{}] has been aborted.", this.getUploadId());
}
}

Expand Down Expand Up @@ -573,8 +591,11 @@ public byte[] getMd5Hash() {

@Override
public String toString() {
return String.format("UploadPart{partNumber:%d, partSize: %d, md5Hash: %s, isLast: %s}", this.partNumber,
this.cosNByteBuffer.flipRead().remaining(), Hex.encodeHexString(this.md5Hash), this.isLast);
return String.format("UploadPart{partNumber:%d, partSize: %d, md5Hash: %s, isLast: %s}",
this.partNumber,
this.cosNByteBuffer.flipRead().remaining(),
(this.md5Hash != null ? Hex.encodeHexString(this.md5Hash): "NULL"),
this.isLast);
}
}
}

0 comments on commit a331030

Please sign in to comment.