Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] support turbo RPC #236

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ GOBUILD := $(GO) build $(BUILD_FLAG)

default:
$(GOBUILD) -ldflags "-X main.gitHash=`git rev-parse HEAD`" -o node/node node/main.go

linux:
GOOS=linux $(GOBUILD) -ldflags "-X main.gitHash=`git rev-parse HEAD`" -o node/node-linux node/main.go
12 changes: 11 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Config struct {
Raft bool `toml:"raft"` // Enable raft.
RaftWorkers int `toml:"raft-workers"` // Number of raft workers.
Engine Engine `toml:"engine"` // Engine options.
Turbo Turbo `toml:"turbo"`
}

type Engine struct {
Expand All @@ -26,11 +27,15 @@ type Engine struct {
NumCompactors int `toml:"num-compactors"`
}

type Turbo struct {
Concurrency int `toml:"concurrency"`
}

const MB = 1024 * 1024

var DefaultConf = Config{
PDAddr: "127.0.0.1:2379",
StoreAddr: "127.0.0.1:9191",
StoreAddr: "127.0.0.1:9191,127.0.0.1:9181",
HttpAddr: "127.0.0.1:9291",
RegionSize: 64 * MB,
LogLevel: "info",
Expand All @@ -48,4 +53,9 @@ var DefaultConf = Config{
SyncWrite: true,
NumCompactors: 1,
},
Turbo: Turbo{
Concurrency: 2,
},
}

var Global = DefaultConf
6 changes: 5 additions & 1 deletion node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ngaut/unistore/tikv"
"github.com/ngaut/unistore/tikv/mvcc"
"github.com/ngaut/unistore/tikv/raftstore"
"github.com/ngaut/unistore/util"
"github.com/pingcap/kvproto/pkg/tikvpb"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -80,14 +81,16 @@ func main() {
}

tikvServer := tikv.NewServer(regionManager, store, innerServer)
turboServer := tikv.NewTurboServer(regionManager, store, innerServer, conf.StoreAddr)
go turboServer.Run()

grpcServer := grpc.NewServer(
grpc.InitialWindowSize(grpcInitialWindowSize),
grpc.InitialConnWindowSize(grpcInitialConnWindowSize),
grpc.MaxRecvMsgSize(10*1024*1024),
)
tikvpb.RegisterTikvServer(grpcServer, tikvServer)
l, err := net.Listen("tcp", conf.StoreAddr)
l, err := net.Listen("tcp", util.GRPCAddr(conf.StoreAddr))
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -123,6 +126,7 @@ func loadConfig() *config.Config {
panic(err)
}
}
config.Global = conf
return &conf
}

Expand Down
4 changes: 3 additions & 1 deletion tikv/raftstore/raft_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"

"github.com/ngaut/unistore/util"

"github.com/ngaut/log"
"github.com/pingcap/kvproto/pkg/raft_serverpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
Expand All @@ -19,7 +21,7 @@ type raftConn struct {
}

