From 84cf837586bdb2568d82214f167b5f8d53e495aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?alantong=28=E4=BD=9F=E6=98=8E=E8=BE=BE=29?= Date: Wed, 14 Oct 2020 20:22:12 +0800 Subject: [PATCH 1/4] crc32c first version, except the complete upload part meta data header --- .../org/apache/hadoop/fs/CRC32CCheckSum.java | 65 +++++++++++++++++++ .../java/org/apache/hadoop/fs/Constants.java | 7 ++ .../org/apache/hadoop/fs/CosFileSystem.java | 8 +++ .../org/apache/hadoop/fs/CosNConfigKeys.java | 4 ++ .../hadoop/fs/CosNativeFileSystemStore.java | 55 ++++++++++++++-- .../org/apache/hadoop/fs/FileMetadata.java | 18 +++-- 6 files changed, 144 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java diff --git a/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java b/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java new file mode 100644 index 00000000..253afcb8 --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java @@ -0,0 +1,65 @@ +package org.apache.hadoop.fs; + +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.StringUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigInteger; + +/** + * An etag as a checksum. + * Consider these suitable for checking if an object has changed, but + * not suitable for comparing two different objects for equivalence, + * especially between hadoop compatible filesystem. + */ +public class CRC32CCheckSum extends FileChecksum { + private static final String ALGORITHM_NAME = "CRC32C"; + + private long crc32c = 0; + + public CRC32CCheckSum() { + } + + public CRC32CCheckSum(String crc32cecma) { + try { + BigInteger bigInteger = new BigInteger(crc32cecma); + this.crc32c = bigInteger.longValue(); + } catch (NumberFormatException e) { + this.crc32c = 0; + } + } + + @Override + public String getAlgorithmName() { + return CRC32CCheckSum.ALGORITHM_NAME; + } + + @Override + public int getLength() { + return Long.SIZE / Byte.SIZE; + } + + @Override + public byte[] getBytes() { + return this.crc32c != 0 ? WritableUtils.toByteArray(this) : new byte[0]; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeLong(this.crc32c); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + this.crc32c = dataInput.readLong(); + } + + @Override + public String toString() { + return "CRC32CChecksum{" + + "crc32c=" + crc32c + + '}'; + } +} diff --git a/src/main/java/org/apache/hadoop/fs/Constants.java b/src/main/java/org/apache/hadoop/fs/Constants.java index 5daafae3..42973579 100644 --- a/src/main/java/org/apache/hadoop/fs/Constants.java +++ b/src/main/java/org/apache/hadoop/fs/Constants.java @@ -10,6 +10,13 @@ private Constants() { // Suffix for local cache file name public static final String BLOCK_TMP_FILE_SUFFIX = "_local_block_cache"; + // Crc32c server response header key + public static final String CRC32C_RESP_HEADER = "x-cos-hash-crc32c"; + // Crc32c agent request header key + public static final String CRC32C_REQ_HEADER = "x-cos-crc32c-flag"; + // Crc32c agent request header value + public static final String CRC32C_REQ_HEADER_VAL = "cosn"; + // Maximum number of blocks uploaded in trunks. public static final int MAX_PART_NUM = 10000; // The maximum size of a single block. diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index c6cd182c..5ee8c201 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -820,6 +820,7 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { Preconditions.checkArgument(length >= 0); LOG.debug("Call the checksum for the path: {}.", f); + // The order of each file, must support both crc at same time, how to tell the difference crc request? if (this.getConf().getBoolean(CosNConfigKeys.CRC64_CHECKSUM_ENABLED, CosNConfigKeys.DEFAULT_CRC64_CHECKSUM_ENABLED)) { Path absolutePath = makeAbsolute(f); @@ -827,6 +828,13 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { FileMetadata fileMetadata = this.store.retrieveMetadata(key); String crc64ecm = fileMetadata.getCrc64ecm(); return crc64ecm != null ? new CRC64Checksum(crc64ecm) : super.getFileChecksum(f, length); + } else if (this.getConf().getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED, + CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED)) { + Path absolutePath = makeAbsolute(f); + String key = pathToKey(absolutePath); + FileMetadata fileMetadata = this.store.retrieveMetadata(key); + String crc32cm = fileMetadata.getCrc32cm(); + return crc32cm != null ? new CRC32CCheckSum(crc32cm) : super.getFileChecksum(f, length); } else { // disabled return super.getFileChecksum(f, length); diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java index ccadaf7b..e1f55731 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java +++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java @@ -83,8 +83,12 @@ public class CosNConfigKeys extends CommonConfigurationKeys { public static final int DEFAULT_TRAFFIC_LIMIT = -1; // checksum + // crc64 public static final String CRC64_CHECKSUM_ENABLED = "fs.cosn.crc64.checksum.enabled"; public static final boolean DEFAULT_CRC64_CHECKSUM_ENABLED = false; + // crc32c + public static final String CRC32C_CHECKSUM_ENABLED = "fs.cosn.crc32c.checksum.enabled"; + public static final boolean DEFAULT_CRC32C_CHECKSUM_ENABLED = false; public static final String HTTP_PROXY_IP = "fs.cosn.http.proxy.ip"; public static final String HTTP_PROXY_PORT = "fs.cosn.http.proxy.port"; diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index c7d4449d..5597bf56 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -11,6 +11,7 @@ import com.qcloud.cos.utils.Jackson; import com.qcloud.cos.utils.StringUtils; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.math3.analysis.function.Constant; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -41,6 +42,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore { private StorageClass storageClass; private int maxRetryTimes; private int trafficLimit; + private boolean openCrc32c; private CosEncryptionSecrets encryptionSecrets; private CustomerDomainEndpointResolver customerDomainEndpointResolver; @@ -90,6 +92,9 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { config.setHttpProtocol(HttpProtocol.https); } + this.openCrc32c = conf.getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED, + CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED); + // Proxy settings String httpProxyIp = conf.getTrimmed(CosNConfigKeys.HTTP_PROXY_IP); int httpProxyPort = conf.getInt(CosNConfigKeys.HTTP_PROXY_PORT, CosNConfigKeys.DEFAULT_HTTP_PROXY_PORT); @@ -200,6 +205,9 @@ private void storeFileWithRetry(String key, InputStream inputStream, objectMetadata.setContentMD5(Base64.encodeAsString(md5Hash)); } objectMetadata.setContentLength(length); + if (openCrc32c) { + objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); + } PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, @@ -265,6 +273,10 @@ public void storeEmptyFile(String key) throws IOException { ObjectMetadata objectMetadata = new ObjectMetadata(); objectMetadata.setContentLength(0); + if (openCrc32c) { + objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); + } + InputStream input = new ByteArrayInputStream(new byte[0]); PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, input, objectMetadata); @@ -310,12 +322,18 @@ public PartETag uploadPart( String key, String uploadId, int partNum, long partSize, byte[] md5Hash) throws IOException { LOG.debug("Upload the part to the cos key [{}]. upload id: {}, part number: {}, part size: {}", key, uploadId, partNum, partSize); + ObjectMetadata objectMetadata = new ObjectMetadata(); + if (openCrc32c) { + objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); + } + UploadPartRequest uploadPartRequest = new UploadPartRequest(); uploadPartRequest.setBucketName(this.bucketName); uploadPartRequest.setUploadId(uploadId); uploadPartRequest.setInputStream(inputStream); uploadPartRequest.setPartNumber(partNum); uploadPartRequest.setPartSize(partSize); + uploadPartRequest.setObjectMetadata(objectMetadata); if (null != md5Hash) { uploadPartRequest.setMd5Digest(Base64.encodeAsString(md5Hash)); } @@ -323,7 +341,7 @@ public PartETag uploadPart( if (this.trafficLimit >= 0) { uploadPartRequest.setTrafficLimit(this.trafficLimit); } - this.setEncryptionMetadata(uploadPartRequest, new ObjectMetadata()); + this.setEncryptionMetadata(uploadPartRequest, objectMetadata); try { UploadPartResult uploadPartResult = @@ -360,12 +378,18 @@ public String getUploadId(String key) throws IOException { return ""; } + ObjectMetadata objectMetadata = new ObjectMetadata(); + if (openCrc32c) { + objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); + } + InitiateMultipartUploadRequest initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(bucketName, key); if (null != this.storageClass) { initiateMultipartUploadRequest.setStorageClass(this.storageClass); } - this.setEncryptionMetadata(initiateMultipartUploadRequest, new ObjectMetadata()); + initiateMultipartUploadRequest.setObjectMetadata(objectMetadata); + this.setEncryptionMetadata(initiateMultipartUploadRequest, objectMetadata); try { InitiateMultipartUploadResult initiateMultipartUploadResult = (InitiateMultipartUploadResult) this.callCOSClientWithRetry(initiateMultipartUploadRequest); @@ -389,6 +413,7 @@ public int compare(PartETag o1, PartETag o2) { } }); try { + // TODO TMD complete multi part java sdk how to add the meta header? CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETagList); @@ -419,6 +444,7 @@ private FileMetadata QueryObjectMetadata(String key) throws IOException { String ETag = objectMetadata.getETag(); String crc64ecm = objectMetadata.getCrc64Ecma(); + String crc32cm = (String)objectMetadata.getRawMetadataValue(Constants.CRC32C_RESP_HEADER); String versionId = objectMetadata.getVersionId(); Map userMetadata = null; if (objectMetadata.getUserMetadata() != null) { @@ -443,7 +469,7 @@ private FileMetadata QueryObjectMetadata(String key) throws IOException { } FileMetadata fileMetadata = new FileMetadata(key, fileSize, mtime, - !key.endsWith(PATH_DELIMITER), ETag, crc64ecm, versionId, objectMetadata.getStorageClass(), + !key.endsWith(PATH_DELIMITER), ETag, crc64ecm, crc32cm, versionId, objectMetadata.getStorageClass(), userMetadata); LOG.debug("Retrieve the file metadata. cos key: {}, ETag:{}, length:{}, crc64ecm: {}.", key, objectMetadata.getETag(), objectMetadata.getContentLength(), objectMetadata.getCrc64Ecma()); @@ -573,6 +599,10 @@ private void storeAttribute(String key, String attribute, byte[] value, boolean objectMetadata.setUserMetadata(userMetadata); // 构造原地copy请求来设置用户自定义属性 + if (openCrc32c) { + objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); + } + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, key, bucketName, key); if (null != objectMetadata.getStorageClass()) { copyObjectRequest.setStorageClass(objectMetadata.getStorageClass()); @@ -776,10 +806,10 @@ private PartialListing list(String prefix, String delimiter, long fileLen = cosObjectSummary.getSize(); String fileEtag = cosObjectSummary.getETag(); if (cosObjectSummary.getKey().endsWith(PATH_DELIMITER) && cosObjectSummary.getSize() == 0) { - fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, false, fileEtag, null, null, cosObjectSummary.getStorageClass())); + fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, false, fileEtag, null, null, null, cosObjectSummary.getStorageClass())); } else { fileMetadataArray.add(new FileMetadata(filePath, fileLen, mtime, - true, fileEtag, null, null, cosObjectSummary.getStorageClass())); + true, fileEtag, null, null, null, cosObjectSummary.getStorageClass())); } } List commonPrefixes = objectListing.getCommonPrefixes(); @@ -826,6 +856,10 @@ public void delete(String key) throws IOException { public void rename(String srcKey, String dstKey) throws IOException { LOG.debug("Rename the source cos key [{}] to the dest cos key [{}].", srcKey, dstKey); try { + ObjectMetadata objectMetadata = new ObjectMetadata(); + if (openCrc32c) { + objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); + } CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey); @@ -834,7 +868,8 @@ public void rename(String srcKey, String dstKey) throws IOException { if (null != sourceFileMetadata.getStorageClass()) { copyObjectRequest.setStorageClass(sourceFileMetadata.getStorageClass()); } - this.setEncryptionMetadata(copyObjectRequest, new ObjectMetadata()); + copyObjectRequest.setNewObjectMetadata(objectMetadata); + this.setEncryptionMetadata(copyObjectRequest, objectMetadata); if (null != this.customerDomainEndpointResolver) { if (null != this.customerDomainEndpointResolver.getEndpoint()) { @@ -857,13 +892,19 @@ public void rename(String srcKey, String dstKey) throws IOException { @Override public void copy(String srcKey, String dstKey) throws IOException { try { + ObjectMetadata objectMetadata = new ObjectMetadata(); + if (openCrc32c) { + objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); + } + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, srcKey, bucketName, dstKey); FileMetadata sourceFileMetadata = this.retrieveMetadata(srcKey); if (null != sourceFileMetadata.getStorageClass()) { copyObjectRequest.setStorageClass(sourceFileMetadata.getStorageClass()); } - this.setEncryptionMetadata(copyObjectRequest, new ObjectMetadata()); + copyObjectRequest.setNewObjectMetadata(objectMetadata); + this.setEncryptionMetadata(copyObjectRequest, objectMetadata); if (null != this.customerDomainEndpointResolver) { if (null != this.customerDomainEndpointResolver.getEndpoint()) { copyObjectRequest.setSourceEndpointBuilder(this.customerDomainEndpointResolver); diff --git a/src/main/java/org/apache/hadoop/fs/FileMetadata.java b/src/main/java/org/apache/hadoop/fs/FileMetadata.java index ae04a936..3f03f0b3 100644 --- a/src/main/java/org/apache/hadoop/fs/FileMetadata.java +++ b/src/main/java/org/apache/hadoop/fs/FileMetadata.java @@ -19,6 +19,7 @@ public class FileMetadata { private final boolean isFile; private final String ETag; private final String crc64ecm; + private final String crc32cm; private final String versionId; private final String storageClass; private final Map userAttributes; @@ -33,27 +34,28 @@ public FileMetadata(String key, long length, long lastModified, } public FileMetadata(String key, long length, long lastModified, boolean isFile, String ETag) { - this(key, length, lastModified, isFile, ETag, null, null); + this(key, length, lastModified, isFile, ETag, null, null, null); } public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm, - String versionId) { - this(key, length, lastModified, isFile, eTag, crc64ecm, versionId, null, null); + String crc32cm, String versionId) { + this(key, length, lastModified, isFile, eTag, crc64ecm, crc32cm, versionId, null, null); } public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm, - String versionId, String storageClass) { - this(key, length, lastModified, isFile, eTag, crc64ecm, versionId, storageClass, null); + String crc32cm, String versionId, String storageClass) { + this(key, length, lastModified, isFile, eTag, crc64ecm, crc32cm, versionId, storageClass, null); } public FileMetadata(String key, long length, long lastModified, boolean isFile, String eTag, String crc64ecm, - String versionId, String storageClass, Map userAttributes) { + String crc32cm, String versionId, String storageClass, Map userAttributes) { this.key = key; this.length = length; this.lastModified = lastModified; this.isFile = isFile; this.ETag = eTag; this.crc64ecm = crc64ecm; + this.crc32cm = crc32cm; this.versionId = versionId; this.storageClass = storageClass; this.userAttributes = userAttributes; @@ -83,6 +85,10 @@ public String getCrc64ecm() { return crc64ecm; } + public String getCrc32cm() { + return crc32cm; + } + public String getStorageClass() { return storageClass; } From 75aab5a88bda4b94d0a1647c9569824e1fc0ee49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?alantong=28=E4=BD=9F=E6=98=8E=E8=BE=BE=29?= Date: Fri, 16 Oct 2020 15:48:42 +0800 Subject: [PATCH 2/4] add the crc32c header to complete interface, fix other bugs --- pom.xml | 2 +- .../org/apache/hadoop/fs/CRC32CCheckSum.java | 21 +++++---- .../hadoop/fs/CosNativeFileSystemStore.java | 6 ++- .../java/org/apache/hadoop/fs/CrcUtil.java | 45 +++++++++++++++++++ 4 files changed, 61 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/fs/CrcUtil.java diff --git a/pom.xml b/pom.xml index baf79fa8..df127968 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,7 @@ 1.8 1.8 3.1.0 - 5.6.29 + 5.6.31 24.1.1-jre 3.1 4.8 diff --git a/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java b/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java index 253afcb8..2dc49598 100644 --- a/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java +++ b/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java @@ -15,17 +15,18 @@ * especially between hadoop compatible filesystem. */ public class CRC32CCheckSum extends FileChecksum { - private static final String ALGORITHM_NAME = "CRC32C"; + private static final String ALGORITHM_NAME = "COMPOSITE-CRC32C"; - private long crc32c = 0; + private int crc32c = 0; public CRC32CCheckSum() { } + public CRC32CCheckSum(String crc32cecma) { try { - BigInteger bigInteger = new BigInteger(crc32cecma); - this.crc32c = bigInteger.longValue(); + Integer integer = new Integer(crc32cecma); + this.crc32c = integer.intValue(); } catch (NumberFormatException e) { this.crc32c = 0; } @@ -38,28 +39,26 @@ public String getAlgorithmName() { @Override public int getLength() { - return Long.SIZE / Byte.SIZE; + return Integer.SIZE / Byte.SIZE; } @Override public byte[] getBytes() { - return this.crc32c != 0 ? WritableUtils.toByteArray(this) : new byte[0]; + return CrcUtil.intToBytes(crc32c); } @Override public void write(DataOutput dataOutput) throws IOException { - dataOutput.writeLong(this.crc32c); + dataOutput.writeInt(this.crc32c); } @Override public void readFields(DataInput dataInput) throws IOException { - this.crc32c = dataInput.readLong(); + this.crc32c = dataInput.readInt(); } @Override public String toString() { - return "CRC32CChecksum{" + - "crc32c=" + crc32c + - '}'; + return getAlgorithmName() + ":" + String.format("0x%08x", crc32c); } } diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index 5597bf56..edf3a8c7 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -413,10 +413,14 @@ public int compare(PartETag o1, PartETag o2) { } }); try { - // TODO TMD complete multi part java sdk how to add the meta header? + ObjectMetadata objectMetadata = new ObjectMetadata(); + if (openCrc32c) { + objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); + } CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETagList); + completeMultipartUploadRequest.setObjectMetadata(objectMetadata); return (CompleteMultipartUploadResult) this.callCOSClientWithRetry(completeMultipartUploadRequest); } catch (Exception e) { String errMsg = String.format("Complete the multipart upload failed. cos key: %s, upload id: %s, " + diff --git a/src/main/java/org/apache/hadoop/fs/CrcUtil.java b/src/main/java/org/apache/hadoop/fs/CrcUtil.java new file mode 100644 index 00000000..7065539b --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/CrcUtil.java @@ -0,0 +1,45 @@ +package org.apache.hadoop.fs; + +import java.io.IOException; + +public class CrcUtil { + private CrcUtil() { + } + + /** + * @return 4-byte array holding the big-endian representation of + * {@code value}. + */ + public static byte[] intToBytes(int value) { + byte[] buf = new byte[4]; + try { + writeInt(buf, 0, value); + } catch (IOException ioe) { + // Since this should only be able to occur from code bugs within this + // class rather than user input, we throw as a RuntimeException + // rather than requiring this method to declare throwing IOException + // for something the caller can't control. + throw new RuntimeException(ioe); + } + return buf; + } + + /** + * Writes big-endian representation of {@code value} into {@code buf} + * starting at {@code offset}. buf.length must be greater than or + * equal to offset + 4. + */ + public static void writeInt(byte[] buf, int offset, int value) + throws IOException { + if (offset + 4 > buf.length) { + throw new IOException(String.format( + "writeInt out of bounds: buf.length=%d, offset=%d", + buf.length, offset)); + } + buf[offset + 0] = (byte) ((value >>> 24) & 0xff); + buf[offset + 1] = (byte) ((value >>> 16) & 0xff); + buf[offset + 2] = (byte) ((value >>> 8) & 0xff); + buf[offset + 3] = (byte) (value & 0xff); + } + +} From 40baa9a703f3e080aec213c6b62067c334066fdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?alantong=28=E4=BD=9F=E6=98=8E=E8=BE=BE=29?= Date: Mon, 2 Nov 2020 11:42:21 +0800 Subject: [PATCH 3/4] fix the int less bit counting issue, throw the exception when file not exist --- src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java | 4 ++-- src/main/java/org/apache/hadoop/fs/CosFileSystem.java | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java b/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java index 2dc49598..7dd53008 100644 --- a/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java +++ b/src/main/java/org/apache/hadoop/fs/CRC32CCheckSum.java @@ -25,8 +25,8 @@ public CRC32CCheckSum() { public CRC32CCheckSum(String crc32cecma) { try { - Integer integer = new Integer(crc32cecma); - this.crc32c = integer.intValue(); + BigInteger bigInteger = new BigInteger(crc32cecma); + this.crc32c = bigInteger.intValue(); } catch (NumberFormatException e) { this.crc32c = 0; } diff --git a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java index 5ee8c201..3eabc88c 100644 --- a/src/main/java/org/apache/hadoop/fs/CosFileSystem.java +++ b/src/main/java/org/apache/hadoop/fs/CosFileSystem.java @@ -826,6 +826,9 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); FileMetadata fileMetadata = this.store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } String crc64ecm = fileMetadata.getCrc64ecm(); return crc64ecm != null ? new CRC64Checksum(crc64ecm) : super.getFileChecksum(f, length); } else if (this.getConf().getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED, @@ -833,6 +836,9 @@ public FileChecksum getFileChecksum(Path f, long length) throws IOException { Path absolutePath = makeAbsolute(f); String key = pathToKey(absolutePath); FileMetadata fileMetadata = this.store.retrieveMetadata(key); + if (null == fileMetadata) { + throw new FileNotFoundException("File or directory doesn't exist: " + f); + } String crc32cm = fileMetadata.getCrc32cm(); return crc32cm != null ? new CRC32CCheckSum(crc32cm) : super.getFileChecksum(f, length); } else { From 966f447eb00ec77cb43649022e55d3875bda97c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?alantong=28=E4=BD=9F=E6=98=8E=E8=BE=BE=29?= Date: Fri, 6 Nov 2020 16:05:37 +0800 Subject: [PATCH 4/4] change the name of flag --- .../hadoop/fs/CosNativeFileSystemStore.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java index edf3a8c7..923f2668 100644 --- a/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java +++ b/src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java @@ -42,7 +42,7 @@ public class CosNativeFileSystemStore implements NativeFileSystemStore { private StorageClass storageClass; private int maxRetryTimes; private int trafficLimit; - private boolean openCrc32c; + private boolean crc32cEnabled; private CosEncryptionSecrets encryptionSecrets; private CustomerDomainEndpointResolver customerDomainEndpointResolver; @@ -92,7 +92,7 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException { config.setHttpProtocol(HttpProtocol.https); } - this.openCrc32c = conf.getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED, + this.crc32cEnabled = conf.getBoolean(CosNConfigKeys.CRC32C_CHECKSUM_ENABLED, CosNConfigKeys.DEFAULT_CRC32C_CHECKSUM_ENABLED); // Proxy settings @@ -205,7 +205,7 @@ private void storeFileWithRetry(String key, InputStream inputStream, objectMetadata.setContentMD5(Base64.encodeAsString(md5Hash)); } objectMetadata.setContentLength(length); - if (openCrc32c) { + if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } @@ -273,7 +273,7 @@ public void storeEmptyFile(String key) throws IOException { ObjectMetadata objectMetadata = new ObjectMetadata(); objectMetadata.setContentLength(0); - if (openCrc32c) { + if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } @@ -323,7 +323,7 @@ public PartETag uploadPart( LOG.debug("Upload the part to the cos key [{}]. upload id: {}, part number: {}, part size: {}", key, uploadId, partNum, partSize); ObjectMetadata objectMetadata = new ObjectMetadata(); - if (openCrc32c) { + if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } @@ -379,7 +379,7 @@ public String getUploadId(String key) throws IOException { } ObjectMetadata objectMetadata = new ObjectMetadata(); - if (openCrc32c) { + if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } @@ -414,7 +414,7 @@ public int compare(PartETag o1, PartETag o2) { }); try { ObjectMetadata objectMetadata = new ObjectMetadata(); - if (openCrc32c) { + if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } CompleteMultipartUploadRequest completeMultipartUploadRequest = @@ -603,7 +603,7 @@ private void storeAttribute(String key, String attribute, byte[] value, boolean objectMetadata.setUserMetadata(userMetadata); // 构造原地copy请求来设置用户自定义属性 - if (openCrc32c) { + if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } @@ -861,7 +861,7 @@ public void rename(String srcKey, String dstKey) throws IOException { LOG.debug("Rename the source cos key [{}] to the dest cos key [{}].", srcKey, dstKey); try { ObjectMetadata objectMetadata = new ObjectMetadata(); - if (openCrc32c) { + if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); } CopyObjectRequest copyObjectRequest = @@ -897,7 +897,7 @@ public void rename(String srcKey, String dstKey) throws IOException { public void copy(String srcKey, String dstKey) throws IOException { try { ObjectMetadata objectMetadata = new ObjectMetadata(); - if (openCrc32c) { + if (crc32cEnabled) { objectMetadata.setHeader(Constants.CRC32C_REQ_HEADER, Constants.CRC32C_REQ_HEADER_VAL); }