Skip to content

Commit

Permalink
Use librados to access Ceph (#27)
Browse files Browse the repository at this point in the history
* Use librados to access Ceph

* build juicefs with librados

* add ceph as dep
  • Loading branch information
davies authored Jan 15, 2021
1 parent 3490de4 commit 1d1582c
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ endif
juicefs: Makefile cmd/*.go pkg/*/*.go
go build -ldflags="$(LDFLAGS)" -o juicefs ./cmd

juicefs.ceph: Makefile cmd/*.go pkg/*/*.go
go build -tags ceph -ldflags="$(LDFLAGS)" -o juicefs.ceph ./cmd

.PHONY: snapshot release test
snapshot:
docker run --rm --privileged \
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.13
require (
github.com/DataDog/zstd v1.4.5
github.com/VividCortex/godaemon v0.0.0-20201215173923-eda977734e72
github.com/ceph/go-ceph v0.4.0
github.com/go-redis/redis/v8 v8.4.0
github.com/google/gops v0.3.13
github.com/google/uuid v1.1.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ github.com/aws/aws-sdk-go v1.12.10/go.mod h1:ZRmQr0FajVIyZ4ZzBYKG5P3ZqPz9IHG41Zo
github.com/baidubce/bce-sdk-go v0.0.0-20180401121131-aa0c7bd66b01 h1:pmQ6WjFOHtNL0IHsLB0r3fOyRQ6KK/CfZ9dDI7ugZnU=
github.com/baidubce/bce-sdk-go v0.0.0-20180401121131-aa0c7bd66b01/go.mod h1:T3yEA2H7hXAlvniSEJRsPlDYlh8OEZZzH0zlIP/1JIY=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/ceph/go-ceph v0.4.0 h1:KJsT6j1IbsEtui3ZtDcZO//uZ+IVBNT6KO7u9PuMovE=
github.com/ceph/go-ceph v0.4.0/go.mod h1:wd+keAOqrcsN//20VQnHBGtnBnY0KHl0PA024Ng8HfQ=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
Expand All @@ -45,6 +47,7 @@ github.com/go-ini/ini v1.28.2/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-redis/redis/v8 v8.4.0 h1:J5NCReIgh3QgUJu398hUncxDExN4gMOHI11NVbVicGQ=
github.com/go-redis/redis/v8 v8.4.0/go.mod h1:A1tbYoHSa1fXwN+//ljcCYYJeLmVrwL9hbQN45Jdy0M=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -221,6 +224,7 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200501145240-bc7a7d42d5c3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
Expand Down
188 changes: 188 additions & 0 deletions pkg/object/ceph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// +build ceph

/*
* JuiceFS, Copyright (C) 2020 Juicedata, Inc.
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package object

import (
"bytes"
"fmt"
"io"
"net/url"
"reflect"
"sync"

"github.com/ceph/go-ceph/rados"
)

type ceph struct {
name string
conn *rados.Conn
free chan *rados.IOContext
}

func (c *ceph) String() string {
return fmt.Sprintf("ceph://%s", c.name)
}

func (c *ceph) Create() error {
names, err := c.conn.ListPools()
if err != nil {
return err
}
for _, name := range names {
if name == c.name {
return nil
}
}
return c.conn.MakePool(c.name)
}

func (c *ceph) newContext() (*rados.IOContext, error) {
select {
case ctx := <-c.free:
return ctx, nil
default:
return c.conn.OpenIOContext(c.name)
}
}

func (c *ceph) release(ctx *rados.IOContext) {
select {
case c.free <- ctx:
default:
ctx.Destroy()
}
}

func (c *ceph) do(f func(ctx *rados.IOContext) error) (err error) {
ctx, err := c.newContext()
if err != nil {
return err
}
err = f(ctx)
if err != nil {
ctx.Destroy()
} else {
c.release(ctx)
}
return
}

type cephReader struct {
c *ceph
ctx *rados.IOContext
key string
off int64
limit int64
}

func (r *cephReader) Read(buf []byte) (n int, err error) {
if r.limit > 0 && int64(len(buf)) > r.limit {
buf = buf[:r.limit]
}
n, err = r.ctx.Read(r.key, buf, uint64(r.off))
r.off += int64(n)
if r.limit > 0 {
r.limit -= int64(n)
}
if err == nil && n < len(buf) {
err = io.EOF
}
return
}

func (r *cephReader) Close() error {
if r.ctx != nil {
r.c.release(r.ctx)
r.ctx = nil
}
return nil
}

func (c *ceph) Get(key string, off, limit int64) (io.ReadCloser, error) {
ctx, err := c.newContext()
if err != nil {
return nil, err
}
return &cephReader{c, ctx, key, off, limit}, nil
}

var cephPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1<<20)
},
}

func (c *ceph) Put(key string, in io.Reader) error {
return c.do(func(ctx *rados.IOContext) error {
if b, ok := in.(*bytes.Reader); ok {
v := reflect.ValueOf(b)
data := v.Elem().Field(0).Bytes()
return ctx.WriteFull(key, data)
}
buf := cephPool.Get().([]byte)
defer cephPool.Put(buf)
var off uint64
for {
n, err := in.Read(buf)
if n > 0 {
if err = ctx.Write(key, buf[:n], off); err != nil {
return err
}
off += uint64(n)
} else {
if err == io.EOF {
return nil
}
return err
}
}
})
}

func (c *ceph) Delete(key string) error {
return c.do(func(ctx *rados.IOContext) error {
return ctx.Delete(key)
})
}

func newCeph(endpoint, cluster, user string) (ObjectStorage, error) {
uri, err := url.ParseRequestURI(endpoint)
if err != nil {
return nil, fmt.Errorf("Invalid endpoint %s: %s", endpoint, err)
}
name := uri.Host
conn, err := rados.NewConnWithClusterAndUser(cluster, user)
if err != nil {
return nil, fmt.Errorf("Can't create connection to cluster %s for user %s: %s", cluster, user, err)
}
if err := conn.ReadDefaultConfigFile(); err != nil {
logger.Fatalf("Can't read default config file: %s", err)
}
if err := conn.Connect(); err != nil {
return nil, fmt.Errorf("Can't connect to cluster %s: %s", cluster, err)
}
return &ceph{
name: name,
conn: conn,
free: make(chan *rados.IOContext, 50),
}, nil
}

func init() {
register("ceph", newCeph)
}

0 comments on commit 1d1582c

Please sign in to comment.