Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added code for Internal-tls #36865

Merged
merged 6 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/tools/config/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func WriteYaml(w io.Writer) {
name: "tls",
header: "\n# Configure the proxy tls enable.",
},
{
name: "internaltls",
header: "\n# Configure the node-tls enable.",
},
{
name: "common",
},
Expand Down
7 changes: 7 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,12 @@ tls:
serverKeyPath: configs/cert/server.key
caPemPath: configs/cert/ca.pem

# Configure the node-tls enable.
internaltls:
serverPemPath: configs/cert/server.pem
serverKeyPath: configs/cert/server.key
caPemPath: configs/cert/ca.pem

common:
defaultPartitionName: _default # Name of the default partition when a collection is created
defaultIndexName: _default_idx # Name of the index when it is created with name unspecified
Expand Down Expand Up @@ -839,6 +845,7 @@ common:
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases,Load,Release,Insert,Delete,Upsert,Import,Flush,Compaction,LoadBalance,RenameCollection,CreateIndex,DropIndex,CreatePartition,DropPartition # Collection level readwrite privileges
admin:
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases,Load,Release,Insert,Delete,Upsert,Import,Flush,Compaction,LoadBalance,RenameCollection,CreateIndex,DropIndex,CreatePartition,DropPartition,CreateAlias,DropAlias # Collection level admin privileges
internaltlsEnabled: false
tlsMode: 0
session:
ttl: 30 # ttl value when session granting a lease to register service
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
Expand Down Expand Up @@ -71,6 +72,15 @@
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "Datacoord")
if err != nil {
log.Error("Failed to create cert pool for Datacoord client")
return nil, err
}

Check warning on line 81 in internal/distributed/datacoord/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/distributed/datacoord/client/client.go#L79-L81

Added lines #L79 - L81 were not covered by tests
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
8 changes: 6 additions & 2 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (s *Server) startGrpcLoop() {
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -201,7 +201,11 @@ func (s *Server) startGrpcLoop() {
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("DataCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
datapb.RegisterDataCoordServer(s.grpcServer, s)
// register the streaming coord grpc service.
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/datanode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -72,6 +73,15 @@
client.grpcClient.SetNodeID(serverID)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "DataNode")
if err != nil {
log.Error("Failed to create cert pool for DataNode client")
return nil, err
}

Check warning on line 82 in internal/distributed/datanode/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/distributed/datanode/client/client.go#L80-L82

Added lines #L80 - L82 were not covered by tests
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
8 changes: 6 additions & 2 deletions internal/distributed/datanode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *Server) startGrpcLoop() {
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -154,7 +154,11 @@ func (s *Server) startGrpcLoop() {
return s.serverID.Load()
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("DataNode"))
s.grpcServer = grpc.NewServer(grpcOpts...)
datapb.RegisterDataNodeServer(s.grpcServer, s)

ctx, cancel := context.WithCancel(s.ctx)
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/indexnode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -72,6 +73,15 @@
if encryption {
client.grpcClient.EnableEncryption()
}
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "IndexNode")
if err != nil {
log.Error("Failed to create cert pool for IndexNode client")
return nil, err
}

Check warning on line 82 in internal/distributed/indexnode/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/distributed/indexnode/client/client.go#L80-L82

Added lines #L80 - L82 were not covered by tests
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
8 changes: 6 additions & 2 deletions internal/distributed/indexnode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *Server) startGrpcLoop() {
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -139,7 +139,11 @@ func (s *Server) startGrpcLoop() {
return s.serverID.Load()
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("IndexNode"))
s.grpcServer = grpc.NewServer(grpcOpts...)
workerpb.RegisterIndexNodeServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(s.listener); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/proxy/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -69,6 +70,15 @@
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetNodeID(nodeID)
client.grpcClient.SetSession(sess)
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "Proxy")
if err != nil {
log.Error("Failed to create cert pool for Proxy client")
return nil, err
}

Check warning on line 79 in internal/distributed/proxy/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/distributed/proxy/client/client.go#L77-L79

Added lines #L77 - L79 were not covered by tests
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
9 changes: 7 additions & 2 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (s *Server) startInternalGrpc(errChan chan error) {
}

opts := tracer.GetInterceptorOpts()
s.grpcInternalServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -366,7 +366,12 @@ func (s *Server) startInternalGrpc(errChan chan error) {
}
return s.serverID.Load()
}),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("Proxy"))
s.grpcInternalServer = grpc.NewServer(grpcOpts...)
proxypb.RegisterProxyServer(s.grpcInternalServer, s)
grpc_health_v1.RegisterHealthServer(s.grpcInternalServer, s)
errChan <- nil
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/querycoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -63,6 +64,15 @@
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryCoord")
if err != nil {
log.Error("Failed to create cert pool for QueryCoord client")
return nil, err
}

Check warning on line 73 in internal/distributed/querycoord/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/distributed/querycoord/client/client.go#L71-L73

Added lines #L71 - L73 were not covered by tests
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/distributed/querycoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (s *Server) startGrpcLoop() {
ctx, cancel := context.WithCancel(s.loopCtx)
defer cancel()

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -256,7 +256,10 @@ func (s *Server) startGrpcLoop() {
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("QueryCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
querypb.RegisterQueryCoordServer(s.grpcServer, s)

go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
Expand Down
12 changes: 12 additions & 0 deletions internal/distributed/querynode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
Expand All @@ -37,6 +38,8 @@
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var Params *paramtable.ComponentParam = paramtable.Get()

// Client is the grpc client of QueryNode.
type Client struct {
grpcClient grpcclient.GrpcClient[querypb.QueryNodeClient]
Expand Down Expand Up @@ -70,6 +73,15 @@
client.grpcClient.SetNodeID(nodeID)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryNode")
if err != nil {
log.Error("Failed to create cert pool for QueryNode client")
return nil, err
}

Check warning on line 82 in internal/distributed/querynode/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/distributed/querynode/client/client.go#L80-L82

Added lines #L80 - L82 were not covered by tests
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/distributed/querynode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *Server) startGrpcLoop() {
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand Down Expand Up @@ -204,7 +204,10 @@ func (s *Server) startGrpcLoop() {
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("QueryNode"))
s.grpcServer = grpc.NewServer(grpcOpts...)
querypb.RegisterQueryNodeServer(s.grpcServer, s)

ctx, cancel := context.WithCancel(s.ctx)
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/rootcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
Expand Down Expand Up @@ -70,6 +71,15 @@
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "RootCoord")
if err != nil {
log.Error("Failed to create cert pool for RootCoord client")
return nil, err
}

Check warning on line 80 in internal/distributed/rootcoord/client/client.go

View check run for this annotation

Codecov / codecov/patch

internal/distributed/rootcoord/client/client.go#L78-L80

Added lines #L78 - L80 were not covered by tests
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
8 changes: 6 additions & 2 deletions internal/distributed/rootcoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (s *Server) startGrpcLoop() {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -303,7 +303,11 @@ func (s *Server) startGrpcLoop() {
return s.serverID.Load()
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("RootCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
rootcoordpb.RegisterRootCoordServer(s.grpcServer, s)

go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
Expand Down
Loading
Loading