diff --git a/Makefile b/Makefile index 3b2516117023..7eb437c47533 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,9 @@ endif juicefs: Makefile cmd/*.go pkg/*/*.go go build -ldflags="$(LDFLAGS)" -o juicefs ./cmd +juicefs.ceph: Makefile cmd/*.go pkg/*/*.go + go build -tags ceph -ldflags="$(LDFLAGS)" -o juicefs.ceph ./cmd + .PHONY: snapshot release test snapshot: docker run --rm --privileged \ diff --git a/go.mod b/go.mod index d9e5005b1d3a..dda8f671be5d 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.13 require ( github.com/DataDog/zstd v1.4.5 github.com/VividCortex/godaemon v0.0.0-20201215173923-eda977734e72 + github.com/ceph/go-ceph v0.4.0 github.com/go-redis/redis/v8 v8.4.0 github.com/google/gops v0.3.13 github.com/google/uuid v1.1.1 diff --git a/go.sum b/go.sum index b02572403936..ee2c055b773d 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/aws/aws-sdk-go v1.12.10/go.mod h1:ZRmQr0FajVIyZ4ZzBYKG5P3ZqPz9IHG41Zo github.com/baidubce/bce-sdk-go v0.0.0-20180401121131-aa0c7bd66b01 h1:pmQ6WjFOHtNL0IHsLB0r3fOyRQ6KK/CfZ9dDI7ugZnU= github.com/baidubce/bce-sdk-go v0.0.0-20180401121131-aa0c7bd66b01/go.mod h1:T3yEA2H7hXAlvniSEJRsPlDYlh8OEZZzH0zlIP/1JIY= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= +github.com/ceph/go-ceph v0.4.0 h1:KJsT6j1IbsEtui3ZtDcZO//uZ+IVBNT6KO7u9PuMovE= +github.com/ceph/go-ceph v0.4.0/go.mod h1:wd+keAOqrcsN//20VQnHBGtnBnY0KHl0PA024Ng8HfQ= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -45,6 +47,7 @@ github.com/go-ini/ini v1.28.2/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-redis/redis/v8 v8.4.0 h1:J5NCReIgh3QgUJu398hUncxDExN4gMOHI11NVbVicGQ= github.com/go-redis/redis/v8 v8.4.0/go.mod h1:A1tbYoHSa1fXwN+//ljcCYYJeLmVrwL9hbQN45Jdy0M= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -221,6 +224,7 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200501145240-bc7a7d42d5c3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= diff --git a/pkg/object/ceph.go b/pkg/object/ceph.go new file mode 100644 index 000000000000..b75d2ab183ad --- /dev/null +++ b/pkg/object/ceph.go @@ -0,0 +1,188 @@ +// +build ceph + +/* + * JuiceFS, Copyright (C) 2020 Juicedata, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package object + +import ( + "bytes" + "fmt" + "io" + "net/url" + "reflect" + "sync" + + "github.com/ceph/go-ceph/rados" +) + +type ceph struct { + name string + conn *rados.Conn + free chan *rados.IOContext +} + +func (c *ceph) String() string { + return fmt.Sprintf("ceph://%s", c.name) +} + +func (c *ceph) Create() error { + names, err := c.conn.ListPools() + if err != nil { + return err + } + for _, name := range names { + if name == c.name { + return nil + } + } + return c.conn.MakePool(c.name) +} + +func (c *ceph) newContext() (*rados.IOContext, error) { + select { + case ctx := <-c.free: + return ctx, nil + default: + return c.conn.OpenIOContext(c.name) + } +} + +func (c *ceph) release(ctx *rados.IOContext) { + select { + case c.free <- ctx: + default: + ctx.Destroy() + } +} + +func (c *ceph) do(f func(ctx *rados.IOContext) error) (err error) { + ctx, err := c.newContext() + if err != nil { + return err + } + err = f(ctx) + if err != nil { + ctx.Destroy() + } else { + c.release(ctx) + } + return +} + +type cephReader struct { + c *ceph + ctx *rados.IOContext + key string + off int64 + limit int64 +} + +func (r *cephReader) Read(buf []byte) (n int, err error) { + if r.limit > 0 && int64(len(buf)) > r.limit { + buf = buf[:r.limit] + } + n, err = r.ctx.Read(r.key, buf, uint64(r.off)) + r.off += int64(n) + if r.limit > 0 { + r.limit -= int64(n) + } + if err == nil && n < len(buf) { + err = io.EOF + } + return +} + +func (r *cephReader) Close() error { + if r.ctx != nil { + r.c.release(r.ctx) + r.ctx = nil + } + return nil +} + +func (c *ceph) Get(key string, off, limit int64) (io.ReadCloser, error) { + ctx, err := c.newContext() + if err != nil { + return nil, err + } + return &cephReader{c, ctx, key, off, limit}, nil +} + +var cephPool = sync.Pool{ + New: func() interface{} { + return make([]byte, 1<<20) + }, +} + +func (c *ceph) Put(key string, in io.Reader) error { + return c.do(func(ctx *rados.IOContext) error { + if b, ok := in.(*bytes.Reader); ok { + v := reflect.ValueOf(b) + data := v.Elem().Field(0).Bytes() + return ctx.WriteFull(key, data) + } + buf := cephPool.Get().([]byte) + defer cephPool.Put(buf) + var off uint64 + for { + n, err := in.Read(buf) + if n > 0 { + if err = ctx.Write(key, buf[:n], off); err != nil { + return err + } + off += uint64(n) + } else { + if err == io.EOF { + return nil + } + return err + } + } + }) +} + +func (c *ceph) Delete(key string) error { + return c.do(func(ctx *rados.IOContext) error { + return ctx.Delete(key) + }) +} + +func newCeph(endpoint, cluster, user string) (ObjectStorage, error) { + uri, err := url.ParseRequestURI(endpoint) + if err != nil { + return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err) + } + name := uri.Host + conn, err := rados.NewConnWithClusterAndUser(cluster, user) + if err != nil { + return nil, fmt.Errorf("Can't create connection to cluster %s for user %s: %s", cluster, user, err) + } + if err := conn.ReadDefaultConfigFile(); err != nil { + logger.Fatalf("Can't read default config file: %s", err) + } + if err := conn.Connect(); err != nil { + return nil, fmt.Errorf("Can't connect to cluster %s: %s", cluster, err) + } + return &ceph{ + name: name, + conn: conn, + free: make(chan *rados.IOContext, 50), + }, nil +} + +func init() { + register("ceph", newCeph) +}