Skip to content

Commit

Permalink
update from dtm to version v1.16.4
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf2 committed Nov 25, 2022
1 parent a0fc6f7 commit 7f3e3b2
Show file tree
Hide file tree
Showing 17 changed files with 371 additions and 116 deletions.
2 changes: 2 additions & 0 deletions dtmcli/dtmimp/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const (
MsgDoBarrier1 = "01"
// MsgDoOp const for DoAndSubmit barrier op
MsgDoOp = "msg"
//MsgTopicPrefix const for Add topic msg
MsgTopicPrefix = "topic://"

// XaBarrier1 const for xa barrier id
XaBarrier1 = "01"
Expand Down
31 changes: 18 additions & 13 deletions dtmcli/dtmimp/trans_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ func (g *BranchIDGen) CurrentSubBranchID() string {

// TransOptions transaction options
type TransOptions struct {
WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc, unit: second
RequestTimeout int64 `json:"request_timeout,omitempty" gorm:"-"` // for global trans resets request timeout, unit: second
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc, unit: second
PassthroughHeaders []string `json:"passthrough_headers,omitempty" gorm:"-"` // for inherit the specified gin context headers
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // custom branch headers, dtm server => service api
Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg
WaitResult bool `json:"wait_result,omitempty" gorm:"-"`
TimeoutToFail int64 `json:"timeout_to_fail,omitempty" gorm:"-"` // for trans type: xa, tcc, unit: second
RequestTimeout int64 `json:"request_timeout,omitempty" gorm:"-"` // for global trans resets request timeout, unit: second
RetryInterval int64 `json:"retry_interval,omitempty" gorm:"-"` // for trans type: msg saga xa tcc, unit: second
BranchHeaders map[string]string `json:"branch_headers,omitempty" gorm:"-"` // custom branch headers, dtm server => service api
Concurrent bool `json:"concurrent" gorm:"-"` // for trans type: saga msg
RetryLimit int64 `json:"retry_limit,omitempty" gorm:"-"` // for trans type: saga
RetryCount int64 `json:"retry_count,omitempty" gorm:"-"` // for trans type: saga
}

// TransBase base for all trans
Expand All @@ -75,12 +76,11 @@ type TransBase struct {
// NewTransBase new a TransBase
func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase {
return &TransBase{
Gid: gid,
TransType: transType,
BranchIDGen: BranchIDGen{BranchID: branchID},
Dtm: dtm,
TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders},
Context: context.Background(),
Gid: gid,
TransType: transType,
BranchIDGen: BranchIDGen{BranchID: branchID},
Dtm: dtm,
Context: context.Background(),
}
}

Expand All @@ -89,6 +89,11 @@ func (t *TransBase) WithGlobalTransRequestTimeout(timeout int64) {
t.RequestTimeout = timeout
}

// WithRetryLimit defines global trans retry limit
func (t *TransBase) WithRetryLimit(retryLimit int64) {
t.RetryLimit = retryLimit
}

// TransBaseFromQuery construct transaction info from request
func TransBaseFromQuery(qs url.Values) *TransBase {
return NewTransBase(EscapeGet(qs, "gid"), EscapeGet(qs, "trans_type"), EscapeGet(qs, "dtm"), EscapeGet(qs, "branch_id"))
Expand Down
1 change: 1 addition & 0 deletions dtmcli/dtmimp/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ type DBConf struct {
User string `yaml:"User"`
Password string `yaml:"Password"`
Db string `yaml:"Db"`
Schema string `yaml:"Schema"`
}
4 changes: 2 additions & 2 deletions dtmcli/dtmimp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func GetDsn(conf DBConf) string {
dsn := map[string]string{
"mysql": fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=true&loc=Local&interpolateParams=true",
conf.User, conf.Password, host, conf.Port, conf.Db),
"postgres": fmt.Sprintf("host=%s user=%s password=%s dbname='%s' port=%d sslmode=disable",
host, conf.User, conf.Password, conf.Db, conf.Port),
"postgres": fmt.Sprintf("host=%s user=%s password=%s dbname='%s' search_path=%s port=%d sslmode=disable",
host, conf.User, conf.Password, conf.Db, conf.Schema, conf.Port),
}[driver]
PanicIf(dsn == "", fmt.Errorf("unknow driver: %s", driver))
return dsn
Expand Down
3 changes: 0 additions & 3 deletions dtmcli/dtmimp/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}
// MapFailure HTTP result of FAILURE
var MapFailure = map[string]interface{}{"dtm_result": ResultFailure}

// PassthroughHeaders will be passed to every sub-trans call
var PassthroughHeaders = []string{}

// BarrierTableName the table name of barrier table
var BarrierTableName = "dtm_barrier.barrier"

Expand Down
6 changes: 6 additions & 0 deletions dtmcli/trans_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package dtmcli
import (
"database/sql"
"errors"
"fmt"

"github.com/dtm-labs/client/dtmcli/dtmimp"
)
Expand All @@ -31,6 +32,11 @@ func (s *Msg) Add(action string, postData interface{}) *Msg {
return s
}

// AddTopic add a new topic step
func (s *Msg) AddTopic(topic string, postData interface{}) *Msg {
return s.Add(fmt.Sprintf("%s%s", dtmimp.MsgTopicPrefix, topic), postData)
}

// SetDelay delay call branch, unit second
func (s *Msg) SetDelay(delay uint64) *Msg {
s.delay = delay
Expand Down
8 changes: 0 additions & 8 deletions dtmcli/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,3 @@ func GetRestyClient() *resty.Client {
func GetRestyClient2(timeout time.Duration) *resty.Client {
return dtmimp.GetRestyClient2(timeout)
}

// SetPassthroughHeaders experimental.
// apply to http header and grpc metadata
// dtm server will save these headers in trans creating request.
// and then passthrough them to sub-trans
func SetPassthroughHeaders(headers []string) {
dtmimp.PassthroughHeaders = headers
}
4 changes: 2 additions & 2 deletions dtmgrpc/dtmgimp/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c
}

// InvokeBranch invoke a url for trans
func InvokeBranch(t *dtmimp.TransBase, isRaw bool, msg proto.Message, url string, reply interface{}, branchID string, op string) error {
func InvokeBranch(t *dtmimp.TransBase, isRaw bool, msg proto.Message, url string, reply interface{}, branchID string, op string, opts ...grpc.CallOption) error {
server, method, err := dtmdriver.GetDriver().ParseServerMethod(url)
if err != nil {
return err
Expand All @@ -65,5 +65,5 @@ func InvokeBranch(t *dtmimp.TransBase, isRaw bool, msg proto.Message, url string
if t.TransType == "xa" { // xa branch need additional phase2_url
ctx = metadata.AppendToOutgoingContext(ctx, Map2Kvs(map[string]string{dtmpre + "phase2_url": url})...)
}
return MustGetGrpcConn(server, isRaw).Invoke(ctx, method, msg, reply)
return MustGetGrpcConn(server, isRaw).Invoke(ctx, method, msg, reply, opts...)
}
24 changes: 12 additions & 12 deletions dtmgrpc/dtmgimp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func GetDtmRequest(s *dtmimp.TransBase) *dtmgpb.DtmRequest {
Gid: s.Gid,
TransType: s.TransType,
TransOptions: &dtmgpb.DtmTransOptions{
WaitResult: s.WaitResult,
TimeoutToFail: s.TimeoutToFail,
RetryInterval: s.RetryInterval,
PassthroughHeaders: s.PassthroughHeaders,
BranchHeaders: s.BranchHeaders,
RequestTimeout: s.RequestTimeout,
WaitResult: s.WaitResult,
TimeoutToFail: s.TimeoutToFail,
RetryInterval: s.RetryInterval,
BranchHeaders: s.BranchHeaders,
RequestTimeout: s.RequestTimeout,
RetryLimit: s.RetryLimit,
},
QueryPrepared: s.QueryPrepared,
CustomedData: s.CustomData,
Expand All @@ -61,18 +61,18 @@ const dtmpre string = "dtm-"

// TransInfo2Ctx add trans info to grpc context
func TransInfo2Ctx(ctx context.Context, gid, transType, branchID, op, dtm string) context.Context {
md := metadata.Pairs(
nctx := ctx
if ctx == nil {
nctx = context.Background()
}
return metadata.AppendToOutgoingContext(
nctx,
dtmpre+"gid", gid,
dtmpre+"trans_type", transType,
dtmpre+"branch_id", branchID,
dtmpre+"op", op,
dtmpre+"dtm", dtm,
)
nctx := ctx
if ctx == nil {
nctx = context.Background()
}
return metadata.NewOutgoingContext(nctx, md)
}

// Map2Kvs map to metadata kv
Expand Down
Loading

0 comments on commit 7f3e3b2

Please sign in to comment.