diff --git a/dep/cos_migrate_tool-1.4.13-jar-with-dependencies.jar b/dep/cos_migrate_tool-1.4.13.1-jar-with-dependencies.jar similarity index 51% rename from dep/cos_migrate_tool-1.4.13-jar-with-dependencies.jar rename to dep/cos_migrate_tool-1.4.13.1-jar-with-dependencies.jar index db68b47..c392e03 100644 Binary files a/dep/cos_migrate_tool-1.4.13-jar-with-dependencies.jar and b/dep/cos_migrate_tool-1.4.13.1-jar-with-dependencies.jar differ diff --git a/pom.xml b/pom.xml index 96019b5..167dc1f 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.qcloud cos_migrate_tool - 1.4.13 + 1.4.13.1 jar UTF-8 @@ -96,6 +96,31 @@ + + + com.qcloud.cos + hadoop-cos + 8.2.7 + + + + com.qcloud + chdfs_hadoop_plugin_network + 2.8 + + + + com.qcloud + cos_api-bundle + 5.6.112 + + + + org.apache.hadoop + hadoop-common + 2.8.5 + + diff --git a/src/main/java/com/qcloud/cos_migrate_tool/app/App.java b/src/main/java/com/qcloud/cos_migrate_tool/app/App.java index 3e7242b..38638bd 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/app/App.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/app/App.java @@ -7,18 +7,7 @@ import java.io.InputStreamReader; import com.qcloud.cos.internal.SkipMd5CheckStrategy; -import com.qcloud.cos_migrate_tool.config.CommonConfig; -import com.qcloud.cos_migrate_tool.config.ConfigParser; -import com.qcloud.cos_migrate_tool.config.CopyBucketConfig; -import com.qcloud.cos_migrate_tool.config.CopyFromAliConfig; -import com.qcloud.cos_migrate_tool.config.CopyFromAwsConfig; -import com.qcloud.cos_migrate_tool.config.CopyFromCompetitorConfig; -import com.qcloud.cos_migrate_tool.config.CopyFromCspConfig; -import com.qcloud.cos_migrate_tool.config.CopyFromLocalConfig; -import com.qcloud.cos_migrate_tool.config.CopyFromQiniuConfig; -import com.qcloud.cos_migrate_tool.config.CopyFromUpyunConfig; -import com.qcloud.cos_migrate_tool.config.CopyFromUrllistConfig; -import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.config.*; import com.qcloud.cos_migrate_tool.meta.TaskStatics; import com.qcloud.cos_migrate_tool.task.MigrateAliTaskExecutor; import com.qcloud.cos_migrate_tool.task.MigrateAwsTaskExecutor; @@ -31,6 +20,7 @@ import com.qcloud.cos_migrate_tool.task.MigrateUrllistTaskExecutor; import com.qcloud.cos_migrate_tool.task.TaskExecutor; +import com.qcloud.cos_migrate_tool.task_by_hadoop_fs.MigrateLocalToCosnTaskExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +47,8 @@ private static TaskExecutor buildTaskExecutor(CommonConfig config) { return new MigrateCspTaskExecutor((CopyFromCspConfig) config); } else if (ConfigParser.instance.getMigrateType().equals(MigrateType.MIGRATE_FROM_UPYUN)) { return new MigrateUpyunTaskExecutor((CopyFromUpyunConfig) config); + } else if (ConfigParser.instance.getMigrateType().equals(MigrateType.MIGRATE_FROM_LOCAL_TO_COSN_FS)) { + return new MigrateLocalToCosnTaskExecutor((CopyFromLocalToCosnConfig) config); } else { System.out.println("unknown migrate type"); } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java index b476be1..2deb06c 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/ConfigParser.java @@ -76,6 +76,8 @@ public class ConfigParser { private static final String FILE_LIST_PATH = "fileListPath"; private static final String CHECK_LOCAL_RECORD = "checkLocalRecord"; + private static final String LOCAL_TO_COSN_FS_SECTION_NAME = "migrateLocalToCosnFs"; + private static final String ALI_SECTION_NAME = "migrateAli"; private static final String AWS_SECTION_NAME = "migrateAws"; private static final String QINIU_SECTION_NAME = "migrateQiniu"; @@ -303,6 +305,14 @@ public boolean parse() { if (!initCopyFromUpyunConfig(prefs, (CopyFromUpyunConfig) config)) { return false; } + } else if (migrateType.equals(MigrateType.MIGRATE_FROM_LOCAL_TO_COSN_FS)){ + if (!checkMigrateLocalToCosnFsConfig(prefs)) { + return false; + } + config = new CopyFromLocalToCosnConfig(); + if (!initCopyFromLocalToCosnConfig(prefs, (CopyFromLocalToCosnConfig) config)) { + return false; + } } @@ -402,6 +412,13 @@ private boolean checkMigrateLocalConfig(Preferences prefs) { return true; } + private boolean checkMigrateLocalToCosnFsConfig(Preferences prefs) { + if (!isKeyExist(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, LOCAL_LOCALPATH)) { + return false; + } + return true; + } + private boolean checkMigrateCopyBucketConfig(Preferences prefs) { if (!isKeyExist(prefs, COPY_BUCKET_SECTION_NAME, COPY_SRC_REGION)) { return false; @@ -715,6 +732,69 @@ private boolean initCopyFromLocalConfig(Preferences prefs, return true; } + private boolean initCopyFromLocalToCosnConfig(Preferences prefs, + CopyFromLocalToCosnConfig copyLocalConfig) { + if (!initCommonConfig(prefs, copyLocalConfig)) { + return false; + } + try { + + String localPathConfig = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, LOCAL_LOCALPATH); + assert (localPathConfig != null); + copyLocalConfig.setLocalPath(localPathConfig); + + String excludes = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, LOCAL_EXECLUDE); + if (excludes != null && !excludes.trim().isEmpty()) { + copyLocalConfig.setExcludes(excludes); + } else { + excludes = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, "excludes"); + if (excludes != null && !excludes.trim().isEmpty()) { + copyLocalConfig.setExcludes(excludes); + } + } + + String ignoreModifiedTimeLessThanStr = + getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, IGNORE_MODIFIED_TIME_LESS_THAN); + if (ignoreModifiedTimeLessThanStr != null + && !ignoreModifiedTimeLessThanStr.trim().isEmpty()) { + copyLocalConfig.setIgnoreModifiedTimeLessThan(ignoreModifiedTimeLessThanStr); + } + + String ignoreSuffix = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, IGNORE_SUFFIX); + if (ignoreSuffix != null && !ignoreSuffix.trim().isEmpty()) { + copyLocalConfig.setIgnoreSuffix(ignoreSuffix); + } + + String includeSuffix = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, INCLUDE_SUFFIX); + if (includeSuffix != null && !includeSuffix.trim().isEmpty()) { + copyLocalConfig.setIncludeSuffix(includeSuffix); + } + + String ignoreEmptyFile = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, IGNORE_EMPTY_FILE); + if (ignoreEmptyFile != null && (ignoreEmptyFile.compareToIgnoreCase("on") == 0)) { + copyLocalConfig.setIgnoreEmptyFile(true); + } + String fileListMode = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, FILE_LIST_MODE); + if (fileListMode != null && (fileListMode.compareToIgnoreCase("on") == 0)) { + copyLocalConfig.setFileListMode(true); + } + String fileListPath = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, FILE_LIST_PATH); + if (fileListPath != null) { + copyLocalConfig.setFileListPath(fileListPath); + } + + String strCheckLocal = getConfigValue(prefs, LOCAL_TO_COSN_FS_SECTION_NAME, CHECK_LOCAL_RECORD); + if (strCheckLocal != null) { + copyLocalConfig.setCheckLocalRecord(strCheckLocal); + } + } catch (Exception e) { + System.err.println(e.getMessage()); + log.error(e.getMessage()); + return false; + } + return true; + } + private boolean initCopyFromUrllistConfig(Preferences prefs, CopyFromUrllistConfig copyUrllistConfig) { if (!initCommonConfig(prefs, copyUrllistConfig)) { diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalConfig.java index 32f8aff..ed1f5b9 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalConfig.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalConfig.java @@ -112,6 +112,7 @@ public void setIgnoreModifiedTimeLessThan(String ignoreModifiedTimeLessThanStr) public long getIgnoreModifiedTimeLessThan() { return ignoreModifiedTimeLessThan; } + public boolean isFileListMode() { return fileListMode; } diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalToCosnConfig.java b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalToCosnConfig.java new file mode 100644 index 0000000..1857870 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/CopyFromLocalToCosnConfig.java @@ -0,0 +1,189 @@ +package com.qcloud.cos_migrate_tool.config; + +import com.qcloud.cos_migrate_tool.utils.SystemUtils; + +import java.io.File; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.Set; + +public class CopyFromLocalToCosnConfig extends CommonConfig{ + private String localPath; + private Set excludes = new HashSet(); + private long ignoreModifiedTimeLessThan = -1; + private Set ignoreSuffixs = new HashSet(); + private Set includeSuffixs = new HashSet(); + boolean ignoreEmptyFile = false; + boolean fileListMode = false; + private String fileListPath; + private boolean isCheckLocalRecord = true; + + public void setIgnoreEmptyFile(boolean ignoreEmptyFile) { + this.ignoreEmptyFile = ignoreEmptyFile; + } + + public void setIgnoreSuffix(String ignore) { + ignore = ignore.trim(); + String[] ignoreArray = ignore.split(";"); + for (String ignoreElement : ignoreArray) { + this.ignoreSuffixs.add(ignoreElement); + } + } + + public void setIncludeSuffix(String include) { + include = include.trim(); + String[] includeArray = include.split(";"); + for (String includeElement : includeArray) { + this.includeSuffixs.add(includeElement); + } + } + + public String needToMigrate(Path file, String localPath) { + if (isExcludes(localPath)) { + return "excludes"; + } + + for (String suffix:ignoreSuffixs) { + if(localPath.endsWith(suffix)) { + return "suffix"; + } + } + + if (ignoreEmptyFile) { + File localFile = new File(file.toString()); + if (localFile.length() == 0) { + return "empty file"; + } + } + + if (includeSuffixs.isEmpty()) { + return ""; + } + + for (String suffix:includeSuffixs) { + if (localPath.endsWith(suffix)) { + return ""; + } + } + + return "do not match include suffix"; + } + + public String getLocalPath() { + return localPath; + } + + public void setLocalPath(String localPath) throws IllegalArgumentException { + File localPathFile = new File(localPath); + if (!localPathFile.exists()) { + throw new IllegalArgumentException("local path not exist!"); + } + this.localPath = SystemUtils.formatLocalPath(localPath); + } + + public void setExcludes(String excludePath) throws IllegalArgumentException { + excludePath = excludePath.trim(); + String[] exludePathArray = excludePath.split(";"); + for (String excludePathElement : exludePathArray) { + File tempFile = new File(excludePathElement); + if (!tempFile.exists()) { + throw new IllegalArgumentException("excludePath " + excludePath + " not exist"); + } + this.excludes.add(SystemUtils.formatLocalPath(tempFile.getAbsolutePath())); + } + } + + public boolean isExcludes(String excludePath) { + return this.excludes.contains(excludePath); + } + + public void setIgnoreModifiedTimeLessThan(String ignoreModifiedTimeLessThanStr) { + try { + long number = Long.valueOf(ignoreModifiedTimeLessThanStr); + if (number <= 0) { + throw new IllegalArgumentException(ignoreModifiedTimeLessThanStr + " is invalid, ignoreModifiedTimeLessThan must be positive"); + } + this.ignoreModifiedTimeLessThan = number; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid ignoreModifiedTimeLessThan"); + } + } + + public long getIgnoreModifiedTimeLessThan() { + return ignoreModifiedTimeLessThan; + } + + public boolean isFileListMode() { + return fileListMode; + } + + public void setFileListMode(boolean fileListMode) { + this.fileListMode = fileListMode; + } + + public String getFileListPath() { + return fileListPath; + } + + public void setFileListPath(String fileListPath) { + this.fileListPath = fileListPath; + } + + public void setCheckLocalRecord(String checkLocalRecord) { + if (checkLocalRecord.compareToIgnoreCase("true") == 0) { + isCheckLocalRecord = true; + } else if (checkLocalRecord.compareToIgnoreCase("false") == 0) { + isCheckLocalRecord = false; + } else { + throw new IllegalArgumentException(checkLocalRecord + " is invalid, checkLocalRecord should be true or false"); + } + } + + public boolean checkLocalRecord() { + return isCheckLocalRecord; + } + + + public String toString() { + String strExclude = ""; + if (!excludes.isEmpty()) { + StringBuilder sb = new StringBuilder(); + for (String element : excludes) { + sb.append(element).append(","); + } + strExclude = sb.toString(); + } + + + String strIgnoreSuffix = ""; + if (!ignoreSuffixs.isEmpty()) { + StringBuilder sb = new StringBuilder(); + for (String element : ignoreSuffixs) { + sb.append(element).append(","); + } + strIgnoreSuffix = sb.toString(); + } + + String strIncludeSuffix = ""; + if (!includeSuffixs.isEmpty()) { + StringBuilder sb = new StringBuilder(); + for (String element : includeSuffixs) { + sb.append(element).append(","); + } + strIncludeSuffix = sb.toString(); + } + + return super.toString() + + ". CopyFromLocalConfig:" + + "localPath=" + getLocalPath() + + ",excludes=" + strExclude + + ",ignoreModifiedTimeLessThanSeconds=" + getIgnoreModifiedTimeLessThan() + + ",ignoreSuffix=" + strIgnoreSuffix + + ",ignoreEmptyFile=" + ignoreEmptyFile + + ",includeSuffix=" + strIncludeSuffix + + ",fileListMode=" + isFileListMode() + + ",fileListPath=" + getFileListPath() + + ",checkLocalRecord=" + checkLocalRecord() + ; + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/config/MigrateType.java b/src/main/java/com/qcloud/cos_migrate_tool/config/MigrateType.java index f2c800c..f429371 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/config/MigrateType.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/config/MigrateType.java @@ -8,7 +8,8 @@ public enum MigrateType { MIGRATE_FROM_URLLIST("migrateUrl"), MIGRATE_FROM_COS_BUCKET_COPY("migrateBucketCopy"), MIGRATE_FROM_CSP("migrateCsp"), - MIGRATE_FROM_UPYUN("migrateUpyun"); + MIGRATE_FROM_UPYUN("migrateUpyun"), + MIGRATE_FROM_LOCAL_TO_COSN_FS("migrateLocalToCosnFs"); private String migrateType; diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java index 7b9c94f..52123bf 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTask.java @@ -33,8 +33,8 @@ public class MigrateLocalTask extends Task { public MigrateLocalTask(Semaphore semaphore, CopyFromLocalConfig copyFromLocalConfig, - TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, - File localFile) { + TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, + File localFile) { super(semaphore, copyFromLocalConfig, smallFileTransfer, bigFileTransfer, recordDb); this.bucketName = copyFromLocalConfig.getBucketName(); this.localFolder = copyFromLocalConfig.getLocalPath(); @@ -108,9 +108,9 @@ public void doTask() { } else { TaskStatics.instance.addUpdateCnt(); } - + if(!config.getOutputFinishedFilePath().isEmpty()) { - SimpleDateFormat dateFormat= new SimpleDateFormat("YYYY-MM-dd");//设置当前时间的格式,为年-月-日 + SimpleDateFormat dateFormat= new SimpleDateFormat("YYYY-MM-dd");//设置当前时间的格式,为年-月-日 String file_name = dateFormat.format(new Date()) + ".out"; String resultFile = config.getOutputFinishedFilePath() + file_name; try { @@ -126,8 +126,8 @@ public void doTask() { log.error("write result fail,result \n" + e.toString()); } } - - + + String printMsg = String.format("[ok] [requestid: %s], task_info: %s", requestId == null ? "NULL" : requestId, migrateLocalRecordElement.buildKey()); System.out.println(printMsg); @@ -142,4 +142,4 @@ public void doTask() { TaskStatics.instance.addFailCnt(); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java index 08a1d65..6ad6fd2 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/MigrateLocalTaskExecutor.java @@ -172,7 +172,7 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) String reason = ((CopyFromLocalConfig) config).needToMigrate(file, localPath); if (reason.isEmpty()) { File localFile = new File(file.toString()); - + MigrateLocalTask migrateLocalTask = new MigrateLocalTask(semaphore, ((CopyFromLocalConfig) config), smallFileTransferManager, bigFileTransferManager, recordDb, localFile); @@ -195,11 +195,11 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) log.info("ready to scan folder: " + localFolder); try { - java.nio.file.Files.walkFileTree(Paths.get(localFolder), + java.nio.file.Files.walkFileTree(Paths.get(localFolder), EnumSet.of(FOLLOW_LINKS), Integer.MAX_VALUE, finder); - + TaskStatics.instance.setListFinished(true); - + } catch (IOException e) { TaskStatics.instance.setListFinished(false); log.error("walk file tree error", e); @@ -210,4 +210,4 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) public void waitTaskOver() { super.waitTaskOver(); } -} +} \ No newline at end of file diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java b/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java index c3a88ec..697027c 100644 --- a/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java +++ b/src/main/java/com/qcloud/cos_migrate_tool/task/Task.java @@ -37,7 +37,7 @@ public abstract class Task implements Runnable { protected long smallFileThreshold; private RecordDb recordDb; protected CommonConfig config; - QUERY_RESULT query_result; + protected QUERY_RESULT query_result; diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task_by_hadoop_fs/MigrateLocalToCosnFsTask.java b/src/main/java/com/qcloud/cos_migrate_tool/task_by_hadoop_fs/MigrateLocalToCosnFsTask.java new file mode 100644 index 0000000..7d81525 --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task_by_hadoop_fs/MigrateLocalToCosnFsTask.java @@ -0,0 +1,152 @@ +package com.qcloud.cos_migrate_tool.task_by_hadoop_fs; + +import com.qcloud.cos.model.StorageClass; +import com.qcloud.cos.transfer.TransferManager; +import com.qcloud.cos_migrate_tool.config.CopyFromLocalToCosnConfig; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.record.MigrateLocalRecordElement; +import com.qcloud.cos_migrate_tool.record.RecordDb; +import com.qcloud.cos_migrate_tool.task.Task; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.Semaphore; + +public class MigrateLocalToCosnFsTask extends Task { + private static final Logger log = LoggerFactory.getLogger(MigrateLocalToCosnFsTask.class); + + + protected FileSystem fs; + private String bucketName; + private String localFolder; + private String cosFolder; + private File localFile; + private StorageClass storageClass; + private boolean entireMd5Attached; + private boolean isCheckLocalRecord; + + public MigrateLocalToCosnFsTask(Semaphore semaphore, CopyFromLocalToCosnConfig copyFromLocalConfig, + TransferManager smallFileTransfer, TransferManager bigFileTransfer, RecordDb recordDb, + File localFile, FileSystem fs) { + super(semaphore, copyFromLocalConfig, smallFileTransfer, bigFileTransfer, recordDb); + this.bucketName = copyFromLocalConfig.getBucketName(); + this.localFolder = copyFromLocalConfig.getLocalPath(); + this.cosFolder = copyFromLocalConfig.getCosPath(); + this.localFile = localFile; + this.storageClass = copyFromLocalConfig.getStorageClass(); + this.entireMd5Attached = copyFromLocalConfig.isEntireFileMd5Attached(); + isCheckLocalRecord = copyFromLocalConfig.checkLocalRecord(); + this.fs = fs; + } + + @Override + public void doTask() { + String localPath = SystemUtils.formatLocalPath(localFile.getPath()); + String cosPath = buildCOSPath(localPath); + long mtime = localFile.lastModified(); + long fileSize = localFile.length(); + long ignoreModifiedTimeLessThan = + ((CopyFromLocalToCosnConfig) config).getIgnoreModifiedTimeLessThan(); + if (ignoreModifiedTimeLessThan > 0) { + long currTime = System.currentTimeMillis(); + if ((currTime - mtime) / 1000 < ignoreModifiedTimeLessThan) { + String printMsg = String.format( + "[condition_not_match] [reason: ignoreModifiedTimeLessThan] [local_file: %s], [cur_time: %d], [lastModifed_time: %d],[ignoreModifiedTimeLessThan: %d]", + localFile.getAbsoluteFile(), currTime / 1000, mtime / 1000, ignoreModifiedTimeLessThan); + System.out.println(printMsg); + log.info(printMsg); + TaskStatics.instance.addConditionNotMatchCnt(); + return; + } + } + + MigrateLocalRecordElement migrateLocalRecordElement = + new MigrateLocalRecordElement(bucketName, localPath, cosPath, mtime, fileSize); + + if (isCheckLocalRecord) { + // 如果记录存在 + if (isExist(migrateLocalRecordElement, true)) { + TaskStatics.instance.addSkipCnt(); + return; + } + } + + if (config.skipSamePath()) { + try { + if (isExistOnCOS(smallFileTransfer, migrateLocalRecordElement, config.getBucketName(), cosPath)) { + TaskStatics.instance.addSkipCnt(); + return; + } + } catch (Exception e) { + String printMsg = String.format("[fail] task_info: %s", migrateLocalRecordElement.buildKey()); + System.err.println(printMsg); + log.error("[fail] task_info: {}, exception: {}", migrateLocalRecordElement.buildKey(), e.toString()); + TaskStatics.instance.addFailCnt(); + return; + } + } + + try { + uploadFile(cosPath, localFile); + saveRecord(migrateLocalRecordElement); + if (this.query_result == RecordDb.QUERY_RESULT.KEY_NOT_EXIST) { + TaskStatics.instance.addSuccessCnt(); + } else { + TaskStatics.instance.addUpdateCnt(); + } + + if(!config.getOutputFinishedFilePath().isEmpty()) { + SimpleDateFormat dateFormat= new SimpleDateFormat("YYYY-MM-dd");//设置当前时间的格式,为年-月-日 + String file_name = dateFormat.format(new Date()) + ".out"; + String resultFile = config.getOutputFinishedFilePath() + file_name; + try { + BufferedOutputStream bos = + new BufferedOutputStream(new FileOutputStream(resultFile, true)); + String recordMsg = + String.format("%s\t%d\t%d\n", localFile.getAbsolutePath(), localFile.length(), localFile.lastModified()); + bos.write(recordMsg.getBytes()); + bos.close(); + } catch (FileNotFoundException e) { + log.error("write result fail,result \n" + e.toString()); + } catch (IOException e) { + log.error("write result fail,result \n" + e.toString()); + } + } + + + String printMsg = + String.format("[ok] , task_info: %s", migrateLocalRecordElement.buildKey()); + System.out.println(printMsg); + log.info(printMsg); + } catch (Exception e) { + String printMsg = + String.format("[fail] task_info: %s", migrateLocalRecordElement.buildKey()); + System.out.println(printMsg); + log.error("fail! task_info: [key: {}], [value: {}], exception: {}", + migrateLocalRecordElement.buildKey(), migrateLocalRecordElement.buildValue(), + e.toString()); + TaskStatics.instance.addFailCnt(); + } + } + + private String buildCOSPath(String localPath) { + String cosPath = cosFolder + localPath.substring(localFolder.length()); + return cosPath; + } + + public void uploadFile(String cosPath, File localFile) throws Exception { + Path dstPath = new Path(cosPath); + if(localFile.isDirectory()) { + fs.mkdirs(dstPath); + } else { + Path localPath = new Path(localFile.getPath()); + fs.copyFromLocalFile(localPath, dstPath); + } + } +} diff --git a/src/main/java/com/qcloud/cos_migrate_tool/task_by_hadoop_fs/MigrateLocalToCosnTaskExecutor.java b/src/main/java/com/qcloud/cos_migrate_tool/task_by_hadoop_fs/MigrateLocalToCosnTaskExecutor.java new file mode 100644 index 0000000..3cf270a --- /dev/null +++ b/src/main/java/com/qcloud/cos_migrate_tool/task_by_hadoop_fs/MigrateLocalToCosnTaskExecutor.java @@ -0,0 +1,221 @@ +package com.qcloud.cos_migrate_tool.task_by_hadoop_fs; + +import com.qcloud.cos_migrate_tool.config.CopyFromLocalToCosnConfig; +import com.qcloud.cos_migrate_tool.config.MigrateType; +import com.qcloud.cos_migrate_tool.meta.TaskStatics; +import com.qcloud.cos_migrate_tool.task.MigrateLocalTaskExecutor; +import com.qcloud.cos_migrate_tool.task.TaskExecutor; +import com.qcloud.cos_migrate_tool.utils.SystemUtils; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.URI; +import java.nio.file.FileVisitResult; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.EnumSet; + +import static java.nio.file.FileVisitOption.FOLLOW_LINKS; + +public class MigrateLocalToCosnTaskExecutor extends TaskExecutor { + private static final Logger log = LoggerFactory.getLogger(MigrateLocalTaskExecutor.class); + private FileSystem fs; + private String bucketName; + private String localFolder; + private String cosFolder; + private CopyFromLocalToCosnConfig config; + + public MigrateLocalToCosnTaskExecutor(CopyFromLocalToCosnConfig config) { + super(MigrateType.MIGRATE_FROM_LOCAL_TO_COSN_FS, config); + this.bucketName = config.getBucketName(); + this.localFolder = config.getLocalPath(); + this.cosFolder = config.getCosPath(); + this.config = config; + this.fs = initCosnFs(); + } + + @Override + public String buildTaskDbComment() { + String comment = String.format( + "[upload by cosn fs],[time: %s], [bucketName: %s], [localFolder: %s], [cosFolder: %s], [smallfile_exector_number: %d], [bigfile_executor_number: %d]\n", + SystemUtils.getCurrentDateTime(), bucketName, localFolder, cosFolder, + this.smallFileUploadExecutorNum, this.bigFileUploadExecutorNum); + return comment; + } + + @Override + public String buildTaskDbFolderPath() { + String temp = String.format("[local: %s], [cosFolder: %s]", localFolder, cosFolder); + String dbFolderPath = + String.format("db/migrate_from_local/%s/%s", bucketName, DigestUtils.md5Hex(temp)); + return dbFolderPath; + } + + private void buildFileListTask() { + String localPathPrefix = ""; + try (BufferedReader br = new BufferedReader(new FileReader(new File(config.getFileListPath())))) { + String line; + while ((line = br.readLine()) != null) { + Path file = Paths.get(localPathPrefix, line); + String localPath = ""; + try { + localPath = SystemUtils.formatLocalPath(file.toString()); + } catch (IllegalArgumentException e) { + String printMsg = String.format("skip the file for illegal utf-8 letter, [local_file:%s]", + file.toString()); + System.out.println(printMsg); + log.error(printMsg); + TaskStatics.instance.addConditionNotMatchCnt(); + continue; + } + try { + String reason = config.needToMigrate(file, localPath); + if (reason.isEmpty()) { + File localFile = new File(file.toString()); + + MigrateLocalToCosnFsTask migrateLocalToCosnTask = new MigrateLocalToCosnFsTask(semaphore, + (CopyFromLocalToCosnConfig) config, smallFileTransferManager, bigFileTransferManager, recordDb, localFile,fs); + AddTask(migrateLocalToCosnTask); + } else { + String printMsg = String.format( + "[condition_not_match] [reason: %s] [local_file: %s]", reason, + file.toString()); + System.out.println(printMsg); + log.info(printMsg); + TaskStatics.instance.addConditionNotMatchCnt(); + } + } catch (InterruptedException e) { + log.error("add task to queue occur a exception", e); + throw new IOException(e.getMessage()); + } + } + TaskStatics.instance.setListFinished(true); + } catch (FileNotFoundException e) { + log.error("fileList path not exist:", e); + TaskStatics.instance.setListFinished(false); + } catch (IOException e) { + log.error("error ocured:", e); + TaskStatics.instance.setListFinished(false); + } + } + + public void buildTask() { + log.info(config.toString()); + if(config.isFileListMode()) { + buildFileListTask(); + return; + } + + SimpleFileVisitor finder = new SimpleFileVisitor() { + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + String dirPath = ""; + try { + dirPath = SystemUtils.formatLocalPath(dir.toString()); + } catch (IllegalArgumentException e) { + log.error("skip the folder and it's sub member for illegal utf-8 letter"); + return FileVisitResult.SKIP_SUBTREE; + } + if (((CopyFromLocalToCosnConfig) config).isExcludes(dirPath)) { + log.info("exclude folder: " + dirPath); + return FileVisitResult.SKIP_SUBTREE; + } else { + return super.preVisitDirectory(dir, attrs); + } + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + throws IOException { + String localPath = ""; + try { + localPath = SystemUtils.formatLocalPath(file.toString()); + } catch (IllegalArgumentException e) { + log.error("skip the file for illegal utf-8 letter"); + return super.visitFile(file, attrs); + } + try { + String reason = ((CopyFromLocalToCosnConfig) config).needToMigrate(file, localPath); + if (reason.isEmpty()) { + File localFile = new File(file.toString()); + + MigrateLocalToCosnFsTask migrateLocalToCosnTask = new MigrateLocalToCosnFsTask(semaphore, + ((CopyFromLocalToCosnConfig) config), smallFileTransferManager, + bigFileTransferManager, recordDb, localFile,fs); + AddTask(migrateLocalToCosnTask); + } else { + String printMsg = String.format( + "[condition_not_match] [reason: %s] [local_file: %s]", reason, + file.toString()); + System.out.println(printMsg); + log.info(printMsg); + TaskStatics.instance.addConditionNotMatchCnt(); + } + } catch (InterruptedException e) { + log.error("visit file occur a exception", e); + throw new IOException(e.getMessage()); + } + return super.visitFile(file, attrs); + } + }; + + log.info("ready to scan folder: " + localFolder); + try { + java.nio.file.Files.walkFileTree(Paths.get(localFolder), + EnumSet.of(FOLLOW_LINKS), Integer.MAX_VALUE, finder); + + TaskStatics.instance.setListFinished(true); + + } catch (IOException e) { + TaskStatics.instance.setListFinished(false); + log.error("walk file tree error", e); + } + } + + public FileSystem initCosnFs() { + Configuration conf = new Configuration(); + + String[] parts = config.getBucketName().split("-"); + String appid = parts[parts.length - 1]; + + conf.set("fs.cosn.impl", "org.apache.hadoop.fs.CosFileSystem"); + conf.set("fs.AbstractFileSystem.cosn.impl", "org.apache.hadoop.fs.CosN"); + conf.set("fs.cosn.userinfo.secretId", config.getAk()); + conf.set("fs.cosn.userinfo.secretKey", config.getSk()); + conf.set("fs.cosn.bucket.region", config.getRegion()); + conf.set("fs.cosn.tmp.dir", config.getTempFolderPath()); + conf.set("fs.cosn.trsf.fs.ofs.tmp.cache.dir", config.getTempFolderPath()); + conf.set("fs.cosn.userinfo.appid", appid); + String cosnUrl = "cosn://" + config.getBucketName()+"/"; + + + try { + fs = FileSystem.get(URI.create(cosnUrl), conf); + } catch (IOException e) { + log.error("create cosn fs failed!", e); + System.out.println("create cosn fs failed: "+e.getMessage()); + return null; + } + return fs; + } + + @Override + public void waitTaskOver() { + super.waitTaskOver(); + try { + this.fs.close(); + } catch (IOException e) { + log.error("close fs occurred ioexception!", e); + System.err.println("close fs occurred ioexception!"); + } + } +} +