diff --git a/pom.xml b/pom.xml
index baf79fa8..df127968 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,7 @@
1.8
1.8
3.1.0
- 5.6.29
+ 5.6.31
24.1.1-jre
3.1
4.8
diff --git a/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java b/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java
new file mode 100644
index 00000000..7dd53008
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java
@@ -0,0 +1,89 @@
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigInteger;
+
+/**
+ * A {@link FileChecksum} holding a 32-bit CRC32C value.
+ *
+ * <p>The value is stored as an {@code int} bit pattern and is emitted as four
+ * big-endian bytes by both {@link #getBytes()} and {@link #write(DataOutput)}.
+ *
+ * <p>NOTE(review): the algorithm name is reported as {@code COMPOSITE-CRC32C},
+ * presumably so the value can be compared with other file systems that report
+ * the same algorithm name; confirm the server-side CRC is computed compatibly.
+ */
+public class CRC32CCheckSum extends FileChecksum {
+    private static final String ALGORITHM_NAME = "COMPOSITE-CRC32C";
+
+    private int crc32c = 0;
+
+    /**
+     * Creates a checksum with value 0, e.g. to be filled in by
+     * {@link #readFields(DataInput)}.
+     */
+    public CRC32CCheckSum() {
+    }
+
+    /**
+     * Creates a checksum from the decimal string form of a CRC32C value.
+     *
+     * <p>Surrounding whitespace is ignored and only the low 32 bits of the
+     * parsed number are kept. A {@code null} or non-numeric string does not
+     * raise an exception; the checksum value falls back to 0.
+     *
+     * @param crc32cecma decimal CRC32C value, may be {@code null}
+     */
+    public CRC32CCheckSum(String crc32cecma) {
+        int parsed = 0;
+        if (crc32cecma != null) {
+            try {
+                parsed = new BigInteger(crc32cecma.trim()).intValue();
+            } catch (NumberFormatException ignored) {
+                // Best effort: an unparsable value keeps the 0 fallback.
+            }
+        }
+        this.crc32c = parsed;
+    }
+
+    /** @return the fixed algorithm name, {@code COMPOSITE-CRC32C} */
+    @Override
+    public String getAlgorithmName() {
+        return ALGORITHM_NAME;
+    }
+
+    /** @return the checksum length in bytes (4) */
+    @Override
+    public int getLength() {
+        return Integer.SIZE / Byte.SIZE;
+    }
+
+    /** @return the checksum as four big-endian bytes */
+    @Override
+    public byte[] getBytes() {
+        return CrcUtil.intToBytes(crc32c);
+    }
+
+    /** Writes the checksum as a single 4-byte int. */
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        dataOutput.writeInt(this.crc32c);
+    }
+
+    /** Reads the checksum back as a single 4-byte int, replacing the current value. */
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        this.crc32c = dataInput.readInt();
+    }
+
+    /** @return {@code <algorithm name>:0x<8 lowercase hex digits>} */
+    @Override
+    public String toString() {
+        return getAlgorithmName() + ":" + String.format("0x%08x", crc32c);
+    }
+}
diff --git a/src/main/java/org/apache/hadoop/fs/Constants.java b/src/main/java/org/apache/hadoop/fs/Constants.java
index 5daafae3..42973579 100644
--- a/src/main/java/org/apache/hadoop/fs/Constants.java
+++ b/src/main/java/org/apache/hadoop/fs/Constants.java
@@ -10,6 +10,13 @@ private Constants() {
// Suffix for local cache file name
public static final String BLOCK_TMP_FILE_SUFFIX = "_local_block_cache";
+ // Crc32c server response header key
+ public static final String CRC32C_RESP_HEADER = "x-cos-hash-crc32c";
+ // Crc32c agent request header key
+ public static final String CRC32C_REQ_HEADER = "x-cos-crc32c-flag";
+ // Crc32c agent request header value
+ public static final String CRC32C_REQ_HEADER_VAL = "cosn";
+
// Maximum number of blocks uploaded in trunks.
public static final int MAX_PART_NUM = 10000;
// The maximum size of a single block.
diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java
index c6cd182c..3eabc88c 100644
--- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java
+++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java
@@ -820,13 +820,27 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException {
Preconditions.checkArgument(length >= 0);
LOG.debug("Call the checksum for the path: {}.", f);
+ // CRC64 takes precedence when both checksum types are enabled. TODO: both CRCs should be supported at the same time; decide how a caller indicates which CRC it is requesting.
if (this.getConf().getBoolean(CosNConfigKeys.CRC64_CHECKSUM_ENABLED,
CosNConfigKeys.DEFAULT_CRC64_CHECKSUM_ENABLED)) {
Path absolutePath = makeAbsolute(f);
String key = pathToKey(absolutePath);
FileMetadata fileMetadata = this.store.retrieveMetadata(key);
+ if (null == fileMetadata) {
+ throw new FileNotFoundException("File or directory doesn't exist: " + f);
+ }
String crc64ecm = fileMetadata.getCrc64ecm();
return crc64ecm != null ? new CRC64Checksum(crc64ecm) : super.getFileChecksum(f, length);
+ } else if (this.getConf().getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED,
+ CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED)) {
+ Path absolutePath = makeAbsolute(f);
+ String key = pathToKey(absolutePath);
+ FileMetadata fileMetadata = this.store.retrieveMetadata(key);
+ if (null == fileMetadata) {
+ throw new FileNotFoundException("File or directory doesn't exist: " + f);
+ }
+ String crc32cm = fileMetadata.getCrc32cm();
+ return crc32cm != null ? new CRC32CCheckSum(crc32cm) : super.getFileChecksum(f, length);
} else {
// disabled
return super.getFileChecksum(f, length);
diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
index ccadaf7b..e1f55731 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
@@ -83,8 +83,12 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final int DEFAULT_TRAFFIC_LIMIT = -1;
// checksum
+ // crc64
public static final String CRC64_CHECKSUM_ENABLED = "fs.cosn.crc64.checksum.enabled";
public static final boolean DEFAULT_CRC64_CHECKSUM_ENABLED = false;
+ // crc32c
+ public static final String CRC32C_CHECKSUM_ENABLED = "fs.cosn.crc32c.checksum.enabled";
+ public static final boolean DEFAULT_CRC32C_CHECKSUM_ENABLED = false;
public static final String HTTP_PROXY_IP = "fs.cosn.http.proxy.ip";
public static final String HTTP_PROXY_PORT = "fs.cosn.http.proxy.port";
diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java
index c7d4449d..923f2668 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java
@@ -11,6 +11,7 @@
import com.qcloud.cos.utils.Jackson;
import com.qcloud.cos.utils.StringUtils;
import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.math3.analysis.function.Constant;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -41,6 +42,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore {
private StorageClass storageClass;
private int maxRetryTimes;
private int trafficLimit;
+ private boolean crc32cEnabled;
private CosEncryptionSecrets encryptionSecrets;
private CustomerDomainEndpointResolver customerDomainEndpointResolver;
@@ -90,6 +92,9 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException {
config.setHttpProtocol(HttpProtocol.https);
}
+ this.crc32cEnabled = conf.getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED,
+ CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED);
+
// Proxy settings
String httpProxyIp = conf.getTrimmed(CosNConfigKeys.HTTP_PROXY_IP);
int httpProxyPort = conf.getInt(CosNConfigKeys.HTTP_PROXY_PORT, CosNConfigKeys.DEFAULT_HTTP_PROXY_PORT);
@@ -200,6 +205,9 @@ private void storeFileWithRetry(String key, InputStream inputStream,
objectMetadata.setContentMD5(Base64.encodeAsString(md5Hash));
}
objectMetadata.setContentLength(length);
+ if (crc32cEnabled) {
+ objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL);
+ }
PutObjectRequest putObjectRequest =
new PutObjectRequest(bucketName, key, inputStream,
@@ -265,6 +273,10 @@ public void storeEmptyFile(String key) throws IOException {
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(0);
+ if (crc32cEnabled) {
+ objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL);
+ }
+
InputStream input = new ByteArrayInputStream(new byte[0]);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName,
key, input, objectMetadata);
@@ -310,12 +322,18 @@ public PartETag uploadPart(
String key, String uploadId, int partNum, long partSize, byte[] md5Hash) throws IOException {
LOG.debug("Upload the part to the cos key [{}]. upload id: {}, part number: {}, part size: {}",
key, uploadId, partNum, partSize);
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ if (crc32cEnabled) {
+ objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL);
+ }
+
UploadPartRequest uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(this.bucketName);
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(inputStream);
uploadPartRequest.setPartNumber(partNum);
uploadPartRequest.setPartSize(partSize);
+ uploadPartRequest.setObjectMetadata(objectMetadata);
if (null != md5Hash) {
uploadPartRequest.setMd5Digest(Base64.encodeAsString(md5Hash));
}
@@ -323,7 +341,7 @@ public PartETag uploadPart(
if (this.trafficLimit >= 0) {
uploadPartRequest.setTrafficLimit(this.trafficLimit);
}
- this.setEncryptionMetadata(uploadPartRequest, new ObjectMetadata());
+ this.setEncryptionMetadata(uploadPartRequest, objectMetadata);
try {
UploadPartResult uploadPartResult =
@@ -360,12 +378,18 @@ public String getUploadId(String key) throws IOException {
return "";
}
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ if (crc32cEnabled) {
+ objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL);
+ }
+
InitiateMultipartUploadRequest initiateMultipartUploadRequest =
new InitiateMultipartUploadRequest(bucketName, key);
if (null != this.storageClass) {
initiateMultipartUploadRequest.setStorageClass(this.storageClass);
}
- this.setEncryptionMetadata(initiateMultipartUploadRequest, new ObjectMetadata());
+ initiateMultipartUploadRequest.setObjectMetadata(objectMetadata);
+ this.setEncryptionMetadata(initiateMultipartUploadRequest, objectMetadata);
try {
InitiateMultipartUploadResult initiateMultipartUploadResult =
(InitiateMultipartUploadResult) this.callCOSClientWithRetry(initiateMultipartUploadRequest);
@@ -389,9 +413,14 @@ public int compare(PartETag o1, PartETag o2) {
}
});
try {
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ if (crc32cEnabled) {
+ objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL);
+ }
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, key, uploadId,
partETagList);
+ completeMultipartUploadRequest.setObjectMetadata(objectMetadata);
return (CompleteMultipartUploadResult) this.callCOSClientWithRetry(completeMultipartUploadRequest);
} catch (Exception e) {
String errMsg = String.format("Complete the multipart upload failed. cos key: %s, upload id: %s, " +
@@ -419,6 +448,7 @@ private FileMetadata QueryObjectMetadata(String key) throws IOException {
String ETag = objectMetadata.getETag();
String crc64ecm = objectMetadata.getCrc64Ecma();
+ String crc32cm = (String)objectMetadata.getRawMetadataValue(Constants.CRC32C_RESP_HEADER);
String versionId = objectMetadata.getVersionId();
Map userMetadata = null;
if (objectMetadata.getUserMetadata() != null) {
@@ -443,7 +473,7 @@ private FileMetadata QueryObjectMetadata(String key) throws IOException {
}
FileMetadata fileMetadata =
new FileMetadata(key, fileSize, mtime,
- !key.endsWith(PATH_DELIMITER), ETag, crc64ecm, versionId, objectMetadata.getStorageClass(),
+ !key.endsWith(PATH_DELIMITER), ETag, crc64ecm, crc32cm, versionId, objectMetadata.getStorageClass(),
userMetadata);
LOG.debug("Retrieve the file metadata. cos key: {}, ETag:{}, length:{}, crc64ecm: {}.", key,
objectMetadata.getETag(), objectMetadata.getContentLength(), objectMetadata.getCrc64Ecma());
@@ -573,6 +603,10 @@ private void storeAttribute(String key, String attribute, byte[] value, boolean
objectMetadata.setUserMetadata(userMetadata);
// 构造原地copy请求来设置用户自定义属性
+ if (crc32cEnabled) {
+ objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL);
+ }
+
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, key, bucketName, key);
if (null != objectMetadata.getStorageClass()) {
copyObjectRequest.setStorageClass(objectMetadata.getStorageClass());
@@ -776,10 +810,10 @@ private PartialListing list(String prefix, String delimiter,
long fileLen = cosObjectSummary.getSize();
String fileEtag = cosObjectSummary.getETag();
if (cosObjectSummary.getKey().endsWith(PATH_DELIMITER) && cosObjectSummary.getSize() == 0) {
- fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, false, fileEtag, null, null, cosObjectSummary.getStorageClass()));
+ fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, false, fileEtag, null, null, null, cosObjectSummary.getStorageClass()));
} else {
fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime,
- true, fileEtag, null, null, cosObjectSummary.getStorageClass()));
+ true, fileEtag, null, null, null, cosObjectSummary.getStorageClass()));
}
}
List commonPrefixes = objectListing.getCommonPrefixes();
@@ -826,6 +860,10 @@ public void delete(String key) throws IOException {
public void rename(String srcKey, String dstKey) throws IOException {
LOG.debug("Rename the source cos key [{}] to the dest cos key [{}].", srcKey, dstKey);
try {
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ if (crc32cEnabled) {
+ objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL);
+ }
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucketName, srcKey, bucketName,
dstKey);
@@ -834,7 +872,8 @@ public void rename(String srcKey, String dstKey) throws IOException {
if (null != sourceFileMetadata.getStorageClass()) {
copyObjectRequest.setStorageClass(sourceFileMetadata.getStorageClass());
}
- this.setEncryptionMetadata(copyObjectRequest, new ObjectMetadata());
+ copyObjectRequest.setNewObjectMetadata(objectMetadata);
+ this.setEncryptionMetadata(copyObjectRequest, objectMetadata);
if (null != this.customerDomainEndpointResolver) {
if (null != this.customerDomainEndpointResolver.getEndpoint()) {
@@ -857,13 +896,19 @@ public void rename(String srcKey, String dstKey) throws IOException {
@Override
public void copy(String srcKey, String dstKey) throws IOException {
try {
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ if (crc32cEnabled) {
+ objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL);
+ }
+
CopyObjectRequest copyObjectRequest =
new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey);
FileMetadata sourceFileMetadata = this.retrieveMetadata(srcKey);
if (null != sourceFileMetadata.getStorageClass()) {
copyObjectRequest.setStorageClass(sourceFileMetadata.getStorageClass());
}
- this.setEncryptionMetadata(copyObjectRequest, new ObjectMetadata());
+ copyObjectRequest.setNewObjectMetadata(objectMetadata);
+ this.setEncryptionMetadata(copyObjectRequest, objectMetadata);
if (null != this.customerDomainEndpointResolver) {
if (null != this.customerDomainEndpointResolver.getEndpoint()) {
copyObjectRequest.setSourceEndpointBuilder(this.customerDomainEndpointResolver);
diff --git a/src/main/java/org/apache/hadoop/fs/CrcUtil.java b/src/main/java/org/apache/hadoop/fs/CrcUtil.java
new file mode 100644
index 00000000..7065539b
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/CrcUtil.java
@@ -0,0 +1,59 @@
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+/**
+ * Static helpers for encoding {@code int} values (such as CRC checksums) as big-endian bytes.
+ */
+public final class CrcUtil {
+    /** Number of bytes occupied by an encoded {@code int}. */
+    private static final int INT_BYTES = 4;
+
+    private CrcUtil() {
+        // Utility class; not instantiable.
+    }
+
+    /**
+     * Encodes {@code value} as a new 4-byte big-endian array.
+     *
+     * @param value the int to encode
+     * @return 4-byte array holding the big-endian representation of {@code value}
+     */
+    public static byte[] intToBytes(int value) {
+        byte[] buf = new byte[INT_BYTES];
+        // The buffer is exactly the right size, so no bounds check (and no
+        // checked exception) is needed here.
+        putInt(buf, 0, value);
+        return buf;
+    }
+
+    /**
+     * Writes big-endian representation of {@code value} into {@code buf}
+     * starting at {@code offset}.
+     *
+     * @param buf destination; {@code buf.length} must be at least {@code offset + 4}
+     * @param offset non-negative index of the first byte to write
+     * @param value the int to encode
+     * @throws IOException if {@code offset} is negative or fewer than 4 bytes
+     *     are available in {@code buf} from {@code offset}
+     */
+    public static void writeInt(byte[] buf, int offset, int value)
+            throws IOException {
+        // Compare against length - 4 rather than computing offset + 4, which
+        // would overflow for offsets close to Integer.MAX_VALUE.
+        if (offset < 0 || offset > buf.length - INT_BYTES) {
+            throw new IOException(String.format(
+                    "writeInt out of bounds: buf.length=%d, offset=%d",
+                    buf.length, offset));
+        }
+        putInt(buf, offset, value);
+    }
+
+    /** Stores the four bytes of {@code value}, most significant first, without bounds checking. */
+    private static void putInt(byte[] buf, int offset, int value) {
+        buf[offset] = (byte) (value >>> 24);
+        buf[offset + 1] = (byte) (value >>> 16);
+        buf[offset + 2] = (byte) (value >>> 8);
+        buf[offset + 3] = (byte) value;
+    }
+}
diff --git a/src/main/java/org/apache/hadoop/fs/FileMetadata.java b/src/main/java/org/apache/hadoop/fs/FileMetadata.java
index ae04a936..3f03f0b3 100644
--- a/src/main/java/org/apache/hadoop/fs/FileMetadata.java
+++ b/src/main/java/org/apache/hadoop/fs/FileMetadata.java
@@ -19,6 +19,7 @@ public class FileMetadata {
private final boolean isFile;
private final String ETag;
private final String crc64ecm;
+ private final String crc32cm;
private final String versionId;
private final String storageClass;
private final Map userAttributes;
@@ -33,27 +34,28 @@ public FileMetadata(String key, long length, long lastModified,
}
public FileMetadata(String key, long length, long lastModified, boolean isFile, String ETag) {
- this(key, length, lastModified, isFile, ETag, null, null);
+ this(key, length, lastModified, isFile, ETag, null, null, null);
}
public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm,
- String versionId) {
- this(key, length, lastModified, isFile, eTag, crc64ecm, versionId, null, null);
+ String crc32cm, String versionId) {
+ this(key, length, lastModified, isFile, eTag, crc64ecm, crc32cm, versionId, null, null);
}
public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm,
- String versionId, String storageClass) {
- this(key, length, lastModified, isFile, eTag, crc64ecm, versionId, storageClass, null);
+ String crc32cm, String versionId, String storageClass) {
+ this(key, length, lastModified, isFile, eTag, crc64ecm, crc32cm, versionId, storageClass, null);
}
public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm,
- String versionId, String storageClass, Map userAttributes) {
+ String crc32cm, String versionId, String storageClass, Map userAttributes) {
this.key = key;
this.length = length;
this.lastModified = lastModified;
this.isFile = isFile;
this.ETag = eTag;
this.crc64ecm = crc64ecm;
+ this.crc32cm = crc32cm;
this.versionId = versionId;
this.storageClass = storageClass;
this.userAttributes = userAttributes;
@@ -83,6 +85,10 @@ public String getCrc64ecm() {
return crc64ecm;
}
+ public String getCrc32cm() {
+ return crc32cm;
+ }
+
public String getStorageClass() {
return storageClass;
}