Skip to content

Commit

Permalink
feat: Added code for Internal-tls (milvus-io#36865)
Browse files Browse the repository at this point in the history
issue : milvus-io#36864

I have a few questions regarding my approach.I will consolidate them
here for feedback and review.Thanks

---------

Signed-off-by: Nischay Yadav <[email protected]>
Signed-off-by: Nischay <[email protected]>
  • Loading branch information
nish112022 authored and JsDove committed Nov 26, 2024
1 parent d42cc2a commit db658c2
Show file tree
Hide file tree
Showing 25 changed files with 643 additions and 21 deletions.
4 changes: 4 additions & 0 deletions cmd/tools/config/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func WriteYaml(w io.Writer) {
name: "tls",
header: "\n# Configure the proxy tls enable.",
},
{
name: "internaltls",
header: "\n# Configure the node-tls enable.",
},
{
name: "common",
},
Expand Down
7 changes: 7 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,12 @@ tls:
serverKeyPath: configs/cert/server.key
caPemPath: configs/cert/ca.pem

# Configure the node-tls enable.
internaltls:
serverPemPath: configs/cert/server.pem
serverKeyPath: configs/cert/server.key
caPemPath: configs/cert/ca.pem

common:
defaultPartitionName: _default # Name of the default partition when a collection is created
defaultIndexName: _default_idx # Name of the index when it is created with name unspecified
Expand Down Expand Up @@ -839,6 +845,7 @@ common:
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases,Load,Release,Insert,Delete,Upsert,Import,Flush,Compaction,LoadBalance,RenameCollection,CreateIndex,DropIndex,CreatePartition,DropPartition # Collection level readwrite privileges
admin:
privileges: Query,Search,IndexDetail,GetFlushState,GetLoadState,GetLoadingProgress,HasPartition,ShowPartitions,DescribeCollection,DescribeAlias,GetStatistics,ListAliases,Load,Release,Insert,Delete,Upsert,Import,Flush,Compaction,LoadBalance,RenameCollection,CreateIndex,DropIndex,CreatePartition,DropPartition,CreateAlias,DropAlias # Collection level admin privileges
internaltlsEnabled: false
tlsMode: 0
session:
ttl: 30 # ttl value when session granting a lease to register service
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
Expand Down Expand Up @@ -71,6 +72,15 @@ func NewClient(ctx context.Context) (types.DataCoordClient, error) {
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "Datacoord")
if err != nil {
log.Error("Failed to create cert pool for Datacoord client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
8 changes: 6 additions & 2 deletions internal/distributed/datacoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (s *Server) startGrpcLoop() {
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -201,7 +201,11 @@ func (s *Server) startGrpcLoop() {
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("DataCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
datapb.RegisterDataCoordServer(s.grpcServer, s)
// register the streaming coord grpc service.
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/datanode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -72,6 +73,15 @@ func NewClient(ctx context.Context, addr string, serverID int64) (types.DataNode
client.grpcClient.SetNodeID(serverID)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "DataNode")
if err != nil {
log.Error("Failed to create cert pool for DataNode client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
8 changes: 6 additions & 2 deletions internal/distributed/datanode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *Server) startGrpcLoop() {
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -154,7 +154,11 @@ func (s *Server) startGrpcLoop() {
return s.serverID.Load()
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("DataNode"))
s.grpcServer = grpc.NewServer(grpcOpts...)
datapb.RegisterDataNodeServer(s.grpcServer, s)

ctx, cancel := context.WithCancel(s.ctx)
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/indexnode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -72,6 +73,15 @@ func NewClient(ctx context.Context, addr string, nodeID int64, encryption bool)
if encryption {
client.grpcClient.EnableEncryption()
}
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "IndexNode")
if err != nil {
log.Error("Failed to create cert pool for IndexNode client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
8 changes: 6 additions & 2 deletions internal/distributed/indexnode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *Server) startGrpcLoop() {
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -139,7 +139,11 @@ func (s *Server) startGrpcLoop() {
return s.serverID.Load()
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("IndexNode"))
s.grpcServer = grpc.NewServer(grpcOpts...)
workerpb.RegisterIndexNodeServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(s.listener); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/proxy/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -69,6 +70,15 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (types.ProxyClien
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetNodeID(nodeID)
client.grpcClient.SetSession(sess)
if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "Proxy")
if err != nil {
log.Error("Failed to create cert pool for Proxy client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
9 changes: 7 additions & 2 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (s *Server) startInternalGrpc(errChan chan error) {
}

opts := tracer.GetInterceptorOpts()
s.grpcInternalServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -366,7 +366,12 @@ func (s *Server) startInternalGrpc(errChan chan error) {
}
return s.serverID.Load()
}),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("Proxy"))
s.grpcInternalServer = grpc.NewServer(grpcOpts...)
proxypb.RegisterProxyServer(s.grpcInternalServer, s)
grpc_health_v1.RegisterHealthServer(s.grpcInternalServer, s)
errChan <- nil
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/querycoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
Expand Down Expand Up @@ -63,6 +64,15 @@ func NewClient(ctx context.Context) (types.QueryCoordClient, error) {
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryCoord")
if err != nil {
log.Error("Failed to create cert pool for QueryCoord client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/distributed/querycoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (s *Server) startGrpcLoop() {
ctx, cancel := context.WithCancel(s.loopCtx)
defer cancel()

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -256,7 +256,10 @@ func (s *Server) startGrpcLoop() {
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("QueryCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
querypb.RegisterQueryCoordServer(s.grpcServer, s)

go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
Expand Down
12 changes: 12 additions & 0 deletions internal/distributed/querynode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
Expand All @@ -37,6 +38,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var Params *paramtable.ComponentParam = paramtable.Get()

// Client is the grpc client of QueryNode.
type Client struct {
grpcClient grpcclient.GrpcClient[querypb.QueryNodeClient]
Expand Down Expand Up @@ -70,6 +73,15 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (types.QueryNodeC
client.grpcClient.SetNodeID(nodeID)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "QueryNode")
if err != nil {
log.Error("Failed to create cert pool for QueryNode client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/distributed/querynode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *Server) startGrpcLoop() {
Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead
}

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand Down Expand Up @@ -204,7 +204,10 @@ func (s *Server) startGrpcLoop() {
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("QueryNode"))
s.grpcServer = grpc.NewServer(grpcOpts...)
querypb.RegisterQueryNodeServer(s.grpcServer, s)

ctx, cancel := context.WithCancel(s.ctx)
Expand Down
10 changes: 10 additions & 0 deletions internal/distributed/rootcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/distributed/utils"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
Expand Down Expand Up @@ -70,6 +71,15 @@ func NewClient(ctx context.Context) (types.RootCoordClient, error) {
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
client.grpcClient.SetSession(sess)

if Params.InternalTLSCfg.InternalTLSEnabled.GetAsBool() {
client.grpcClient.EnableEncryption()
cp, err := utils.CreateCertPoolforClient(Params.InternalTLSCfg.InternalTLSCaPemPath.GetValue(), "RootCoord")
if err != nil {
log.Error("Failed to create cert pool for RootCoord client")
return nil, err
}
client.grpcClient.SetInternalTLSCertPool(cp)
}
return client, nil
}

Expand Down
8 changes: 6 additions & 2 deletions internal/distributed/rootcoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (s *Server) startGrpcLoop() {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()

s.grpcServer = grpc.NewServer(
grpcOpts := []grpc.ServerOption{
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
Expand All @@ -303,7 +303,11 @@ func (s *Server) startGrpcLoop() {
return s.serverID.Load()
}),
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}

grpcOpts = append(grpcOpts, utils.EnableInternalTLS("RootCoord"))
s.grpcServer = grpc.NewServer(grpcOpts...)
rootcoordpb.RegisterRootCoordServer(s.grpcServer, s)

go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
Expand Down
Loading

0 comments on commit db658c2

Please sign in to comment.