-
Notifications
You must be signed in to change notification settings - Fork 3
/
driver.go
110 lines (96 loc) · 3.06 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package daprdriver
import (
"context"
"fmt"
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/proto"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dtm-labs/dtmdriver"
"github.com/dtm-labs/logger"
"github.com/go-resty/resty/v2"
)
// Driver name, URL layout, and the URL schemas handled by this driver.
const (
// DriverName is the name reported by GetName.
DriverName = "dtm-driver-dapr"
// format spells out the URL layout "<schema>://<host>/<dapr-app-id>/<method-name>".
// NOTE(review): not referenced in this file's visible code; presumably used by
// ParseDaprUrl for validation or error messages — confirm.
format = "<schema>://<host>/<dapr-app-id>/<method-name>"
// cDaprEnv holds the string "DAPR_ENV".
// NOTE(review): looks like an environment variable name, but it is not used in
// the visible code — confirm against the rest of the package.
cDaprEnv = "DAPR_ENV"
// cAppid is the key under which the Dapr app id is attached to outgoing gRPC
// metadata and HTTP headers for the proxied schemas.
cAppid = "dapr-app-id"
// SchemaHTTP marks URLs that the HTTP middleware rewrites to
// http://<host>/v1.0/invoke/<app-id>/method/<method-name>.
SchemaHTTP = "daprhttp"
// SchemaProxiedHTTP marks URLs that the HTTP middleware rewrites to
// http://<host>/<method-name> with the app id sent in the cAppid header.
SchemaProxiedHTTP = "daprphttp"
// SchemaGrpc marks targets whose gRPC calls are redirected to
// /dapr.proto.runtime.v1.Dapr/InvokeService with the app id and the original
// method written into the InvokeServiceRequest.
SchemaGrpc = "daprgrpc"
// SchemaProxiedGrpc marks targets whose gRPC calls are forwarded unchanged
// with the app id added to the outgoing metadata under cAppid.
SchemaProxiedGrpc = "daprpgrpc"
)
type (
// darpDriver is the driver value registered with dtmdriver in init. It carries
// no state; all behavior lives in its methods.
// NOTE(review): the name reads like a misspelling of "dapr"; it is left
// unchanged because every method in this file uses it.
darpDriver struct{}
)
// GetName returns DriverName, the identifier of this driver.
func (*darpDriver) GetName() string {
	return DriverName
}
// RegisterAddrResolver wires the Dapr address handling into dtmdriver.
//
// It appends a gRPC client middleware and an HTTP (resty) request middleware
// to dtmdriver.Middlewares and registers the proxyBuilder and daprBuilder
// gRPC resolvers.
//
// gRPC middleware, keyed on the schema ParseDaprUrl extracts from cc.Target():
//   - SchemaProxiedGrpc: the app id is added to the outgoing metadata under
//     cAppid and the call is forwarded with method and request unchanged.
//   - SchemaGrpc: the request's Id is set to the app id, its Message.Method to
//     the original method without the leading "/", and the call is redirected
//     to /dapr.proto.runtime.v1.Dapr/InvokeService. The request may be a
//     *pb.InvokeServiceRequest or its serialized []byte form; the latter is
//     decoded, updated and re-encoded.
//   - any other schema: the call is forwarded unchanged.
//
// HTTP middleware, keyed on the schema ParseDaprUrl extracts from r.URL:
//   - SchemaProxiedHTTP: the cAppid header is set to the app id and the URL
//     becomes http://<host>/<method-name>.
//   - SchemaHTTP: the URL becomes
//     http://<host>/v1.0/invoke/<app-id>/method/<method-name>.
//   - any other schema: the URL is left unchanged.
//
// Both middlewares return the error from ParseDaprUrl as-is. For SchemaGrpc
// the gRPC middleware also returns an error (rather than panicking) when the
// request has an unsupported type or carries no Message to update.
func (z *darpDriver) RegisterAddrResolver() {
	dtmdriver.Middlewares.Grpc = append(dtmdriver.Middlewares.Grpc, func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		target := cc.Target()
		addr, err := ParseDaprUrl(target)
		if err != nil {
			return err
		}
		switch addr.Schema {
		case SchemaProxiedGrpc:
			ctx = metadata.AppendToOutgoingContext(ctx, cAppid, addr.Appid)
		case SchemaGrpc:
			innerMethod := strings.TrimPrefix(method, "/")
			// updateReq points the invoke request at the target app and method.
			// A nil request or nil Message would otherwise panic on the field write.
			updateReq := func(r *pb.InvokeServiceRequest) error {
				if r == nil || r.Message == nil {
					return fmt.Errorf("dapr invoke request for target %s has no message", target)
				}
				r.Id = addr.Appid
				r.Message.Method = innerMethod
				return nil
			}
			switch v := req.(type) {
			case *pb.InvokeServiceRequest:
				// The dtm SDK calls the branch with a typed request.
				if err := updateReq(v); err != nil {
					return err
				}
			case []byte:
				// The dtm server calls the branch directly with serialized bytes.
				var decoded pb.InvokeServiceRequest
				if err := proto.Unmarshal(v, &decoded); err != nil {
					return err
				}
				if err := updateReq(&decoded); err != nil {
					return err
				}
				encoded, err := proto.Marshal(&decoded)
				if err != nil {
					return err
				}
				req = encoded
			default:
				return fmt.Errorf("unsupported request type %T for schema %s", req, SchemaGrpc)
			}
			method = "/dapr.proto.runtime.v1.Dapr/InvokeService"
		}
		logger.Debugf("invoking target: %s, method: %s", target, method)
		return invoker(ctx, method, req, reply, cc, opts...)
	})
	resolver.Register(&proxyBuilder{})
	resolver.Register(&daprBuilder{})
	dtmdriver.Middlewares.HTTP = append(dtmdriver.Middlewares.HTTP, func(c *resty.Client, r *resty.Request) error {
		addr, err := ParseDaprUrl(r.URL)
		if err != nil {
			return err
		}
		old := r.URL
		switch addr.Schema {
		case SchemaProxiedHTTP:
			r.SetHeader(cAppid, addr.Appid)
			r.URL = fmt.Sprintf("http://%s/%s", addr.Host, addr.MethodName)
		case SchemaHTTP:
			r.URL = fmt.Sprintf("http://%s/v1.0/invoke/%s/method/%s", addr.Host, addr.Appid, addr.MethodName)
		}
		logger.Debugf("url %s resolved to %s", old, r.URL)
		return nil
	})
}
// RegisterService does nothing for this driver: both arguments are ignored
// and the returned error is always nil.
func (*darpDriver) RegisterService(target, endpoint string) error {
	return nil
}
// ParseServerMethod splits uri into a server part and a method part.
//
// The uri is first handed to ParseDaprUrl. When the parsed address has a
// schema, server is "<schema>://<host>/<app-id>", method is "/<method-name>",
// and the error from ParseDaprUrl is returned alongside them.
//
// When the schema is empty, uri is cut at its first "/": the text before it
// is the server and the rest, starting at that "/", is the method. A uri
// without any "/" yields the whole uri as server and "/" as method. In this
// case the returned error is always nil.
func (z *darpDriver) ParseServerMethod(uri string) (server string, method string, err error) {
	addr, err := ParseDaprUrl(uri)
	if addr.Schema != "" {
		server = fmt.Sprintf("%s://%s/%s", addr.Schema, addr.Host, addr.Appid)
		method = "/" + addr.MethodName
		return server, method, err
	}

	slash := strings.Index(uri, "/")
	if slash < 0 {
		return uri, "/", nil
	}
	return uri[:slash], uri[slash:], nil
}
func init() {
dtmdriver.Register(&darpDriver{})
}