diff --git a/demo/cos_demo.cpp b/demo/cos_demo.cpp index 998e5c4..9dafb17 100644 --- a/demo/cos_demo.cpp +++ b/demo/cos_demo.cpp @@ -6,6 +6,7 @@ // Description: #include +#include #include #include #include @@ -15,6 +16,8 @@ #include "cos_sys_config.h" #include "cos_defines.h" +#include "Poco/SharedPtr.h" + using namespace qcloud_cos; void PrintResult(const qcloud_cos::CosResult& result, const qcloud_cos::BaseResp& resp) { if (result.IsSucc()) { @@ -431,6 +434,7 @@ void CompleteMultiUpload(qcloud_cos::CosAPI& cos, const std::string& bucket_name std::cout << "========================================================" << std::endl; } +// Upload object without handler void MultiUploadObject(qcloud_cos::CosAPI& cos, const std::string& bucket_name, const std::string& object_name, const std::string& local_file) { qcloud_cos::MultiUploadObjectReq req(bucket_name, @@ -462,6 +466,52 @@ void MultiUploadObject(qcloud_cos::CosAPI& cos, const std::string& bucket_name, std::cout << "========================================================" << std::endl; } +void uploadprogress(const MultiUploadObjectReq *req, Poco::SharedPtr &handler) { + std::cout << "callback data is :" << handler->GetProgress() << std::endl; +} + +void statusprogress(const MultiUploadObjectReq *req, Poco::SharedPtr &handler) { + std::cout << "callback status is :" << handler->GetStatusString() << std::endl; +} + +// Upload object with handler +void TransferUploadObject(qcloud_cos::CosAPI& cos, const std::string& bucket_name, + const std::string& object_name, const std::string& local_file) { + qcloud_cos::MultiUploadObjectReq req(bucket_name, + object_name, local_file); + req.SetRecvTimeoutInms(1000 * 60); + req.SetUploadProgressCallback(uploadprogress); + req.SetTransferStatusUpdateCallback(statusprogress); + qcloud_cos::MultiUploadObjectResp resp; + Poco::SharedPtr handler = cos.TransferUploadObject(req, &resp); + // The TransferUploadObject is the asynchronization api, can use WaitUntilFinish to block until finish. + // At the same time the handler support the GetTotalSize(), GetProgress(), GetStatus(), Cancel() etc. + handler->WaitUntilFinish(); + + // Notice when not block with the WaitUntilFinish() the result might not get soon. + if (handler->m_result.IsSucc()) { + std::cout << "MultiUpload Succ." << std::endl; + std::cout << resp.GetLocation() << std::endl; + std::cout << resp.GetKey() << std::endl; + std::cout << resp.GetBucket() << std::endl; + std::cout << resp.GetEtag() << std::endl; + } else { + std::cout << "MultiUpload Fail." << std::endl; + // 获取具体失败在哪一步 + std::string resp_tag = resp.GetRespTag(); + if ("Init" == resp_tag) { + // print result + } else if ("Upload" == resp_tag) { + // print result + } else if ("Complete" == resp_tag) { + // print result + } + } + std::cout << "===================MultiUpload=============================" << std::endl; + PrintResult(handler->m_result, resp); + std::cout << "========================================================" << std::endl; +} + void ListParts(qcloud_cos::CosAPI& cos, const std::string& bucket_name, const std::string& object_name, const std::string& upload_id) { qcloud_cos::ListPartsReq req(bucket_name, object_name, upload_id); @@ -654,6 +704,8 @@ int main(int argc, char** argv) { // GetObjectByFile(cos, bucket_name, "sevenyou_e2_abc", "/data/sevenyou/temp/sevenyou_10m_download_03"); //GetObjectByStream(cos, bucket_name, "sevenyou_e2_abc"); // MultiGetObject(cos, bucket_name, "sevenyou_1102_south_multi", "/data/sevenyou/temp/sevenyou_10m_download_03"); + // MultiGetObject(cos, bucket_name, "test000part", "./multiget"); + // TransferUploadObject(cos, bucket_name, "transfer", "./test6M1"); // { // std::string upload_id; @@ -797,5 +849,9 @@ int main(int argc, char** argv) { // PrintResult(result, resp); // std::cout << "=========================================================" << std::endl; // } - +#if defined(_WIN32) + system("pause"); +#endif } + + diff --git a/include/cos_api.h b/include/cos_api.h index cabab9c..25ba783 100644 --- a/include/cos_api.h +++ b/include/cos_api.h @@ -5,8 +5,10 @@ #include "op/cos_result.h" #include "op/object_op.h" #include "op/service_op.h" -#include "util/simple_mutex.h" + +#include "boost/thread/mutex.hpp" #include "Poco/SharedPtr.h" +#include "trsf/transfer_handler.h" namespace qcloud_cos { @@ -65,6 +67,7 @@ class CosAPI { /// \return 本次请求的调用情况(如状态码等) CosResult PutBucket(const PutBucketReq& request, PutBucketResp* response); + /// \brief 确认Bucket是否存在 /// (详见:https://cloud.tencent.com/document/product/436/7735) /// @@ -331,7 +334,14 @@ class CosAPI { /// /// \return 返回HTTP请求的状态码及错误信息 CosResult MultiUploadObject(const MultiUploadObjectReq& request, - MultiUploadObjectResp* response); + MultiUploadObjectResp* response) ; + + + Poco::SharedPtr TransferUploadObject(const MultiUploadObjectReq& request, + MultiUploadObjectResp* response) ; + + Poco::SharedPtr CreateUploadHandler(const std::string& bucket_name, const std::string& object_name, + const std::string& local_path) ; /// \brief 舍弃一个分块上传并删除已上传的块 /// 详见: https://www.qcloud.com/document/product/436/7740 @@ -416,11 +426,46 @@ class CosAPI { BucketOp m_bucket_op; // 内部封装bucket相关的操作 ServiceOp m_service_op; // 内部封装service相关的操作 - static SimpleMutex s_init_mutex; + mutable boost::mutex s_init_mutex; static bool s_init; static bool s_poco_init; static int s_cos_obj_num; }; +// Use for trsf the param into boost bind function +class AsynArgs { +public: + AsynArgs(ObjectOp* op) : m_op(op) {} + AsynArgs(const AsynArgs& arg) { + this->m_op = arg.m_op; + } + virtual ~AsynArgs() {}; + ObjectOp* m_op; +}; + + +class TransferAsynArgs : public AsynArgs { +public: +TransferAsynArgs(ObjectOp* pObj, + const MultiUploadObjectReq& req, + MultiUploadObjectResp *resp, + Poco::SharedPtr& handler) : AsynArgs(pObj) , m_req(req), m_resp(resp) { + m_handler = handler; + + } + +TransferAsynArgs(const TransferAsynArgs& arg) + : AsynArgs(arg), + m_req(arg.m_req), + m_handler(arg.m_handler), + m_resp(arg.m_resp) { + } + virtual ~TransferAsynArgs() {} +public: + MultiUploadObjectReq m_req; + Poco::SharedPtr m_handler; + MultiUploadObjectResp* m_resp; +}; + } // namespace qcloud_cos #endif diff --git a/include/cos_config.h b/include/cos_config.h index 8e1ea20..82a5836 100644 --- a/include/cos_config.h +++ b/include/cos_config.h @@ -1,10 +1,11 @@ #ifndef COS_CONFIG_H #define COS_CONFIG_H +#include #include -#include -#include "util/simple_mutex.h" +#include "boost/thread.hpp" +#include "boost/thread/shared_mutex.hpp" namespace qcloud_cos{ class CosConfig{ @@ -116,7 +117,7 @@ class CosConfig{ void SetConfigCredentail(const std::string& access_key, const std::string& secret_key, const std::string& tmp_token); private: - mutable SimpleRWLock m_lock; + mutable boost::shared_mutex m_lock; uint64_t m_app_id; std::string m_access_key; std::string m_secret_key; diff --git a/include/cos_defines.h b/include/cos_defines.h index 1f907ba..b8051e8 100644 --- a/include/cos_defines.h +++ b/include/cos_defines.h @@ -2,7 +2,10 @@ #define COS_DEFINE_H #include #include + +#if !defined(_WIN32) #include +#endif #include #include @@ -27,6 +30,8 @@ const int kMaxThreadPoolSizeUploadPart = 100; /// 分块上传的线程池最小数目 const int kMinThreadPoolSizeUploadPart = 1; +const int kMaxPartNumbers = 10000; + /// 分块大小1M const uint64_t kPartSize1M = 1 * 1024 * 1024; /// 分块大小5G @@ -60,23 +65,38 @@ typedef enum cos_log_level { (level == COS_LOG_WARN) ? "[WARN] " : \ (level == COS_LOG_ERR) ? "[ERR] " : "[CRIT]") + +#if defined(_WIN32) #define COS_LOW_LOGPRN(level, fmt, ...) \ if (level <= CosSysConfig::GetLogLevel()) { \ if (CosSysConfig::GetLogOutType()== COS_LOG_STDOUT) { \ - fprintf(stdout,"%s:%s(%d) " fmt "%s\n", LOG_LEVEL_STRING(level),__func__,__LINE__, __VA_ARGS__); \ + fprintf(stdout,"%s:%s(%d) " fmt "\n", LOG_LEVEL_STRING(level),__func__,__LINE__, ##__VA_ARGS__); \ }else if (CosSysConfig::GetLogOutType() == COS_LOG_SYSLOG){ \ - syslog(LOG_INFO,"%s:%s(%d) " fmt "%s\n", LOG_LEVEL_STRING(level),__func__,__LINE__, __VA_ARGS__); \ } else { \ } \ } else { \ - } \ + } +#else +#define COS_LOW_LOGPRN(level, fmt, ...) \ + if (level <= CosSysConfig::GetLogLevel()) { \ + if (CosSysConfig::GetLogOutType()== COS_LOG_STDOUT) { \ + fprintf(stdout,"%s:%s(%d) " fmt "\n", LOG_LEVEL_STRING(level),__func__,__LINE__, ##__VA_ARGS__); \ + }else if (CosSysConfig::GetLogOutType() == COS_LOG_SYSLOG){ \ + syslog(LOG_INFO,"%s:%s(%d) " fmt "\n", LOG_LEVEL_STRING(level),__func__,__LINE__, ##__VA_ARGS__); \ + } else { \ + } \ + } else { \ + } +#endif + -#define SDK_LOG_DBG(fmt, ...) COS_LOW_LOGPRN(COS_LOG_DBG, fmt, ##__VA_ARGS__, "") -#define SDK_LOG_INFO(fmt, ...) COS_LOW_LOGPRN(COS_LOG_INFO, fmt, ##__VA_ARGS__, "") -#define SDK_LOG_WARN(fmt, ...) COS_LOW_LOGPRN(COS_LOG_WARN, fmt, ##__VA_ARGS__, "") -#define SDK_LOG_ERR(fmt, ...) COS_LOW_LOGPRN(COS_LOG_ERR, fmt, ##__VA_ARGS__, "") -#define SDK_LOG_COS(level, fmt, ...) COS_LOW_LOGPRN(level, fmt, ##__VA_ARGS__, "") +// For now just support the std output log for windows +#define SDK_LOG_DBG(fmt, ...) COS_LOW_LOGPRN(COS_LOG_DBG, fmt, ##__VA_ARGS__) +#define SDK_LOG_INFO(fmt, ...) COS_LOW_LOGPRN(COS_LOG_INFO, fmt, ##__VA_ARGS__) +#define SDK_LOG_WARN(fmt, ...) COS_LOW_LOGPRN(COS_LOG_WARN, fmt, ##__VA_ARGS__) +#define SDK_LOG_ERR(fmt, ...) COS_LOW_LOGPRN(COS_LOG_ERR, fmt, ##__VA_ARGS__) +#define SDK_LOG_COS(level, fmt, ...) COS_LOW_LOGPRN(level, fmt, ##__VA_ARGS__) #define MIN(a, b) ((a) < (b) ? (a) : (b)) #define MAX(a, b) ((a) > (b) ? (a) : (b)) diff --git a/include/cos_sys_config.h b/include/cos_sys_config.h index 4c12a60..db8cf3c 100644 --- a/include/cos_sys_config.h +++ b/include/cos_sys_config.h @@ -1,6 +1,5 @@ #ifndef COS_SYS_CONF_H #define COS_SYS_CONF_H -#include #include #include "cos_defines.h" @@ -139,6 +138,7 @@ class CosSysConfig { static bool m_is_check_md5; static std::string m_dest_domain; + }; } // namespace qcloud_cos diff --git a/include/op/file_copy_task.h b/include/op/file_copy_task.h index 2286ec4..34d9682 100644 --- a/include/op/file_copy_task.h +++ b/include/op/file_copy_task.h @@ -2,8 +2,6 @@ #define FILE_COPY_TASK_H #pragma once -#include - #include #include "cos_config.h" diff --git a/include/op/file_download_task.h b/include/op/file_download_task.h index 9516f83..05bf5bc 100644 --- a/include/op/file_download_task.h +++ b/include/op/file_download_task.h @@ -9,8 +9,6 @@ #define FILE_DOWN_TASK_H #pragma once -#include - #include #include "cos_config.h" @@ -42,7 +40,7 @@ class FileDownTask { void DownTask(); - void SetDownParams(unsigned char* pdatabuf, size_t datalen, uint64_t offset); + void SetDownParams(unsigned char* pdatabuf, size_t datalen, uint64_t offset, uint64_t target_size); std::string GetTaskResp(); @@ -71,6 +69,7 @@ class FileDownTask { int m_http_status; std::map m_resp_headers; std::string m_err_msg; + uint64_t m_target_size; }; } // namespace qcloud_cos diff --git a/include/op/file_upload_task.h b/include/op/file_upload_task.h index 1775599..4783f10 100644 --- a/include/op/file_upload_task.h +++ b/include/op/file_upload_task.h @@ -2,8 +2,6 @@ #define FILE_UPLOAD_TASK_H #pragma once -#include - #include #include "cos_config.h" @@ -11,11 +9,15 @@ #include "cos_params.h" #include "cos_sys_config.h" #include "op/base_op.h" +#include "request/object_req.h" #include "util/codec_util.h" #include "util/file_util.h" #include "util/http_sender.h" #include "util/string_util.h" +#include "Poco/SharedPtr.h" +#include "trsf/transfer_handler.h" + namespace qcloud_cos{ class FileUploadTask { @@ -46,6 +48,8 @@ class FileUploadTask { bool IsTaskSuccess() const; + void SetTaskSuccess() { m_is_task_success = true; } + int GetHttpStatus() const ; std::map GetRespHeaders() const; @@ -56,6 +60,22 @@ class FileUploadTask { std::string GetErrMsg() const { return m_err_msg; } + void SetResume(const bool is_resume) { m_is_resume = is_resume; } + + bool IsResume() const { return m_is_resume; } + + void SetHandler(const bool is_handler) { m_is_handler = is_handler; } + + bool IsHandler() const { return m_is_handler; } + + void SetResumeEtag(const std::string& etag) { m_resume_etag = etag; } + + std::string GetResumeEtag() const { return m_resume_etag; } + +public: + Poco::SharedPtr m_handler; + MultiUploadObjectReq *m_req; + private: std::string m_full_url; std::map m_headers; @@ -69,6 +89,9 @@ class FileUploadTask { int m_http_status; std::map m_resp_headers; std::string m_err_msg; + bool m_is_resume; + std::string m_resume_etag; + bool m_is_handler; }; } diff --git a/include/op/object_op.h b/include/op/object_op.h index 13d6e07..5d4c262 100644 --- a/include/op/object_op.h +++ b/include/op/object_op.h @@ -15,6 +15,9 @@ #include "request/object_req.h" #include "response/object_resp.h" +#include "Poco/SharedPtr.h" +#include "trsf/transfer_handler.h" + namespace qcloud_cos { class FileUploadTask; @@ -35,6 +38,14 @@ class ObjectOp : public BaseOp { /// \brief 判断object是否存在 bool IsObjectExist(const std::string& bucket_name, const std::string& object_name); + std::string GetResumableUploadID(const std::string& bucket_name, const std::string& object_name) ; + + bool CheckUploadPart(const MultiUploadObjectReq& req, const std::string& bucket_name, + const std::string& object_name, const std::string& uploadid, + const std::string& localpath, std::vector& already_exist); + + bool check_single_part(const std::string& local_file_path, uint64_t offset, uint64_t local_part_size, + uint64_t size, const std::string& etag); /// \brief 获取对应Object的meta信息数据 /// /// \param request HeadObject请求 @@ -140,8 +151,13 @@ class ObjectOp : public BaseOp { /// \param response MultiUploadObject返回 /// /// \return 返回HTTP请求的状态码及错误信息 + CosResult MultiUploadObject(const MultiUploadObjectReq& req, Poco::SharedPtr& handler, + MultiUploadObjectResp* resp); + CosResult MultiUploadObject(const MultiUploadObjectReq& req, MultiUploadObjectResp* resp); + Poco::SharedPtr CreateUploadHandler(const std::string& bucket_name, const std::string& object_name, + const std::string& local_path); /// \brief 舍弃一个分块上传并删除已上传的块 /// /// \param req AbortMultiUpload请求 @@ -216,6 +232,16 @@ class ObjectOp : public BaseOp { // 上传文件, 内部使用多线程 CosResult MultiThreadUpload(const MultiUploadObjectReq& req, const std::string& upload_id, + const std::vector& already_exist_parts, + Poco::SharedPtr& handler, + bool resume_flag, + std::vector* etags_ptr, + std::vector* part_numbers_ptr); + + CosResult MultiThreadUpload(const MultiUploadObjectReq& req, + const std::string& upload_id, + const std::vector& already_exist_parts, + bool resume_flag, std::vector* etags_ptr, std::vector* part_numbers_ptr); diff --git a/include/request/object_req.h b/include/request/object_req.h index 60d26d4..fd5132f 100644 --- a/include/request/object_req.h +++ b/include/request/object_req.h @@ -17,9 +17,15 @@ #include "cos_defines.h" #include "cos_sys_config.h" +#include "trsf/transfer_handler.h" + +#include "Poco/SharedPtr.h" + namespace qcloud_cos { +class TransferHandler; + class ObjectReq : public BaseReq { public: ObjectReq(const std::string& bucket_name, const std::string& object_name) @@ -392,7 +398,7 @@ class DeleteObjectsReq : public BaseReq { return m_objvers; } - uint32_t GetObjectVerionsSize() const { + size_t GetObjectVerionsSize() const { return m_objvers.size(); } @@ -659,6 +665,10 @@ class CompleteMultiUploadReq : public ObjectReq { }; class MultiUploadObjectReq : public ObjectReq { +public: + typedef void (*UploadProgressCallback)(const MultiUploadObjectReq *req, Poco::SharedPtr& handler); + typedef void (*TransferStatusUpdateCallback)(const MultiUploadObjectReq *req, Poco::SharedPtr& handler); + public: MultiUploadObjectReq(const std::string& bucket_name, const std::string& object_name, const std::string& local_file_path = "") @@ -667,6 +677,8 @@ class MultiUploadObjectReq : public ObjectReq { m_part_size = CosSysConfig::GetUploadPartSize(); m_thread_pool_size = CosSysConfig::GetUploadThreadPoolSize(); mb_set_meta = false; + m_progress_callback = NULL; + m_status_callback = NULL; // 默认打开当前路径下object的同名文件 if (local_file_path.empty()) { @@ -723,12 +735,49 @@ class MultiUploadObjectReq : public ObjectReq { return m_xcos_meta; } + void SetUploadID(const std::string& uploadid) { + if (!uploadid.empty()) { + m_uploadid = uploadid; + } + } + + std::string GetUploadID() const { + return m_uploadid; + } + + void SetUploadProgressCallback(UploadProgressCallback callback) { + m_progress_callback = callback; + } + + void SetTransferStatusUpdateCallback(TransferStatusUpdateCallback callback) { + m_status_callback = callback; + } + + void TriggerUploadProgressCallback(Poco::SharedPtr& handler) const { + if(m_progress_callback) { + m_progress_callback(this, handler); + } + } + + void TriggerTransferStatusUpdateCallback(Poco::SharedPtr& handler) const { + if(m_status_callback) { + m_status_callback(this, handler); + } + } + private: std::string m_local_file_path; uint64_t m_part_size; int m_thread_pool_size; std::map m_xcos_meta; bool mb_set_meta; + std::string m_uploadid; + +public: + // These callback only used in the sync function TransferUploadObject + UploadProgressCallback m_progress_callback; + TransferStatusUpdateCallback m_status_callback; + }; class AbortMultiUploadReq : public ObjectReq { diff --git a/include/trsf/transfer_handler.h b/include/trsf/transfer_handler.h new file mode 100644 index 0000000..9d2fbc8 --- /dev/null +++ b/include/trsf/transfer_handler.h @@ -0,0 +1,156 @@ +#ifndef __TRSF_HANDLER_H__ +#define __TRSF_HANDLER_H__ + +#include +#include +#include + +#include "boost/thread/mutex.hpp" +#include "boost/thread/condition_variable.hpp" +#include "Poco/SharedPtr.h" + +#include "op/cos_result.h" +#include "request/object_req.h" + + +namespace qcloud_cos{ + class MultiUploadObjectReq; + + class PartState { + public: + PartState(); + PartState(int part_num, std::string& etag, size_t size, bool last_part = false); + + void SetPartNum(int number) { m_part_num = number; } + int GetPartNum() const { return m_part_num; } + + void SetEtag(const std::string& etag) { m_etag = etag; } + std::string GetEtag() const { return m_etag; } + + void SetSize(size_t size) { m_size_inbytes = size; } + size_t GetSize() const { return m_size_inbytes; } + + void SetLastPart(bool lastpart) { m_lastpart = lastpart; } + bool IsLastPart() { return m_lastpart; } + + private: + int m_part_num; + // current use the md5 + std::string m_etag; + + size_t m_size_inbytes; + + // TODO for now just care about the whole progress + /* size_t m_current_progress_inbytes; */ + /* size_t m_range_begin; */ + + bool m_lastpart; + }; + + typedef Poco::SharedPtr PartPointer; + // Key is partnumber + typedef std::map PartStateMap; + + enum class TransferStatus + { + NOT_START, + //Operation is now running + IN_PROGRESS, + //Operation was canceled. + CANCELED, + //Operation failed + FAILED, + //Operation was successful + COMPLETED, + //Operation either failed or was canceled and a user deleted the multi-part upload . + ABORTED + }; + + // For now support the multiupload + class TransferHandler { + public: + // Upload + TransferHandler(const std::string& bucket_name, const std::string& object_name, + uint64_t total_size, const std::string& file_path=""); + + void SetBucketName(const std::string& bucket_name) { m_bucket_name = bucket_name; } + std::string GetBucketName() const { return m_bucket_name; } + + void SetObjectName(const std::string& object_name) { m_object_name = object_name; } + std::string GetObjectName() const { return m_object_name; } + + void SetLocalFilePath(const std::string& local_file_path) { m_local_file_path = local_file_path; } + std::string GetLocalFilePath() const { return m_local_file_path; } + + void SetTotalSize(uint64_t total_size) { m_total_size = total_size; } + uint64_t GetTotalSize() const { return m_total_size; } + + // Notice there can not backwards + void UpdateProgress(uint64_t update_prog); + // Get the current upload size(B). + uint64_t GetProgress() const; + + void UpdateStatus(TransferStatus status); + // Get the current status of process, detail see the enum TransferStatus. + TransferStatus GetStatus() const; + + std::string GetStatusString() const; + + void SetUploadID(const std::string& uploadid) { m_uploadid = uploadid; } + // Get the init or resumed uploadid. + std::string GetUploadID() const { return m_uploadid; } + + // Cancel the process of interface the uploadid can reuse. + void Cancel(); + + bool ShouldContinue() const; + + bool IsFinishStatus(TransferStatus status) const; + + bool IsAllowTransition(TransferStatus org, TransferStatus dst) const; + + // Block until finish. + void WaitUntilFinish(); + + public: + // Origin result + CosResult m_result; + + private: + std::string m_bucket_name; + std::string m_object_name; + std::string m_local_file_path; + uint64_t m_total_size; + // The m_current_progress best to use the atomic. but can not support c11 for now, so use the mutex. + uint64_t m_current_progress; + TransferStatus m_status; + std::string m_uploadid; + // Is cancel + bool m_cancel; + + PartStateMap m_part_map; + + // Mutex lock for the progress + mutable boost::mutex m_lock_prog; + // Mutex lock for the status + mutable boost::mutex m_lock_stat; + // Condition + mutable boost::condition_variable m_cond; + + // Mutex lock for the part map + // mutable boost::mutex m_lock_parts; + }; + + class HandleStreamCopier { + public: + static std::streamsize handleCopyStream(const MultiUploadObjectReq *req, std::istream& istr, std::ostream& ostr, + Poco::SharedPtr& handler, std::size_t bufferSize = 8192); + + + }; + + + +} + +#endif diff --git a/include/util/auth_tool.h b/include/util/auth_tool.h index bb18208..2afc47d 100644 --- a/include/util/auth_tool.h +++ b/include/util/auth_tool.h @@ -1,79 +1,83 @@ -#ifndef UTIL_AUTHTOOl_H -#define UTIL_AUTHTOOl_H - -#include - -#include -#include -#include - -#include "request/base_req.h" -#include "util/noncopyable.h" - -namespace qcloud_cos { - -class AuthTool : private NonCopyable { -public: - /// \brief ǩָЧ(ͨCosSysConfig, Ĭ60s)ʹ - /// - /// \param secret_id ӵеĿʶ ID֤ - /// \param secret_key ӵеĿԿ - /// \param http_method http,POST/GET/HEAD/PUT, Сд - /// \param in_uri http uri - /// \param headers http headerļֵ - /// \param params http paramsļֵ - /// - /// \return ַʽǩؿմʧ - static std::string Sign(const std::string& secret_id, - const std::string& secret_key, - const std::string& http_method, - const std::string& in_uri, - const std::map& headers, - const std::map& params); - - /// \brief ǩָЧʹ - /// - /// \param secret_id ӵеĿʶ ID֤ - /// \param secret_key ӵеĿԿ - /// \param http_method http,POST/GET/HEAD/PUT, Сд - /// \param in_uri http uri - /// \param headers http headerļֵ - /// \param params http paramsļֵ - /// - /// \return ַʽǩؿմʧ - static std::string Sign(const std::string& secret_id, - const std::string& secret_key, - const std::string& http_method, - const std::string& in_uri, - const std::map& headers, - const std::map& params, - uint64_t start_time_in_s, - uint64_t end_time_in_s); - -private: - /// \brief paramsеݣתСд,keyparam_list key=valueparam_value_list - /// \param params - /// \param key_encode keyǷuri - /// \param value_encode valueǷuri - /// \param value_lower valueǷСд - /// \param param_list б;ָ - /// \param param_value_list ֵб,&ָ - /// \retval - static void FillMap(const std::map ¶ms, - bool key_encode, - bool value_encode, - bool value_lower, - std::string* param_list, - std::string* param_value_list); - - /// \brief ҳҪȨͷ,Ŀǰhost conent-type xͷĶҪȨ - /// \param hedaers ͷkv - /// \param filted_req_headers ҪȨͷ - /// \retval - static void FilterAndSetSignHeader(const std::map& headers, - std::map* filted_req_headers); -}; - -} // namespace qcloud_cos - -#endif // AUTHTOOL_H +#ifndef UTIL_AUTHTOOl_H +#define UTIL_AUTHTOOl_H + +#include + +#include +#include +#include + +#include "request/base_req.h" +#include "util/noncopyable.h" + +namespace qcloud_cos { + +class AuthTool : private NonCopyable { +public: + /// \brief 返回签名,可以在指定的有效期内(通过CosSysConfig设置, 默认60s)使用 + /// + /// \param secret_id 开发者拥有的项目身份识别 ID,用以身份认证 + /// \param secret_key 开发者拥有的项目身份密钥 + /// \param http_method http方法,如POST/GET/HEAD/PUT等, 传入大小写不敏感 + /// \param in_uri http uri + /// \param headers http header的键值对 + /// \param params http params的键值对 + /// + /// \return 字符串形式的签名,返回空串代表失败 + static std::string Sign(const std::string& secret_id, + const std::string& secret_key, + const std::string& http_method, + const std::string& in_uri, + const std::map& headers, + const std::map& params); + + /// \brief 返回签名,可以在指定的有效期内使用 + /// + /// \param secret_id 开发者拥有的项目身份识别 ID,用以身份认证 + /// \param secret_key 开发者拥有的项目身份密钥 + /// \param http_method http方法,如POST/GET/HEAD/PUT等, 传入大小写不敏感 + /// \param in_uri http uri + /// \param headers http header的键值对 + /// \param params http params的键值对 + /// + /// \return 字符串形式的签名,返回空串代表失败 + static std::string Sign(const std::string& secret_id, + const std::string& secret_key, + const std::string& http_method, + const std::string& in_uri, + const std::map& headers, + const std::map& params, + uint64_t start_time_in_s, + uint64_t end_time_in_s); + +private: + /// \brief 把params中的数据,转小写,正排,key放在param_list key=value放param_value_list + /// \param params 参数 + /// \param key_encode key是否进行uri编码 + /// \param value_encode value是否进行uri编码 + /// \param value_lower value是否小写 + /// \param param_list 参数名列表,以;分隔 + /// \param param_value_list 参数键值对列表,以&分隔 + /// \retval 无 + static void FillMap(const std::map ¶ms, + bool key_encode, + bool value_encode, + bool value_lower, + std::string* param_list, + std::string* param_value_list); + + /// \brief 找出需要鉴权的头部,并设置,目前host conent-type 还有x开头的都要鉴权 + /// \param hedaers 头部的kv对 + /// \param filted_req_headers 需要鉴权的头部 + /// \retval 无 + static void FilterAndSetSignHeader(const std::map& headers, + std::map* filted_req_headers); +}; + +} // namespace qcloud_cos + +#endif // AUTHTOOL_H + + + + diff --git a/include/util/http_sender.h b/include/util/http_sender.h index 2129e0f..f012a43 100644 --- a/include/util/http_sender.h +++ b/include/util/http_sender.h @@ -17,10 +17,44 @@ #include "request/base_req.h" #include "response/base_resp.h" +#include "Poco/SharedPtr.h" +#include "trsf/transfer_handler.h" + + namespace qcloud_cos { class HttpSender { public: + // trsf handler + static int SendRequest(const MultiUploadObjectReq *req, + const std::string& http_method, + const std::string& url_str, + const std::map& req_params, + const std::map& req_headers, + const std::string& req_body, + Poco::SharedPtr& handler, + uint64_t conn_timeout_in_ms, + uint64_t recv_timeout_in_ms, + std::map* resp_headers, + std::string* resp_body, + std::string* err_msg, + bool is_check_md5 = false); + + // real trsf handler process + static int SendRequest(const MultiUploadObjectReq *req, + const std::string& http_method, + const std::string& url_str, + const std::map& req_params, + const std::map& req_headers, + std::istream& is, + uint64_t conn_timeout_in_ms, + uint64_t recv_timeout_in_ms, + std::map* resp_headers, + std::ostream& resp_stream, + std::string* err_msg, + Poco::SharedPtr& handler, + bool is_check_md5 = false); + static int SendRequest(const std::string& http_method, const std::string& url_str, const std::map& req_params, diff --git a/include/util/simple_mutex.h b/include/util/simple_mutex.h index 9a817d8..8a0aac3 100644 --- a/include/util/simple_mutex.h +++ b/include/util/simple_mutex.h @@ -1,33 +1,63 @@ #ifndef SIMPLE_MUTEX_H #define SIMPLE_MUTEX_H + +#if defined(_WIN32) +#include +#include +#else #include +#endif +/** + * Notice SimpleMutext and SimpleRWLock are not used in current project, + * now use the boost mutex and shared_mutex instead. + * when want to reuse this header need notice the windows header included order. + */ class SimpleMutex { public: SimpleMutex() { - pthread_mutex_init(&m_mutex, NULL); +#if defined(_WIN32) + m_mutex = CreateMutexA(NULL, FALSE, NULL); +#else + pthread_mutex_init(&m_mutex, NULL); +#endif } ~SimpleMutex() { - pthread_mutex_destroy(&m_mutex); +#if defined(_WIN32) + CloseHandle(m_mutex); +#else + pthread_mutex_destroy(&m_mutex); +#endif } void Lock() { - pthread_mutex_lock(&m_mutex); - } +#if defined(_WIN32) + DWORD d = WaitForSingleObject(m_mutex, INFINITE); +#else + pthread_mutex_lock(&m_mutex); +#endif } void Unlock() { - pthread_mutex_unlock(&m_mutex); +#if defined(_WIN32) + ReleaseMutex(m_mutex); +#else + pthread_mutex_unlock(&m_mutex); +#endif } private: - pthread_mutex_t m_mutex; +#if defined(_WIN32) + HANDLE m_mutex; +#else + pthread_mutex_t m_mutex; +#endif }; // mutex holder class SimpleMutexLocker { public: - SimpleMutexLocker(SimpleMutex* mutex) : m_mutex(mutex) { + SimpleMutexLocker(SimpleMutex& mutex) : m_mutex(&mutex) { m_mutex->Lock(); } @@ -40,31 +70,60 @@ class SimpleMutexLocker { }; -// scoped rwlock for cos_config +// Scoped rwlock for cos_config +// Window's rwlock some version can not support, so there now use the mutex lock, +// then think about changing into the boost::shared_lock and boost::shared_mutex. class SimpleRWLock { public: SimpleRWLock() { - pthread_rwlock_init(&m_lock,NULL); +#if defined(_WIN32) + m_lock = CreateMutexA(NULL, FALSE, NULL); +#else + pthread_rwlock_init(&m_lock, NULL); +#endif } ~SimpleRWLock() { - pthread_rwlock_destroy(&m_lock); +#if defined(_WIN32) + CloseHandle(m_lock); +#else + pthread_rwlock_destroy(&m_lock); +#endif } void WriteLock() { - pthread_rwlock_wrlock(&m_lock); +#if defined(_WIN32) + DWORD d = WaitForSingleObject(m_lock, INFINITE); +#else + pthread_rwlock_wrlock(&m_lock); +#endif + } void ReadLock() { - pthread_rwlock_rdlock(&m_lock); +#if defined(_WIN32) + DWORD d = WaitForSingleObject(m_lock, INFINITE); +#else + pthread_rwlock_rdlock(&m_lock); +#endif + } void Unlock() { - pthread_rwlock_unlock(&m_lock); +#if defined(_WIN32) + ReleaseMutex(m_lock); +#else + pthread_rwlock_unlock(&m_lock); +#endif } private: - pthread_rwlock_t m_lock; +#if defined(_WIN32) + HANDLE m_lock; +#else + pthread_rwlock_t m_lock; +#endif + }; class SimpleWLocker { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d8298cd..340df06 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,29 +1,37 @@ -cmake_minimum_required(VERSION 2.8) -CMAKE_policy(SET CMP0015 NEW) - -# CMakeLists for src directory -PROJECT(COS_CPP_SDK) - -if (OPENSSL_VERSION VERSION_LESS 1.1.0) - message("old openssl version less than 1.1.0") - set(COSSDK_SOURCE_FILES cos_api.cpp cos_config.cpp cos_sys_config.cpp - request/base_req.cpp request/bucket_req.cpp request/object_req.cpp response/base_resp.cpp - response/object_resp.cpp response/bucket_resp.cpp response/service_resp.cpp - op/file_copy_task.cpp op/file_download_task.cpp op/file_upload_task.cpp op/base_op.cpp op/object_op.cpp - op/bucket_op.cpp op/service_op.cpp op/cos_result.cpp util/auth_tool.cpp - util/codec_util.cpp util/file_util.cpp util/http_sender.cpp - util/sha1.cpp util/string_util.cpp) -ELSE() - message("new version upper than 1.1.0") - set(COSSDK_SOURCE_FILES cos_api.cpp cos_config.cpp cos_sys_config.cpp - request/base_req.cpp request/bucket_req.cpp request/object_req.cpp response/base_resp.cpp - response/object_resp.cpp response/bucket_resp.cpp response/service_resp.cpp - op/file_copy_task.cpp op/file_download_task.cpp op/file_upload_task.cpp op/base_op.cpp op/object_op.cpp - op/bucket_op.cpp op/service_op.cpp op/cos_result.cpp util/auth_tool.cpp - util/codec_util_high_openssl.cpp util/file_util.cpp util/http_sender.cpp - util/sha1.cpp util/string_util.cpp) -ENDIF() - -add_library(cossdk STATIC ${COSSDK_SOURCE_FILES}) -target_link_libraries(cossdk PocoNetSSL PocoNet PocoCrypto PocoUtil PocoJSON PocoXML PocoFoundation ssl crypto rt stdc++ pthread jsoncpp boost_thread boost_system) -set_target_properties(cossdk PROPERTIES OUTPUT_NAME "cossdk") +cmake_minimum_required(VERSION 2.8) +CMAKE_policy(SET CMP0015 NEW) + +# CMakeLists for src directory +PROJECT(COS_CPP_SDK) + +if (OPENSSL_VERSION VERSION_LESS 1.1.0) + message("old openssl version less than 1.1.0") + set(COSSDK_SOURCE_FILES cos_api.cpp cos_config.cpp cos_sys_config.cpp + request/base_req.cpp request/bucket_req.cpp request/object_req.cpp response/base_resp.cpp + response/object_resp.cpp response/bucket_resp.cpp response/service_resp.cpp + op/file_copy_task.cpp op/file_download_task.cpp op/file_upload_task.cpp op/base_op.cpp op/object_op.cpp + op/bucket_op.cpp op/service_op.cpp op/cos_result.cpp util/auth_tool.cpp + util/codec_util.cpp util/file_util.cpp util/http_sender.cpp + util/sha1.cpp util/string_util.cpp trsf/transfer_handler.cpp) +ELSE() + message("new version upper than 1.1.0") + set(COSSDK_SOURCE_FILES cos_api.cpp cos_config.cpp cos_sys_config.cpp + request/base_req.cpp request/bucket_req.cpp request/object_req.cpp response/base_resp.cpp + response/object_resp.cpp response/bucket_resp.cpp response/service_resp.cpp + op/file_copy_task.cpp op/file_download_task.cpp op/file_upload_task.cpp op/base_op.cpp op/object_op.cpp + op/bucket_op.cpp op/service_op.cpp op/cos_result.cpp util/auth_tool.cpp + util/codec_util_high_openssl.cpp util/file_util.cpp util/http_sender.cpp + util/sha1.cpp util/string_util.cpp trsf/transfer_handler.cpp) +ENDIF() + + +add_library(cossdk STATIC ${COSSDK_SOURCE_FILES}) + +# When use the on windows need change the boost library according to the local name +if(WIN32) + target_link_libraries(cossdk PocoFoundation PocoNet PocoNetSSL PocoCrypto PocoUtil PocoJSON PocoXML ssl crypto jsoncpp libboost_system-vc141-mt-x64-1_69 libboost_thread-vc141-mt-x64-1_69) +else() + target_link_libraries(cossdk PocoNetSSL PocoNet PocoCrypto PocoUtil PocoJSON PocoXML PocoFoundation ssl crypto rt stdc++ pthread jsoncpp boost_thread boost_system) +endif() + +set_target_properties(cossdk PROPERTIES OUTPUT_NAME "cossdk") diff --git a/src/cos_api.cpp b/src/cos_api.cpp index 2223e21..4f84d07 100644 --- a/src/cos_api.cpp +++ b/src/cos_api.cpp @@ -1,7 +1,6 @@ #include "cos_api.h" -#include - +#include "boost/thread/lock_guard.hpp" #include "threadpool/boost/threadpool.hpp" #include "Poco/Net/HTTPStreamFactory.h" #include "Poco/Net/HTTPSStreamFactory.h" @@ -15,7 +14,7 @@ namespace qcloud_cos { bool CosAPI::s_init = false; bool CosAPI::s_poco_init = false; int CosAPI::s_cos_obj_num = 0; -SimpleMutex CosAPI::s_init_mutex = SimpleMutex(); + boost::threadpool::pool* g_threadpool = NULL; CosAPI::CosAPI(CosConfig& config) @@ -28,7 +27,7 @@ CosAPI::~CosAPI() { } int CosAPI::CosInit() { - SimpleMutexLocker locker(&s_init_mutex); + boost::lock_guard locker(s_init_mutex); ++s_cos_obj_num; if (!s_init) { if (!s_poco_init) { @@ -46,7 +45,7 @@ int CosAPI::CosInit() { } void CosAPI::CosUInit() { - SimpleMutexLocker locker(&s_init_mutex); + boost::lock_guard locker(s_init_mutex); --s_cos_obj_num; if (s_init && s_cos_obj_num == 0) { if (g_threadpool){ @@ -252,11 +251,43 @@ CosResult CosAPI::CompleteMultiUpload(const CompleteMultiUploadReq& request, return m_object_op.CompleteMultiUpload(request, response); } +void TransferSubmit(ObjectOp* op, const MultiUploadObjectReq& req, + Poco::SharedPtr& handler, + MultiUploadObjectResp* resp) { + if(op){ + op->MultiUploadObject(req, handler, resp); + } + +} + +// Async to transfer +Poco::SharedPtr CosAPI::TransferUploadObject(const MultiUploadObjectReq& request, + MultiUploadObjectResp* response) { + // Create the handler + Poco::SharedPtr handler = CreateUploadHandler(request.GetBucketName(), request.GetObjectName(), + request.GetLocalFilePath()); + // Use the cos's boost thread pool to submit the task + if(g_threadpool) { + g_threadpool->schedule(boost::bind(&TransferSubmit, &m_object_op, request, handler, response)); + }else { + handler->UpdateStatus(TransferStatus::FAILED); + request.TriggerTransferStatusUpdateCallback(handler); + } + // Return the handler outside. + return handler; +} + CosResult CosAPI::MultiUploadObject(const MultiUploadObjectReq& request, MultiUploadObjectResp* response) { + return m_object_op.MultiUploadObject(request, response); } +Poco::SharedPtr CosAPI::CreateUploadHandler(const std::string& bucket_name, const std::string& object_name, + const std::string& local_path) { + return m_object_op.CreateUploadHandler(bucket_name, object_name, local_path); +} + CosResult CosAPI::AbortMultiUpload(const AbortMultiUploadReq& request, AbortMultiUploadResp* response) { return m_object_op.AbortMultiUpload(request, response); diff --git a/src/cos_config.cpp b/src/cos_config.cpp index d5d84cf..065a0dc 100644 --- a/src/cos_config.cpp +++ b/src/cos_config.cpp @@ -130,15 +130,13 @@ uint64_t CosConfig::GetAppId() const { } std::string CosConfig::GetAccessKey() const { - SimpleRLocker lock(m_lock); - std::string ak = m_access_key; - return ak; + boost::shared_lock lock(m_lock); + return m_access_key; } std::string CosConfig::GetSecretKey() const { - SimpleRLocker lock(m_lock); - std::string sk = m_secret_key; - return sk; + boost::shared_lock lock(m_lock); + return m_secret_key; } std::string CosConfig::GetRegion() const { @@ -146,13 +144,15 @@ std::string CosConfig::GetRegion() const { } std::string CosConfig::GetTmpToken() const { - SimpleRLocker lock(m_lock); - std::string token = m_tmp_token; - return token; + boost::shared_lock lock(m_lock); + return m_tmp_token; } void CosConfig::SetConfigCredentail(const std::string& access_key, const std::string& secret_key, const std::string& tmp_token) { - SimpleWLocker lock(m_lock); + // get upgradable access + boost::upgrade_lock lock(m_lock); + // get exclusive access + boost::upgrade_to_unique_lock uniqueLock(lock); m_access_key = access_key; m_secret_key = secret_key; m_tmp_token = tmp_token; diff --git a/src/op/cos_result.cpp b/src/op/cos_result.cpp index a6dd3d7..aac6a20 100644 --- a/src/op/cos_result.cpp +++ b/src/op/cos_result.cpp @@ -41,7 +41,7 @@ bool CosResult::ParseFromHttpResponse(const std::map& if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_INFO("Parse string to xml doc error, xml_body=%s", body.c_str()); SetErrorMsg(body); - delete cstr; + delete[] cstr; return false; } @@ -49,7 +49,7 @@ bool CosResult::ParseFromHttpResponse(const std::map& if (NULL == root) { SDK_LOG_INFO("Miss root node=Error, xml_body=%s", body.c_str()); SetErrorMsg(body); - delete cstr; + delete[] cstr; return false; } @@ -71,7 +71,7 @@ bool CosResult::ParseFromHttpResponse(const std::map& node_name.c_str(), body.c_str()); } } - delete cstr; + delete[] cstr; return true; } diff --git a/src/op/file_download_task.cpp b/src/op/file_download_task.cpp index d6b53f8..14163cc 100644 --- a/src/op/file_download_task.cpp +++ b/src/op/file_download_task.cpp @@ -19,7 +19,7 @@ FileDownTask::FileDownTask(const std::string& full_url, m_conn_timeout_in_ms(conn_timeout_in_ms), m_recv_timeout_in_ms(recv_timeout_in_ms), m_offset(offset), m_data_buf_ptr(pbuf), - m_data_len(data_len), m_resp(""), m_is_task_success(false), m_real_down_len(0) { + m_data_len(data_len), m_resp(""), m_is_task_success(false), m_real_down_len(0), m_target_size(0) { } void FileDownTask::Run() { @@ -28,10 +28,11 @@ void FileDownTask::Run() { DownTask(); } -void FileDownTask::SetDownParams(unsigned char* pbuf, size_t data_len, uint64_t offset) { +void FileDownTask::SetDownParams(unsigned char* pbuf, size_t data_len, uint64_t offset, uint64_t target_size) { m_data_buf_ptr = pbuf; m_data_len = data_len; m_offset = offset; + m_target_size = target_size; } size_t FileDownTask::GetDownLoadLen() { @@ -57,7 +58,7 @@ std::map FileDownTask::GetRespHeaders() { void FileDownTask::DownTask() { char range_head[128]; memset(range_head, 0, sizeof(range_head)); - snprintf(range_head, sizeof(range_head), "bytes=%lu-%lu", + snprintf(range_head, sizeof(range_head), "bytes=%llu-%llu", m_offset, (m_offset + m_data_len - 1)); // 增加Range头域,避免大文件时将整个文件下载 @@ -80,6 +81,19 @@ void FileDownTask::DownTask() { size_t len = MIN(m_resp.length(), buf_max_size); memcpy(m_data_buf_ptr, m_resp.c_str(), len); m_real_down_len = len; + + // Must notice the receive timeout in Poco socket in SendRequest() + // The receiveResponse does not throw the expection of timeout, + // so there check the each part size outside, in case of the file incomplete. + if (len != m_target_size) { + SDK_LOG_ERR("FileDownload: url(%s) fail, might be reseted connection, offset:%lld, received:%d, expected: %lld", + m_full_url.c_str(), m_offset, len, m_target_size); + m_is_task_success = false; + // Clear the resp + m_resp = ""; + return; + } + m_is_task_success = true; m_resp = ""; return; diff --git a/src/op/file_upload_task.cpp b/src/op/file_upload_task.cpp index 5055b4a..0629d46 100644 --- a/src/op/file_upload_task.cpp +++ b/src/op/file_upload_task.cpp @@ -19,9 +19,9 @@ FileUploadTask::FileUploadTask(const std::string& full_url, uint64_t recv_timeout_in_ms, unsigned char* pbuf, const size_t data_len) - : m_full_url(full_url), m_data_buf_ptr(pbuf), m_data_len(data_len), + : m_req(NULL), m_full_url(full_url), m_data_buf_ptr(pbuf), m_data_len(data_len), m_conn_timeout_in_ms(conn_timeout_in_ms), m_recv_timeout_in_ms(recv_timeout_in_ms), - m_resp(""), m_is_task_success(false) { + m_resp(""), m_is_task_success(false), m_is_resume(false), m_is_handler(false) { } FileUploadTask::FileUploadTask(const std::string& full_url, @@ -31,9 +31,10 @@ FileUploadTask::FileUploadTask(const std::string& full_url, uint64_t recv_timeout_in_ms, unsigned char* pbuf, const size_t data_len) - : m_full_url(full_url), m_headers(headers), m_params(params), + : m_req(NULL), m_full_url(full_url), m_headers(headers), m_params(params), m_conn_timeout_in_ms(conn_timeout_in_ms), m_recv_timeout_in_ms(recv_timeout_in_ms), - m_data_buf_ptr(pbuf), m_data_len(data_len), m_resp(""), m_is_task_success(false) { + m_data_buf_ptr(pbuf), m_data_len(data_len), m_resp(""), m_is_task_success(false), + m_is_resume(false), m_is_handler(false) { } void FileUploadTask::Run() { @@ -89,9 +90,16 @@ void FileUploadTask::UploadTask() { loop++; m_resp_headers.clear(); m_resp = ""; - m_http_status = HttpSender::SendRequest("PUT", m_full_url, m_params, m_headers, - body, m_conn_timeout_in_ms, m_recv_timeout_in_ms, - &m_resp_headers, &m_resp, &m_err_msg); + + if(IsHandler()) { + m_http_status = HttpSender::SendRequest(m_req, "PUT", m_full_url, m_params, m_headers, + body, m_handler, m_conn_timeout_in_ms, m_recv_timeout_in_ms, + &m_resp_headers, &m_resp, &m_err_msg); + }else { + m_http_status = HttpSender::SendRequest("PUT", m_full_url, m_params, m_headers, + body, m_conn_timeout_in_ms, m_recv_timeout_in_ms, + &m_resp_headers, &m_resp, &m_err_msg); + } if (m_http_status != 200) { SDK_LOG_ERR("FileUpload: url(%s) fail, httpcode:%d, resp: %s", diff --git a/src/op/object_op.cpp b/src/op/object_op.cpp index 7a9bd3f..84f3d35 100644 --- a/src/op/object_op.cpp +++ b/src/op/object_op.cpp @@ -6,6 +6,7 @@ // Description: #include "op/object_op.h" +#include "op/bucket_op.h" #include #include @@ -15,6 +16,9 @@ #include "threadpool/boost/threadpool.hpp" #include +#if defined(_WIN32) +#include +#endif #include "cos_sys_config.h" #include "op/file_copy_task.h" @@ -25,10 +29,17 @@ #include "util/http_sender.h" #include "util/string_util.h" +#include "request/bucket_req.h" +#include "response/bucket_resp.h" + #include "Poco/MD5Engine.h" #include "Poco/DigestStream.h" #include "Poco/StreamCopier.h" +#if defined(_WIN32) +#define lseek _lseeki64 +#endif + namespace qcloud_cos { bool ObjectOp::IsObjectExist(const std::string& bucket_name, const std::string& object_name) { @@ -42,6 +53,143 @@ bool ObjectOp::IsObjectExist(const std::string& bucket_name, const std::string& return false; } +std::string ObjectOp::GetResumableUploadID(const std::string& bucket_name, const std::string& object_name) { + ListMultipartUploadReq req(bucket_name); + req.SetPrefix(object_name); + ListMultipartUploadResp resp; + + std::string host = CosSysConfig::GetHost(GetAppId(), m_config->GetRegion(), + req.GetBucketName()); + std::string path = req.GetPath(); + CosResult result = NormalAction(host, path, req, "", false, &resp); + + std::vector rst = resp.GetUpload(); + // Notice the index type, if size_t might over + int index = rst.size() - 1; + while (index >= 0) { + if (rst[index].m_key == object_name) { + return rst[index].m_uploadid; + } + index--; + } + return ""; +} + +bool ObjectOp::check_single_part(const std::string& local_file_path, uint64_t offset, uint64_t local_part_size, + uint64_t size, const std::string& etag) { + if (local_part_size != size) { + return false; + } + + std::ifstream fin(local_file_path.c_str(), std::ios::in | std::ios::binary); + if (!fin.is_open()) { + SDK_LOG_ERR("CheckUploadPart: file open fail, %s", local_file_path.c_str()); + return false; + } + + fin.seekg (offset); + + // Allocate memory: + char *data = new char[local_part_size]; + + // Read data as a block: + fin.read (data,local_part_size); + + fin.seekg (0, fin.beg); + fin.close(); + + // Print content: + std::istringstream stringStream(std::string(data, local_part_size)); + + Poco::MD5Engine md5; + Poco::DigestOutputStream dos(md5); + Poco::StreamCopier::copyStream(stringStream, dos); + + dos.flush(); + + std::string md5_str = Poco::DigestEngine::digestToHex(md5.digest()); + + delete []data; + dos.close(); + + if (md5_str != etag) { + return false; + } + return true; +} + +bool ObjectOp::CheckUploadPart(const MultiUploadObjectReq& req, const std::string& bucket_name, + const std::string& object_name, const std::string& uploadid, + const std::string& localpath, std::vector& already_exist) { + // Count the size info + std::string local_file_path = req.GetLocalFilePath(); + std::ifstream fin(local_file_path.c_str(), std::ios::in | std::ios::binary); + if (!fin.is_open()){ + SDK_LOG_ERR("CheckUploadPart: file open fail, %s", local_file_path.c_str()); + return false; + } + uint64_t file_size = FileUtil::GetFileLen(local_file_path); + uint64_t part_size = req.GetPartSize(); + uint64_t part_num = file_size / part_size; + uint64_t last_part_size = file_size % part_size; + + if (0 != last_part_size) { + part_num += 1; + } else { + last_part_size = part_size; + } + if (part_num > kMaxPartNumbers) { + return false; + } + + ListPartsReq list_req(bucket_name, object_name, uploadid); + ListPartsResp resp; + int part_num_marker = 0; + bool list_over_flag = false; + + std::vector parts_info; + + while (!list_over_flag) { + std::string marker = StringUtil::IntToString(part_num_marker); + list_req.SetPartNumberMarker(marker); + CosResult result = ListParts(list_req, &resp); + // Add to the parts_info; + std::vector rst = resp.GetParts(); + for (std::vector::const_iterator itr = rst.begin(); itr != rst.end(); ++itr) { + parts_info.push_back(*itr); + } + + if (!resp.IsTruncated()) { + list_over_flag = true; + }else { + part_num_marker = int(resp.GetNextPartNumberMarker()); + } + } + + for (std::vector::const_iterator itr = parts_info.begin(); itr != parts_info.end(); ++itr) { + uint64_t sev_part_num = itr->m_part_num; + if (sev_part_num > part_num) { + return false; + } + uint64_t offset = (sev_part_num - 1) * part_size; + uint64_t local_part_size = part_size; + if (sev_part_num == part_num) { + local_part_size = last_part_size; + } + + // Check single upload part each md5 + std::string etag = itr->m_etag; + if (!check_single_part(local_file_path, offset, local_part_size, itr->m_size, etag)) { + return false; + } + + // Add the part_num with etags in already exist + already_exist[sev_part_num] = itr->m_etag; + } + + return true; +} + CosResult ObjectOp::HeadObject(const HeadObjectReq& req, HeadObjectResp* resp) { std::string host = CosSysConfig::GetHost(GetAppId(), m_config->GetRegion(), req.GetBucketName()); @@ -208,6 +356,8 @@ CosResult ObjectOp::DeleteObjects(const DeleteObjectsReq& req, DeleteObjectsResp additional_params, req_body, false, resp); } + +// Origin call CosResult ObjectOp::MultiUploadObject(const MultiUploadObjectReq& req, MultiUploadObjectResp* resp) { CosResult result; @@ -216,74 +366,217 @@ CosResult ObjectOp::MultiUploadObject(const MultiUploadObjectReq& req, std::string object_name = req.GetObjectName(); std::string local_file_path = req.GetLocalFilePath(); - std::ifstream fin(local_file_path.c_str() , std::ios::in); - if (!fin) { - result.SetErrorInfo("Open local file fail, local file=" + local_file_path); - return result; + bool resume_flag = false; + // There is mem or cpu problem, if use the red-black tree might be slow + std::vector already_exist_parts(kMaxPartNumbers); + // check the breakpoint + std::string resume_uploadid = GetResumableUploadID(bucket_name, object_name); + if (!resume_uploadid.empty()) { + resume_flag = CheckUploadPart(req, bucket_name, object_name, resume_uploadid, + local_file_path, already_exist_parts); } - // 1. Init - InitMultiUploadReq init_req(bucket_name, object_name); - const std::string& server_side_encryption = req.GetHeader("x-cos-server-side-encryption"); - if (!server_side_encryption.empty()) { - init_req.SetXCosServerSideEncryption(server_side_encryption); - } + if (!resume_flag) { + // 1. Init + InitMultiUploadReq init_req(bucket_name, object_name); + const std::string& server_side_encryption = req.GetHeader("x-cos-server-side-encryption"); + if (!server_side_encryption.empty()) { + init_req.SetXCosServerSideEncryption(server_side_encryption); + } - if (req.IsSetXCosMeta()) { - const std::map xcos_meta = req.GetXCosMeta(); - std::map::const_iterator iter = xcos_meta.begin(); - for(; iter != xcos_meta.end(); iter++) { - init_req.SetXCosMeta(iter->first, iter->second); + if (req.IsSetXCosMeta()) { + const std::map xcos_meta = req.GetXCosMeta(); + std::map::const_iterator iter = xcos_meta.begin(); + for(; iter != xcos_meta.end(); iter++) { + init_req.SetXCosMeta(iter->first, iter->second); + } + } + + InitMultiUploadResp init_resp; + init_req.SetConnTimeoutInms(req.GetConnTimeoutInms()); + init_req.SetRecvTimeoutInms(req.GetRecvTimeoutInms()); + result = InitMultiUpload(init_req, &init_resp); + if (!result.IsSucc()) { + SDK_LOG_ERR("Multi upload object fail, check init mutli result."); + resp->CopyFrom(init_resp); + return result; + } + resume_uploadid = init_resp.GetUploadId(); + if (resume_uploadid.empty()) { + SDK_LOG_ERR("Multi upload object fail, upload id is empty."); + resp->CopyFrom(init_resp); + return result; } } - InitMultiUploadResp init_resp; - init_req.SetConnTimeoutInms(req.GetConnTimeoutInms()); - init_req.SetRecvTimeoutInms(req.GetRecvTimeoutInms()); - result = InitMultiUpload(init_req, &init_resp); + // 2. Multi Upload + std::vector etags; + std::vector part_numbers; + // TODO(返回值判断), add the already exist parts + result = MultiThreadUpload(req, resume_uploadid, already_exist_parts, + resume_flag, &etags, &part_numbers); if (!result.IsSucc()) { - SDK_LOG_ERR("Multi upload object fail, check init mutli result."); - resp->CopyFrom(init_resp); + SDK_LOG_ERR("Multi upload object fail, check upload mutli result."); + // Copy失败则需要Abort + AbortMultiUploadReq abort_req(req.GetBucketName(), + req.GetObjectName(), resume_uploadid); + AbortMultiUploadResp abort_resp; + + CosResult abort_result = AbortMultiUpload(abort_req, &abort_resp); + if (!abort_result.IsSucc()) { + SDK_LOG_ERR("Upload failed, and abort muliti upload also failed" + ", resume_uploadid=%s", resume_uploadid.c_str()); + return abort_result; + } return result; } - std::string upload_id = init_resp.GetUploadId(); - if (upload_id.empty()) { - SDK_LOG_ERR("Multi upload object fail, upload id is empty."); - resp->CopyFrom(init_resp); - return result; + + // 3. Complete + CompleteMultiUploadReq comp_req(bucket_name, object_name, resume_uploadid); + CompleteMultiUploadResp comp_resp; + comp_req.SetConnTimeoutInms(req.GetConnTimeoutInms()); + comp_req.SetRecvTimeoutInms(req.GetRecvTimeoutInms() * 2); // Complete的超时翻倍 + comp_req.SetEtags(etags); + comp_req.SetPartNumbers(part_numbers); + + result = CompleteMultiUpload(comp_req, &comp_resp); + resp->CopyFrom(comp_resp); + + return result; +} + +// Create the handler +Poco::SharedPtr ObjectOp::CreateUploadHandler(const std::string& bucket_name, const std::string& object_name, + const std::string& local_path) { + TransferHandler *p = new TransferHandler(bucket_name, object_name, 0, local_path); + Poco::SharedPtr handler(p); + + uint64_t file_size = FileUtil::GetFileLen(local_path); + + handler->SetTotalSize(file_size); + return handler; +} + +// Transfer call +// Be careful with the order to call update status, the resp is outside pointer, +// when outside resp is the temp param it will release lead the opreation core. +CosResult ObjectOp::MultiUploadObject(const MultiUploadObjectReq& req, + Poco::SharedPtr& handler, + MultiUploadObjectResp* resp) { + CosResult result; + uint64_t app_id = GetAppId(); + std::string bucket_name = req.GetBucketName(); + std::string object_name = req.GetObjectName(); + std::string local_file_path = req.GetLocalFilePath(); + + bool resume_flag = false; + // There is mem or cpu problem, if use the red-black tree might be slow + std::vector already_exist_parts(kMaxPartNumbers); + // check the breakpoint + std::string resume_uploadid = GetResumableUploadID(bucket_name, object_name); + if (!resume_uploadid.empty()) { + resume_flag = CheckUploadPart(req, bucket_name, object_name, resume_uploadid, + local_file_path, already_exist_parts); } + if (!resume_flag) { + // 1. Init + InitMultiUploadReq init_req(bucket_name, object_name); + const std::string& server_side_encryption = req.GetHeader("x-cos-server-side-encryption"); + if (!server_side_encryption.empty()) { + init_req.SetXCosServerSideEncryption(server_side_encryption); + } + + if (req.IsSetXCosMeta()) { + const std::map xcos_meta = req.GetXCosMeta(); + std::map::const_iterator iter = xcos_meta.begin(); + for(; iter != xcos_meta.end(); iter++) { + init_req.SetXCosMeta(iter->first, iter->second); + } + } + + InitMultiUploadResp init_resp; + init_req.SetConnTimeoutInms(req.GetConnTimeoutInms()); + init_req.SetRecvTimeoutInms(req.GetRecvTimeoutInms()); + result = InitMultiUpload(init_req, &init_resp); + if (!result.IsSucc()) { + SDK_LOG_ERR("Multi upload object fail, check init mutli result."); + resp->CopyFrom(init_resp); + handler->m_result = result; + handler->UpdateStatus(TransferStatus::FAILED); + req.TriggerTransferStatusUpdateCallback(handler); + return result; + } + resume_uploadid = init_resp.GetUploadId(); + if (resume_uploadid.empty()) { + SDK_LOG_ERR("Multi upload object fail, upload id is empty."); + resp->CopyFrom(init_resp); + handler->m_result = result; + handler->UpdateStatus(TransferStatus::FAILED); + req.TriggerTransferStatusUpdateCallback(handler); + return result; + } + } + SDK_LOG_INFO("Multi upload object handler way id:%s, resumed:%d", resume_uploadid.c_str(), resume_flag); + // 2. Multi Upload std::vector etags; std::vector part_numbers; - // TODO(返回值判断) - result = MultiThreadUpload(req, upload_id, &etags, &part_numbers); + // Add the already exist parts + handler->SetUploadID(resume_uploadid); + handler->UpdateStatus(TransferStatus::IN_PROGRESS); + + result = MultiThreadUpload(req, resume_uploadid, already_exist_parts, handler, + resume_flag, &etags, &part_numbers); + // Cancel way + if (!handler->ShouldContinue()) { + SDK_LOG_INFO("Multi upload object, canceled"); + handler->UpdateStatus(TransferStatus::CANCELED); + req.TriggerTransferStatusUpdateCallback(handler); + return result; + } + + // Notice the cancel way not need to abort the uploadid if (!result.IsSucc()) { SDK_LOG_ERR("Multi upload object fail, check upload mutli result."); - // Copy失败则需要Abort + // When copy failed need abort. AbortMultiUploadReq abort_req(req.GetBucketName(), - req.GetObjectName(), upload_id); + req.GetObjectName(), resume_uploadid); AbortMultiUploadResp abort_resp; CosResult abort_result = AbortMultiUpload(abort_req, &abort_resp); if (!abort_result.IsSucc()) { SDK_LOG_ERR("Upload failed, and abort muliti upload also failed" - ", upload_id=%s", upload_id.c_str()); + ", resume_uploadid=%s", resume_uploadid.c_str()); + handler->m_result = abort_result; + handler->UpdateStatus(TransferStatus::FAILED); + req.TriggerTransferStatusUpdateCallback(handler); return abort_result; } + handler->m_result = result; + handler->UpdateStatus(TransferStatus::ABORTED); + req.TriggerTransferStatusUpdateCallback(handler); return result; } // 3. Complete - CompleteMultiUploadReq comp_req(bucket_name, object_name, upload_id); + CompleteMultiUploadReq comp_req(bucket_name, object_name, resume_uploadid); CompleteMultiUploadResp comp_resp; comp_req.SetConnTimeoutInms(req.GetConnTimeoutInms()); - comp_req.SetRecvTimeoutInms(req.GetRecvTimeoutInms() * 2); // Complete的超时翻倍 + // Double timeout time + comp_req.SetRecvTimeoutInms(req.GetRecvTimeoutInms() * 2); comp_req.SetEtags(etags); comp_req.SetPartNumbers(part_numbers); result = CompleteMultiUpload(comp_req, &comp_resp); resp->CopyFrom(comp_resp); + handler->m_result = result; + if (!result.IsSucc()) { + handler->UpdateStatus(TransferStatus::FAILED); + }else { + handler->UpdateStatus(TransferStatus::COMPLETED); + } + req.TriggerTransferStatusUpdateCallback(handler); return result; } @@ -515,7 +808,7 @@ CosResult ObjectOp::Copy(const CopyReq& req, CopyResp* resp) { // 源文件小于5G则采用PutObjectCopy进行复制 if (file_size < kPartSize5G || src_region == m_config->GetRegion()) { - SDK_LOG_INFO("File Size=%ld less than 5G, use put object copy.", file_size); + SDK_LOG_INFO("File Size=%lld less than 5G, use put object copy.", file_size); PutObjectCopyReq put_copy_req(req.GetBucketName(), req.GetObjectName()); put_copy_req.AddHeaders(req.GetHeaders()); PutObjectCopyResp put_copy_resp; @@ -528,7 +821,7 @@ CosResult ObjectOp::Copy(const CopyReq& req, CopyResp* resp) { } return result; } else if (file_size < req.GetPartSize() * 10000) { - SDK_LOG_INFO("File Size=%ld bigger than 5G, use put object copy.", file_size); + SDK_LOG_INFO("File Size=%lld bigger than 5G, use put object copy.", file_size); // 1. InitMultiUploadReq InitMultiUploadReq init_req(req.GetBucketName(), req.GetObjectName()); InitMultiUploadResp init_resp; @@ -574,7 +867,7 @@ CosResult ObjectOp::Copy(const CopyReq& req, CopyResp* resp) { if (end >= file_size) { end = file_size - 1; } - SDK_LOG_DBG("copy data, task_index=%d, file_size=%lu, offset=%lu, end=%lu", + SDK_LOG_DBG("copy data, task_index=%d, file_size=%llu, offset=%llu, end=%llu", task_index, file_size, offset, end); std::string range = "bytes=" + StringUtil::Uint64ToString(offset) + "-" + StringUtil::Uint64ToString(end); @@ -650,7 +943,7 @@ CosResult ObjectOp::Copy(const CopyReq& req, CopyResp* resp) { return result; } else { SDK_LOG_ERR("Source Object is too large or your upload copy part size in config" - "is too small, src obj size=%ld, copy_part_size=%ld", + "is too small, src obj size=%lld, copy_part_size=%lld", file_size, CosSysConfig::GetUploadCopyPartSize()); result.SetErrorInfo("Could not copy object, because of object size is too large " "or part size is too small."); @@ -717,8 +1010,14 @@ CosResult ObjectOp::MultiThreadDownload(const MultiGetObjectReq& req, MultiGetOb // 3. 打开本地文件 std::string local_path = req.GetLocalFilePath(); - int fd = open(local_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, - S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); +#if defined(_WIN32) + // The _O_BINARY is need by windows otherwise the x0A might change into x0D x0A + int fd = _open(local_path.c_str(), _O_BINARY | O_WRONLY | O_CREAT | O_TRUNC, + _S_IREAD | _S_IWRITE); +#else + int fd = open(local_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, + S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); +#endif if (-1 == fd) { std::string err_info = "open file(" + local_path + ") fail, errno=" + StringUtil::IntToString(errno); @@ -747,7 +1046,7 @@ CosResult ObjectOp::MultiThreadDownload(const MultiGetObjectReq& req, MultiGetOb req.GetConnTimeoutInms(), req.GetRecvTimeoutInms()); } - SDK_LOG_DBG("download data,url=%s, poolsize=%u,slice_size=%u,file_size=%lu", + SDK_LOG_DBG("download data,url=%s, poolsize=%u,slice_size=%u,file_size=%llu", dest_url.c_str(), pool_size, slice_size, file_size); std::vector vec_offset; @@ -758,15 +1057,21 @@ CosResult ObjectOp::MultiThreadDownload(const MultiGetObjectReq& req, MultiGetOb unsigned down_times = 0; bool is_header_set = false; while(offset < file_size) { - SDK_LOG_DBG("down data, offset=%lu, file_size=%lu", offset, file_size); + SDK_LOG_DBG("down data, offset=%llu, file_size=%llu", offset, file_size); unsigned task_index = 0; vec_offset.clear(); for (; task_index < pool_size && (offset < file_size); ++task_index) { - SDK_LOG_DBG("down data, task_index=%d, file_size=%lu, offset=%lu", + SDK_LOG_DBG("down data, task_index=%d, file_size=%llu, offset=%llu", task_index, file_size, offset); FileDownTask* ptask = pptaskArr[task_index]; - - ptask->SetDownParams(file_content_buf[task_index], slice_size, offset); + uint64_t target_size = 0; + if (file_size - offset < slice_size) { + target_size = file_size - offset; + } else { + target_size = slice_size; + } + + ptask->SetDownParams(file_content_buf[task_index], slice_size, offset, target_size); tp.schedule(boost::bind(&FileDownTask::Run, ptask)); vec_offset[task_index] = offset; offset += slice_size; @@ -795,6 +1100,9 @@ CosResult ObjectOp::MultiThreadDownload(const MultiGetObjectReq& req, MultiGetOb task_fail_flag = true; break; } else { + // Notice the lseek when used in 32bit plat has the biggest offset limit 2G + // It fs can not seek over it and return NO_VAL(errno = 22 in windows), + // There use the _lseeki64() instead in windows and the * in linux. if (-1 == lseek(fd, vec_offset[task_index], SEEK_SET)) { std::string err_info = "down data, lseek ret=" + StringUtil::IntToString(errno) + ", offset=" @@ -805,7 +1113,12 @@ CosResult ObjectOp::MultiThreadDownload(const MultiGetObjectReq& req, MultiGetOb break; } +#if defined(_WIN32) + if (-1 == _write(fd, file_content_buf[task_index], ptask->GetDownLoadLen())) { + +#else if (-1 == write(fd, file_content_buf[task_index], ptask->GetDownLoadLen())) { +#endif std::string err_info = "down data, write ret=" + StringUtil::IntToString(errno) + ", len=" + StringUtil::Uint64ToString(ptask->GetDownLoadLen()); @@ -819,14 +1132,16 @@ CosResult ObjectOp::MultiThreadDownload(const MultiGetObjectReq& req, MultiGetOb resp->ParseFromHeaders(ptask->GetRespHeaders()); is_header_set = true; } - SDK_LOG_DBG("down data, down_times=%u,task_index=%d, file_size=%lu, " - "offset=%lu, downlen:%lu ", + SDK_LOG_DBG("down data, down_times=%u,task_index=%d, file_size=%llu, " + "offset=%llu, downlen:%zu ", down_times,task_index, file_size, vec_offset[task_index], ptask->GetDownLoadLen()); } } if (task_fail_flag) { + // The result reused need to set status last + result.SetFail(); break; } } @@ -838,8 +1153,15 @@ CosResult ObjectOp::MultiThreadDownload(const MultiGetObjectReq& req, MultiGetOb resp->SetEtag(head_resp.GetEtag()); } - // 4. 释放所有资源 + // Release resource + // The _close must be with the _open _write api in windows, otherwise there will occure the wrong content. + // Keep testing. Complete me. +#if defined(_WIN32) + _close(fd); +#else close(fd); +#endif + for(unsigned i = 0; i < pool_size; i++){ delete [] file_content_buf[i]; delete pptaskArr[i]; @@ -852,9 +1174,172 @@ CosResult ObjectOp::MultiThreadDownload(const MultiGetObjectReq& req, MultiGetOb return result; } -// TODO(sevenyou) 多线程上传, 返回的resp内容需要再斟酌下. +// Origin way +CosResult ObjectOp::MultiThreadUpload(const MultiUploadObjectReq& req, + const std::string& upload_id, + const std::vector& already_exist_parts, + bool resume_flag, + std::vector* etags_ptr, + std::vector* part_numbers_ptr) { + CosResult result; + std::string path = "/" + req.GetObjectName(); + std::string host = CosSysConfig::GetHost(GetAppId(), m_config->GetRegion(), + req.GetBucketName()); + + // 1. 获取文件大小 + std::string local_file_path = req.GetLocalFilePath(); + std::ifstream fin(local_file_path.c_str(), std::ios::in | std::ios::binary); + if (!fin.is_open()){ + SDK_LOG_ERR("FileUploadSliceData: file open fail, %s", local_file_path.c_str()); + result.SetErrorInfo("local file not exist, local_file=" + local_file_path); + return result; + } + uint64_t file_size = FileUtil::GetFileLen(local_file_path); + + // 2. 初始化upload task + uint64_t offset = 0; + bool task_fail_flag = false; + + uint64_t part_size = req.GetPartSize(); + int pool_size = req.GetThreadPoolSize(); + + // Check the part number + uint64_t part_number = file_size / part_size; + uint64_t last_part_size = file_size % part_size; + if (0 != last_part_size) { + part_number += 1; + }else { + last_part_size = part_size; // for now not use this. + } + + if (part_number > kMaxPartNumbers) { + SDK_LOG_ERR("FileUploadSliceData: part number bigger than 10000, %lld", part_number); + result.SetErrorInfo("part number bigger than 10000"); + return result; + } + + + unsigned char** file_content_buf = new unsigned char*[pool_size]; + for(int i = 0; i < pool_size; ++i) { + file_content_buf[i] = new unsigned char[part_size]; + } + + std::string dest_url = GetRealUrl(host, path, req.IsHttps()); + FileUploadTask** pptaskArr = new FileUploadTask*[pool_size]; + for (int i = 0; i < pool_size; ++i) { + pptaskArr[i] = new FileUploadTask(dest_url, req.GetConnTimeoutInms(), req.GetRecvTimeoutInms()); + } + + SDK_LOG_DBG("upload data,url=%s, poolsize=%u, part_size=%llu, file_size=%llu", + dest_url.c_str(), pool_size, part_size, file_size); + + boost::threadpool::pool tp(pool_size); + + // 3. 多线程upload + { + uint64_t part_number = 1; + while (offset < file_size) { + int task_index = 0; + for (; task_index < pool_size; ++task_index) { + fin.read((char *)file_content_buf[task_index], part_size); + size_t read_len = fin.gcount(); + if (read_len == 0 && fin.eof()) { + SDK_LOG_DBG("read over, task_index: %d", task_index); + break; + } + + SDK_LOG_DBG("upload data, task_index=%d, file_size=%llu, offset=%llu, len=%zu", + task_index, file_size, offset, read_len); + + // Check the resume + FileUploadTask* ptask = pptaskArr[task_index]; + if (resume_flag && !already_exist_parts[part_number].empty()) { + // Already has this part + ptask->SetResume(resume_flag); + ptask->SetResumeEtag(already_exist_parts[part_number]); + std::cout << "task etag is" << already_exist_parts[part_number] << std::endl; + ptask->SetTaskSuccess(); + SDK_LOG_INFO("upload data part:%lld has resumed", part_number); + + }else { + FillUploadTask(upload_id, host, path, file_content_buf[task_index], read_len, + part_number, ptask); + tp.schedule(boost::bind(&FileUploadTask::Run, ptask)); + } + + offset += read_len; + part_numbers_ptr->push_back(part_number); + ++part_number; + } + + int max_task_num = task_index; + + tp.wait(); + for (task_index = 0; task_index < max_task_num; ++task_index) { + FileUploadTask* ptask = pptaskArr[task_index]; + if (!ptask->IsTaskSuccess()) { + const std::string& task_resp = ptask->GetTaskResp(); + const std::map& task_resp_headers = ptask->GetRespHeaders(); + SDK_LOG_ERR("upload data, upload task fail, rsp:%s", task_resp.c_str()); + result.SetHttpStatus(ptask->GetHttpStatus()); + if (ptask->GetHttpStatus() == -1) { + result.SetErrorInfo(ptask->GetErrMsg()); + } else if (!result.ParseFromHttpResponse(task_resp_headers, task_resp)) { + result.SetErrorInfo(task_resp); + } + + task_fail_flag = true; + break; + } + + // 找不到etag也算失败 + const std::map& resp_header = ptask->GetRespHeaders(); + std::map::const_iterator itr = resp_header.find("ETag"); + if (itr != resp_header.end()) { + etags_ptr->push_back(itr->second); + } else if (ptask->IsResume() && !ptask->GetResumeEtag().empty()) { + etags_ptr->push_back(ptask->GetResumeEtag()); + } else { + std::string err_info = "upload data, upload task succ, " + "but response header missing etag field."; + SDK_LOG_ERR("%s", err_info.c_str()); + result.SetHttpStatus(ptask->GetHttpStatus()); + task_fail_flag = true; + break; + } + } + + if (task_fail_flag) { + break; + } + } + } + + if (!task_fail_flag) { + result.SetSucc(); + } + + // 释放相关资源 + fin.close(); + for (int i = 0; i< pool_size; ++i) { + delete pptaskArr[i]; + } + delete [] pptaskArr; + + for (int i = 0; i < pool_size; ++i) { + delete [] file_content_buf[i]; + } + delete [] file_content_buf; + + return result; +} + +// Trsf way CosResult ObjectOp::MultiThreadUpload(const MultiUploadObjectReq& req, const std::string& upload_id, + const std::vector& already_exist_parts, + Poco::SharedPtr& handler, + bool resume_flag, std::vector* etags_ptr, std::vector* part_numbers_ptr) { CosResult result; @@ -878,6 +1363,23 @@ CosResult ObjectOp::MultiThreadUpload(const MultiUploadObjectReq& req, uint64_t part_size = req.GetPartSize(); int pool_size = req.GetThreadPoolSize(); + + // Check the part number + uint64_t part_number = file_size / part_size; + uint64_t last_part_size = file_size % part_size; + if (0 != last_part_size) { + part_number += 1; + }else { + last_part_size = part_size; // for now not use this. + } + + if (part_number > kMaxPartNumbers) { + SDK_LOG_ERR("FileUploadSliceData: part number bigger than 10000, %lld", part_number); + result.SetErrorInfo("part number bigger than 10000"); + return result; + } + + unsigned char** file_content_buf = new unsigned char*[pool_size]; for(int i = 0; i < pool_size; ++i) { file_content_buf[i] = new unsigned char[part_size]; @@ -889,7 +1391,7 @@ CosResult ObjectOp::MultiThreadUpload(const MultiUploadObjectReq& req, pptaskArr[i] = new FileUploadTask(dest_url, req.GetConnTimeoutInms(), req.GetRecvTimeoutInms()); } - SDK_LOG_DBG("upload data,url=%s, poolsize=%u, part_size=%lu, file_size=%lu", + SDK_LOG_DBG("upload data,url=%s, poolsize=%u, part_size=%llu, file_size=%llu", dest_url.c_str(), pool_size, part_size, file_size); boost::threadpool::pool tp(pool_size); @@ -899,6 +1401,12 @@ CosResult ObjectOp::MultiThreadUpload(const MultiUploadObjectReq& req, uint64_t part_number = 1; while (offset < file_size) { int task_index = 0; + if (!handler->ShouldContinue()) { + task_fail_flag = true; + result.SetErrorInfo("FileUpload handler canceled"); + break; + } + for (; task_index < pool_size; ++task_index) { fin.read((char *)file_content_buf[task_index], part_size); size_t read_len = fin.gcount(); @@ -907,13 +1415,33 @@ CosResult ObjectOp::MultiThreadUpload(const MultiUploadObjectReq& req, break; } - SDK_LOG_DBG("upload data, task_index=%d, file_size=%lu, offset=%lu, len=%lu", + SDK_LOG_DBG("upload data, task_index=%d, file_size=%llu, offset=%llu, len=%zu", task_index, file_size, offset, read_len); + // Check the resume FileUploadTask* ptask = pptaskArr[task_index]; - FillUploadTask(upload_id, host, path, file_content_buf[task_index], read_len, - part_number, ptask); - tp.schedule(boost::bind(&FileUploadTask::Run, ptask)); + + ptask->m_req = const_cast(&req); + ptask->SetHandler(true); + + if (resume_flag && !already_exist_parts[part_number].empty()) { + // Already has this part + ptask->SetResume(resume_flag); + ptask->SetResumeEtag(already_exist_parts[part_number]); + std::cout << "task etag is" << already_exist_parts[part_number] << std::endl; + ptask->SetTaskSuccess(); + SDK_LOG_INFO("upload data part:%lld has resumed", part_number); + + handler->UpdateProgress(read_len); + req.TriggerUploadProgressCallback(handler); + + }else { + ptask->m_handler = handler; + FillUploadTask(upload_id, host, path, file_content_buf[task_index], read_len, + part_number, ptask); + tp.schedule(boost::bind(&FileUploadTask::Run, ptask)); + } + offset += read_len; part_numbers_ptr->push_back(part_number); ++part_number; @@ -944,6 +1472,8 @@ CosResult ObjectOp::MultiThreadUpload(const MultiUploadObjectReq& req, std::map::const_iterator itr = resp_header.find("ETag"); if (itr != resp_header.end()) { etags_ptr->push_back(itr->second); + } else if (ptask->IsResume() && !ptask->GetResumeEtag().empty()) { + etags_ptr->push_back(ptask->GetResumeEtag()); } else { std::string err_info = "upload data, upload task succ, " "but response header missing etag field."; diff --git a/src/request/bucket_req.cpp b/src/request/bucket_req.cpp index 12794a9..f0c3acf 100644 --- a/src/request/bucket_req.cpp +++ b/src/request/bucket_req.cpp @@ -104,7 +104,7 @@ bool PutBucketLifecycleReq::GenerateRequestBody(std::string* body) const { "Filter", NULL); LifecycleFilter filter = rule.GetFilter(); std::vector tags = filter.GetTags(); - int cnt = tags.size(); + size_t cnt = tags.size(); if (filter.HasPrefix()) { ++cnt; } diff --git a/src/response/base_resp.cpp b/src/response/base_resp.cpp index 3f84b15..aafe5ac 100644 --- a/src/response/base_resp.cpp +++ b/src/response/base_resp.cpp @@ -89,14 +89,14 @@ bool BaseResp::ParseFromACLXMLString(const std::string& body, if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("AccessControlPolicy"); if (NULL == root) { SDK_LOG_ERR("Miss root node=AccessControlPolicy, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -165,7 +165,7 @@ bool BaseResp::ParseFromACLXMLString(const std::string& body, } } - delete cstr; + delete[] cstr; return true; } diff --git a/src/response/bucket_resp.cpp b/src/response/bucket_resp.cpp index ee7f932..ff58d49 100644 --- a/src/response/bucket_resp.cpp +++ b/src/response/bucket_resp.cpp @@ -28,14 +28,14 @@ bool GetBucketResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node(kGetBucketRoot.c_str()); if (NULL == root) { SDK_LOG_ERR("Miss root node=kGetBucketRoot, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -98,7 +98,7 @@ bool GetBucketResp::ParseFromXmlString(const std::string& body) { node_name.c_str(), body.c_str()); } } - delete cstr; + delete[] cstr; return true; } @@ -110,14 +110,14 @@ bool ListMultipartUploadResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node(kListMultipartUploadRoot.c_str()); if (NULL == root) { SDK_LOG_ERR("Miss root node=kListMultipartUploadRoot, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -204,7 +204,7 @@ bool ListMultipartUploadResp::ParseFromXmlString(const std::string& body) { node_name.c_str(), body.c_str()); } } - delete cstr; + delete[] cstr; return true; } @@ -216,14 +216,14 @@ bool GetBucketReplicationResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node(kBucketReplicationRoot.c_str()); if (NULL == root) { SDK_LOG_ERR("Miss root node=BucketReplicationRoot, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -272,7 +272,7 @@ bool GetBucketReplicationResp::ParseFromXmlString(const std::string& body) { node_name.c_str(), body.c_str()); } } - delete cstr; + delete[] cstr; return true; } @@ -284,14 +284,14 @@ bool GetBucketLifecycleResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("LifecycleConfiguration"); if (NULL == root) { SDK_LOG_ERR("Miss root node=LifecycleConfiguration, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -455,7 +455,7 @@ bool GetBucketLifecycleResp::ParseFromXmlString(const std::string& body) { } } - delete cstr; + delete[] cstr; return true; } @@ -471,14 +471,14 @@ bool GetBucketCORSResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("CORSConfiguration"); if (NULL == root) { SDK_LOG_ERR("Miss root node=CORSConfiguration, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -514,7 +514,7 @@ bool GetBucketCORSResp::ParseFromXmlString(const std::string& body) { } } - delete cstr; + delete[] cstr; return true; } @@ -526,14 +526,14 @@ bool GetBucketVersioningResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("VersioningConfiguration"); if (NULL == root) { SDK_LOG_ERR("Miss root node=VersioningConfiguration, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -555,7 +555,7 @@ bool GetBucketVersioningResp::ParseFromXmlString(const std::string& body) { node_name.c_str(), body.c_str()); } } - delete cstr; + delete[] cstr; return true; } @@ -567,20 +567,20 @@ bool GetBucketLocationResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("LocationConstraint"); if (NULL == root) { SDK_LOG_ERR("Miss root node=LocationConstraint, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } m_location = root->value(); - delete cstr; + delete[] cstr; return true; } @@ -592,14 +592,14 @@ bool GetBucketObjectVersionsResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("ListVersionsResult"); if (NULL == root) { SDK_LOG_ERR("Miss root node=ListVersionsResult, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -670,7 +670,7 @@ bool GetBucketObjectVersionsResp::ParseFromXmlString(const std::string& body) { } } - delete cstr; + delete[] cstr; return true; } diff --git a/src/response/object_resp.cpp b/src/response/object_resp.cpp index 6044a8c..420dcff 100644 --- a/src/response/object_resp.cpp +++ b/src/response/object_resp.cpp @@ -28,14 +28,14 @@ bool InitMultiUploadResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node(kInitiateMultipartUploadRoot.c_str()); if (NULL == root) { SDK_LOG_ERR("Miss root node=InitiateMultipartUploadResult, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -51,7 +51,7 @@ bool InitMultiUploadResp::ParseFromXmlString(const std::string& body) { } } - delete cstr; + delete[] cstr; return true; } @@ -62,14 +62,14 @@ bool CompleteMultiUploadResp::ParseFromXmlString(const std::string& body) { cstr[body.size()] = '\0'; if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=[%s]", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node(kCompleteMultiUploadRoot.c_str()); if (NULL == root) { SDK_LOG_ERR("Miss root node=ListBucketsResult, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -87,7 +87,7 @@ bool CompleteMultiUploadResp::ParseFromXmlString(const std::string& body) { } } - delete cstr; + delete[] cstr; return true; } @@ -158,14 +158,14 @@ bool ListPartsResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("ListPartsResult"); if (NULL == root) { SDK_LOG_ERR("Miss root node=ListPartsResult, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -188,7 +188,7 @@ bool ListPartsResp::ParseFromXmlString(const std::string& body) { const std::string& init_node_name = init_node->name(); if ("ID" == init_node_name) { m_initiator.m_id = init_node->value(); - } else if ("DisplyName" == init_node_name) { + } else if ("DisplayName" == init_node_name) { m_initiator.m_display_name = init_node->value(); } else { SDK_LOG_WARN("Unknown field in Initiator node, field_name=%s", @@ -201,7 +201,7 @@ bool ListPartsResp::ParseFromXmlString(const std::string& body) { const std::string& owner_node_name = owner_node->name(); if ("ID" == owner_node_name) { m_owner.m_id = owner_node->value(); - } else if ("DisplyName" == owner_node_name) { + } else if ("DisplayName" == owner_node_name) { m_owner.m_display_name = owner_node->value(); } else { SDK_LOG_WARN("Unknown field in Owner node, field_name=%s", @@ -243,7 +243,7 @@ bool ListPartsResp::ParseFromXmlString(const std::string& body) { } } - delete cstr; + delete[] cstr; return true; } @@ -259,14 +259,14 @@ bool PutObjectCopyResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("CopyObjectResult"); if (NULL == root) { SDK_LOG_ERR("Miss root node=CopyObjectResult, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -284,7 +284,7 @@ bool PutObjectCopyResp::ParseFromXmlString(const std::string& body) { node_name.c_str()); } } - delete cstr; + delete[] cstr; return true; } @@ -296,14 +296,14 @@ bool UploadPartCopyDataResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("CopyPartResult"); if (NULL == root) { SDK_LOG_ERR("Miss root node=CopyObjectResult, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -319,7 +319,7 @@ bool UploadPartCopyDataResp::ParseFromXmlString(const std::string& body) { node_name.c_str()); } } - delete cstr; + delete[] cstr; return true; } @@ -349,14 +349,14 @@ bool DeleteObjectsResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("DeleteResult"); if (NULL == root) { SDK_LOG_ERR("Miss root node=DeleteResult, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -407,7 +407,7 @@ bool DeleteObjectsResp::ParseFromXmlString(const std::string& body) { node_name.c_str()); } } - delete cstr; + delete[] cstr; return true; } diff --git a/src/response/service_resp.cpp b/src/response/service_resp.cpp index b00c2a4..a06e567 100644 --- a/src/response/service_resp.cpp +++ b/src/response/service_resp.cpp @@ -28,14 +28,14 @@ bool GetServiceResp::ParseFromXmlString(const std::string& body) { if (!StringUtil::StringToXml(cstr, &doc)) { SDK_LOG_ERR("Parse string to xml doc error, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } rapidxml::xml_node<>* root = doc.first_node("ListAllMyBucketsResult"); if (NULL == root) { SDK_LOG_ERR("Miss root node=ListAllMyBucketsResult, xml_body=%s", body.c_str()); - delete cstr; + delete[] cstr; return false; } @@ -86,7 +86,7 @@ bool GetServiceResp::ParseFromXmlString(const std::string& body) { node_name.c_str(), body.c_str()); } } - delete cstr; + delete[] cstr; return true; } diff --git a/src/trsf/transfer_handler.cpp b/src/trsf/transfer_handler.cpp new file mode 100644 index 0000000..d7e99f6 --- /dev/null +++ b/src/trsf/transfer_handler.cpp @@ -0,0 +1,166 @@ +#include "trsf/transfer_handler.h" + +#include "Poco/Buffer.h" +#include "boost/thread/lock_guard.hpp" + +namespace qcloud_cos { + PartState::PartState() : + m_part_num(0), + m_etag(""), + m_size_inbytes(0), + m_lastpart(false){} + + PartState::PartState(int part_num, std::string& etag, size_t size, bool last_part) : + m_part_num(part_num), + m_etag(etag), + m_size_inbytes(size), + m_lastpart(last_part){} + + + TransferHandler::TransferHandler(const std::string& bucket_name, const std::string& object_name, + uint64_t total_size, const std::string& file_path) : + m_bucket_name(bucket_name), + m_object_name(object_name), + m_local_file_path(file_path), + m_total_size(total_size), + m_current_progress(0), + m_status(TransferStatus::NOT_START), + m_uploadid(""), + m_cancel(false) {} + + + static std::string GetNameForStatus(TransferStatus status) { + switch (status) { + case TransferStatus::NOT_START: + return "NOT_START"; + case TransferStatus::IN_PROGRESS: + return "IN_PROGRESS"; + case TransferStatus::CANCELED: + return "CANCELED"; + case TransferStatus::FAILED: + return "FAILED"; + case TransferStatus::COMPLETED: + return "COMPLETED"; + case TransferStatus::ABORTED: + return "ABORTED"; + } + } + + void TransferHandler::UpdateProgress(uint64_t update_prog) { + boost::lock_guard locker(m_lock_prog); + + m_current_progress += update_prog; + + // Notice the progress there can not backwards, but the each parts has retry counts, + // should limit the progress no bigger than the total size. + // s3 has two invariants:(1) Never lock; (2) Never go backwards. Complete me. + if (m_current_progress > m_total_size) { + m_current_progress = m_total_size; + } + + } + + uint64_t TransferHandler::GetProgress() const { + boost::lock_guard locker(m_lock_prog); + return m_current_progress; + } + + bool TransferHandler::IsFinishStatus(TransferStatus status) const { + switch (status) { + case TransferStatus::ABORTED: + case TransferStatus::COMPLETED: + case TransferStatus::FAILED: + case TransferStatus::CANCELED: + return true; + default: + return false; + } + } + + bool TransferHandler::IsAllowTransition(TransferStatus org, TransferStatus dst) const { + if (org == dst) { + return true; + } + + if (IsFinishStatus(org) && IsFinishStatus(dst)) { + return org == TransferStatus::CANCELED && dst == TransferStatus::ABORTED; + } + + return true; + } + + void TransferHandler::UpdateStatus(TransferStatus status) { + // There can unlock the condition to release the waitUntilFinish + // But need use the boost *-* + boost::unique_lock locker(m_lock_stat); + if (IsAllowTransition(m_status, status)) { + m_status = status; + + if (IsFinishStatus(status)) { + if (status == TransferStatus::COMPLETED) { + // Some other logic + } + locker.unlock(); + m_cond.notify_all(); + + } + } + + } + + TransferStatus TransferHandler::GetStatus() const { + boost::lock_guard locker(m_lock_stat); + return m_status; + } + + void TransferHandler::Cancel() { + boost::lock_guard locker(m_lock_stat); + m_cancel = true; + } + + bool TransferHandler::ShouldContinue() const { + boost::lock_guard locker(m_lock_stat); + return !m_cancel; + } + + void TransferHandler::WaitUntilFinish() { + boost::unique_lock locker(m_lock_stat); + while (!IsFinishStatus(m_status)) { + m_cond.wait(locker); + } + } + + + std::string TransferHandler::GetStatusString() const { + return GetNameForStatus(m_status); + } + + std::streamsize HandleStreamCopier::handleCopyStream(const MultiUploadObjectReq *req, std::istream& istr, std::ostream& ostr, + Poco::SharedPtr& handler, std::size_t bufferSize) { + poco_assert (bufferSize > 0); + + Poco::Buffer buffer(bufferSize); + std::streamsize len = 0; + istr.read(buffer.begin(), bufferSize); + std::streamsize n = istr.gcount(); + while (n > 0) { + // Throw the AssertionViolationException if the conditon is not true + poco_assert (handler->ShouldContinue()); + + len += n; + ostr.write(buffer.begin(), n); + // update progress + handler->UpdateProgress(n); + if(req) { + req->TriggerUploadProgressCallback(handler); + } + if (istr && ostr) { + istr.read(buffer.begin(), bufferSize); + n = istr.gcount(); + } else { + n = 0; + } + } + return len; + } +} diff --git a/src/util/auth_tool.cpp b/src/util/auth_tool.cpp index 008880e..1085bed 100644 --- a/src/util/auth_tool.cpp +++ b/src/util/auth_tool.cpp @@ -2,7 +2,7 @@ #include #include -#include +#include #include #include @@ -13,6 +13,12 @@ #include "util/http_sender.h" #include "cos_sys_config.h" +#if defined(_WIN32) +#define strncasecmp _strnicmp +#define strcasecmp _stricmp +#endif + + namespace qcloud_cos { void AuthTool::FilterAndSetSignHeader(const std::map &headers, diff --git a/src/util/http_sender.cpp b/src/util/http_sender.cpp index f797678..e2086e1 100644 --- a/src/util/http_sender.cpp +++ b/src/util/http_sender.cpp @@ -7,7 +7,11 @@ #include "util/http_sender.h" -#include +#if defined(_WIN32) +#include +#else +#include +#endif #include #include @@ -31,6 +35,40 @@ namespace qcloud_cos { +// Trsf handler +int HttpSender::SendRequest(const MultiUploadObjectReq *req, + const std::string& http_method, + const std::string& url_str, + const std::map& req_params, + const std::map& req_headers, + const std::string& req_body, + Poco::SharedPtr& handler, + uint64_t conn_timeout_in_ms, + uint64_t recv_timeout_in_ms, + std::map* resp_headers, + std::string* resp_body, + std::string* err_msg, + bool is_check_md5) { + std::istringstream is(req_body); + std::ostringstream oss; + int ret = SendRequest(req, + http_method, + url_str, + req_params, + req_headers, + is, + conn_timeout_in_ms, + recv_timeout_in_ms, + resp_headers, + oss, + err_msg, + handler, + is_check_md5); + *resp_body = oss.str(); + return ret; +} + + int HttpSender::SendRequest(const std::string& http_method, const std::string& url_str, const std::map& req_params, @@ -411,12 +449,183 @@ int HttpSender::SendRequest(const std::string& http_method, return res.getStatus(); } -// TODO(sevenyou) 挪走 + +// Real trsf handler process +int HttpSender::SendRequest(const MultiUploadObjectReq *objreq, + const std::string& http_method, + const std::string& url_str, + const std::map& req_params, + const std::map& req_headers, + std::istream& is, + uint64_t conn_timeout_in_ms, + uint64_t recv_timeout_in_ms, + std::map* resp_headers, + std::ostream& resp_stream, + std::string* err_msg, + Poco::SharedPtr& handler, + bool is_check_md5) { + Poco::Net::HTTPResponse res; + try { + Poco::URI url(url_str); + boost::scoped_ptr session; + if (StringUtil::StringStartsWithIgnoreCase(url_str, "https")) { + Poco::Net::Context::Ptr context = new Poco::Net::Context(Poco::Net::Context::CLIENT_USE, + "", "", "", Poco::Net::Context::VERIFY_RELAXED, + 9, true, "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"); + session.reset(new Poco::Net::HTTPSClientSession(url.getHost(), url.getPort(), context)); + } else { + session.reset(new Poco::Net::HTTPClientSession(url.getHost(), url.getPort())); + } + + session->setTimeout(Poco::Timespan(0, conn_timeout_in_ms * 1000)); + + // 1. 拼接path_query字符串 + std::string path = url.getPath(); + if (path.empty()) { + path += "/"; + } + + std::string query_str; + for (std::map::const_iterator c_itr = req_params.begin(); + c_itr != req_params.end(); ++c_itr) { + std::string part; + if (c_itr->second.empty()) { + part = CodecUtil::UrlEncode(c_itr->first) + "&"; + } else { + part = CodecUtil::UrlEncode(c_itr->first) + "=" + CodecUtil::UrlEncode(c_itr->second) + "&"; + } + query_str += part; + } + + if (!query_str.empty()) { + query_str = "?" + query_str.substr(0, query_str.size() - 1); + } + std::string path_and_query_str = CodecUtil::EncodeKey(path) + query_str; + + // 2. 创建http request, 并填充头部 + Poco::Net::HTTPRequest req(http_method, path_and_query_str, Poco::Net::HTTPMessage::HTTP_1_1); + for (std::map::const_iterator c_itr = req_headers.begin(); + c_itr != req_headers.end(); ++c_itr) { + req.add(c_itr->first, (c_itr->second).c_str()); + } + + // 3. 计算长度 + std::streampos pos = is.tellg(); + is.seekg(0, std::ios::end); + req.setContentLength(is.tellg()); + is.seekg(pos); + +#ifdef __COS_DEBUG__ + std::ostringstream debug_os; + req.write(debug_os); + SDK_LOG_DBG("request=[%s]", debug_os.str().c_str()); +#endif + + // 4. 发送请求 + std::ostream& os = session->sendRequest(req); + // According to the copyStream to handle the process + // TODO overwrite the copyStream insider to record + // Poco::StreamCopier::copyStream(is, os); + HandleStreamCopier::handleCopyStream(objreq, is, os, handler); + + // 5. 接收返回 + Poco::Net::StreamSocket& ss = session->socket(); + ss.setReceiveTimeout(Poco::Timespan(0, recv_timeout_in_ms * 1000)); + std::istream& recv_stream = session->receiveResponse(res); + + // 6. 处理返回 + int ret = res.getStatus(); + resp_headers->insert(res.begin(), res.end()); + + std::string etag = ""; + std::map::const_iterator etag_itr + = resp_headers->find("ETag"); + if (etag_itr != resp_headers->end()) { + etag = StringUtil::Trim(etag_itr->second, "\""); + } + + if (is_check_md5 && !StringUtil::IsV4ETag(etag) + && !StringUtil::IsMultipartUploadETag(etag)) { + SDK_LOG_DBG("Check Response Md5"); + Poco::MD5Engine md5; + Poco::DigestOutputStream dos(md5); + + // explicit iostream (streambuf* sb); + std::stringbuf ibuf; + std::iostream io_tmp(&ibuf); + + // The Poco session->receiveResponse return the streambuf which dose not overload the base_iostream seekpos which is the realization of the tellg and seekg. + // It casue the recv_stream can not relocation the begin postion, so can not reuse of the recv_stream. + // FIXME it might has property issue. + Poco::StreamCopier::copyStream(recv_stream, io_tmp); + + std::streampos pos = io_tmp.tellg(); + Poco::StreamCopier::copyStream(io_tmp, dos); + io_tmp.clear(); + io_tmp.seekg(pos); + dos.close(); + + std::string md5_str = Poco::DigestEngine::digestToHex(md5.digest()); + + if (etag != md5_str) { + *err_msg = "Md5 of response body is not equal to the etag in the header." + " Body Md5= " + md5_str + ", etag=" + etag; + SDK_LOG_ERR("Check Md5 fail, %s", err_msg->c_str()); + ret = -1; + } + Poco::StreamCopier::copyStream(io_tmp, resp_stream); + }else { + Poco::StreamCopier::copyStream(recv_stream, resp_stream); + } + +#ifdef __COS_DEBUG__ + SDK_LOG_DBG("response header :\n"); + for (std::map::const_iterator itr = resp_headers->begin(); + itr != resp_headers->end(); ++itr) { + SDK_LOG_DBG("key=[%s], value=[%s]\n", itr->first.c_str(), itr->second.c_str()); + } +#endif + SDK_LOG_INFO("Send request over, status=%d, reason=%s", + res.getStatus(), res.getReason().c_str()); + return ret; + } catch (Poco::Net::NetException& ex){ + SDK_LOG_ERR("Net Exception:%s", ex.displayText().c_str()); + *err_msg = "Net Exception:" + ex.displayText(); + return -1; + } catch (Poco::TimeoutException& ex) { + SDK_LOG_ERR("TimeoutException:%s", ex.displayText().c_str()); + *err_msg = "TimeoutException:" + ex.displayText(); + return -1; + } catch (Poco::AssertionViolationException& ex) { + // handle the cancel way or other violation exception + *err_msg = "AssertionViolationException:%s" + ex.displayText(); + return -1; + } catch (const std::exception &ex) { + SDK_LOG_ERR("Exception:%s, errno=%d", std::string(ex.what()).c_str(), errno); + *err_msg = "Exception:" + std::string(ex.what()); + return -1; + } + + return res.getStatus(); +} + +// Must notice for now the sign auth time is second. +// the time function in windows can only support to second, +// also can use the GetSystemTimeAsFileTime which the resolution is only 15625 microseconds +// (if you make successive calls to gettimeofday() until it changes value). +// It's not much better than calling GetLocalTime() which is accurate to between 15000 and 31000 microseconds. +// This differs significantly from the unix implementations which are accurate close to the microsecond. uint64_t HttpSender::GetTimeStampInUs() { // 构造时间 - struct timeval tv; - gettimeofday(&tv, NULL); - return tv.tv_sec * 1000000 + tv.tv_usec; +#if defined(_WIN32) + time_t ltime; + time(<ime); + return (uint64_t)(ltime * 1000000); +#else + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec * 1000000 + tv.tv_usec; +#endif } } // namespace qcloud_cos diff --git a/src/util/string_util.cpp b/src/util/string_util.cpp index f61a02f..986af5e 100644 --- a/src/util/string_util.cpp +++ b/src/util/string_util.cpp @@ -6,6 +6,12 @@ #include #include +#if defined(WIN32) +#define strncasecmp _strnicmp +#define strcasecmp _stricmp +#endif + + namespace qcloud_cos { std::string& StringUtil::Trim(std::string &s) { diff --git a/unittest/object_op_test.cpp b/unittest/object_op_test.cpp index 361e684..e0c4264 100644 --- a/unittest/object_op_test.cpp +++ b/unittest/object_op_test.cpp @@ -9,6 +9,7 @@ #include "common.h" #include "cos_api.h" +#include "trsf/transfer_handler.h" namespace qcloud_cos { @@ -464,6 +465,7 @@ TEST_F(ObjectOpTest, MultiUploadObjectTest_OneStep) { MultiUploadObjectReq req(m_bucket_name, object_name, filename); MultiUploadObjectResp resp; + CosResult result = m_client->MultiUploadObject(req, &resp); EXPECT_TRUE(result.IsSucc());