func newRaftConn(addr string, cfg *Config) (*raftConn, error) {
cc, err := grpc.Dial(addr, grpc.WithInsecure(),
cc, err := grpc.Dial(util.GRPCAddr(addr), grpc.WithInsecure(),
grpc.WithInitialWindowSize(int32(cfg.GrpcInitialWindowSize)),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: cfg.GrpcKeepAliveTime,
Expand Down
4 changes: 2 additions & 2 deletions tikv/raftstore/snapRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"

"github.com/ngaut/unistore/util"
"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/raft_serverpb"
Expand Down Expand Up @@ -72,8 +73,7 @@ func (r *snapRunner) sendSnap(addr string, msg *raft_serverpb.RaftMessage) error
if !snap.Exists() {
return errors.Errorf("missing snap file: %v", snap.Path())
}

cc, err := grpc.Dial(addr, grpc.WithInsecure(),
cc, err := grpc.Dial(util.GRPCAddr(addr), grpc.WithInsecure(),
grpc.WithInitialWindowSize(int32(r.config.GrpcInitialWindowSize)),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: r.config.GrpcKeepAliveTime,
Expand Down
30 changes: 24 additions & 6 deletions tikv/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ type RegionOptions struct {

type RegionManager interface {
GetRegionFromCtx(ctx *kvrpcpb.Context) (*regionCtx, *errorpb.Error)
GetRegionByReqHeader(reqHeader *RequestHeader) (*regionCtx, *errorpb.Error)
Close() error
}

Expand All @@ -205,15 +206,32 @@ type regionManager struct {
}

func (rm *regionManager) GetRegionFromCtx(ctx *kvrpcpb.Context) (*regionCtx, *errorpb.Error) {
regionID := ctx.RegionId
storeID := rm.storeMeta.Id
ctxPeer := ctx.GetPeer()
if ctxPeer != nil && ctxPeer.GetStoreId() != rm.storeMeta.Id {
if ctxPeer != nil {
storeID = ctxPeer.StoreId
}
epoch := ctx.GetRegionEpoch()
version, confVersion := epoch.Version, epoch.ConfVer
return rm.getRegion(regionID, storeID, version, confVersion)
}

func (rm *regionManager) GetRegionByReqHeader(reqHeader *RequestHeader) (*regionCtx, *errorpb.Error) {
regionID, storeID := reqHeader.RegionID, uint64(reqHeader.StoreID)
version, confVersion := uint64(reqHeader.RegionVer), uint64(reqHeader.RegionConfVer)
return rm.getRegion(regionID, storeID, version, confVersion)
}

func (rm *regionManager) getRegion(regionID, storeID, version, confVersion uint64) (*regionCtx, *errorpb.Error) {
if storeID != rm.storeMeta.Id {
return nil, &errorpb.Error{
Message: "store not match",
StoreNotMatch: &errorpb.StoreNotMatch{},
}
}
rm.mu.RLock()
ri := rm.regions[ctx.RegionId]
ri := rm.regions[regionID]
if ri != nil {
ri.refCount.Add(1)
}
Expand All @@ -222,12 +240,12 @@ func (rm *regionManager) GetRegionFromCtx(ctx *kvrpcpb.Context) (*regionCtx, *er
return nil, &errorpb.Error{
Message: "region not found",
RegionNotFound: &errorpb.RegionNotFound{
RegionId: ctx.GetRegionId(),
RegionId: regionID,
},
}
}
// Region epoch does not match.
if rm.isEpochStale(ri.getRegionEpoch(), ctx.GetRegionEpoch()) {
if rm.isEpochStale(ri.getRegionEpoch().Version, ri.getRegionEpoch().ConfVer, version, confVersion) {
ri.refCount.Done()
return nil, &errorpb.Error{
Message: "stale epoch",
Expand All @@ -246,8 +264,8 @@ func (rm *regionManager) GetRegionFromCtx(ctx *kvrpcpb.Context) (*regionCtx, *er
return ri, nil
}

func (rm *regionManager) isEpochStale(lhs, rhs *metapb.RegionEpoch) bool {
return lhs.GetConfVer() != rhs.GetConfVer() || lhs.GetVersion() != rhs.GetVersion()
func (rm *regionManager) isEpochStale(lhsVer, lhsConfVer, rhsVer, rhsConfVer uint64) bool {
return lhsVer != rhsVer || lhsConfVer != rhsConfVer
}

type RaftRegionManager struct {
Expand Down
2 changes: 1 addition & 1 deletion tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func (svr *Server) BatchRaft(stream tikvpb.Tikv_BatchRaftServer) error {
// Region commands.
func (svr *Server) SplitRegion(ctx context.Context, req *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) {
// TODO
return &kvrpcpb.SplitRegionResponse{}, nil
return &kvrpcpb.SplitRegionResponse{RegionError: &errorpb.Error{Message: "not supported"}}, nil
}

func (svr *Server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) {
Expand Down
Loading