Skip to content

Commit

Permalink
feat: grpc tls (#5001)
Browse files Browse the repository at this point in the history
* feat: add client tls cert for log server

* fix: change config file

* fix: unit test

* feat: grpc server tls

* fix: unit test

* fix: switch to mounted secrets

* fix: error message

* fix: don't send nil creds

* fix: non init structure
  • Loading branch information
vsukhin authored Feb 14, 2024
1 parent b6eb2e6 commit 74e2526
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 10 deletions.
15 changes: 14 additions & 1 deletion cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"go.mongodb.org/mongo-driver/mongo"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

cloudartifacts "github.com/kubeshop/testkube/pkg/cloud/data/artifact"

Expand Down Expand Up @@ -231,7 +232,9 @@ func main() {

var logGrpcClient logsclient.StreamGetter
if features.LogsV2 {
logGrpcClient = logsclient.NewGrpcClient(cfg.LogServerGrpcAddress)
creds, err := newGRPCTransportCredentials(cfg)
ui.ExitOnError("Getting log server TLS credentials", err)
logGrpcClient = logsclient.NewGrpcClient(cfg.LogServerGrpcAddress, creds)
}

// DI
Expand Down Expand Up @@ -756,3 +759,13 @@ func getMongoSSLConfig(cfg *config.Config, secretClient *secret.Client) *storage
SSLCertificateAuthoritiyFile: rootCAPath,
}
}

func newGRPCTransportCredentials(cfg *config.Config) (credentials.TransportCredentials, error) {
return logsclient.GetGrpcTransportCredentials(logsclient.GrpcConnectionConfig{
Secure: cfg.LogServerSecure,
SkipVerify: cfg.LogServerSkipVerify,
CertFile: cfg.LogServerCertFile,
KeyFile: cfg.LogServerKeyFile,
CAFile: cfg.LogServerCAFile,
})
}
19 changes: 18 additions & 1 deletion cmd/logs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/nats-io/nats.go/jetstream"
"github.com/oklog/run"
"go.uber.org/zap"
"google.golang.org/grpc/credentials"

"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/agent"
Expand Down Expand Up @@ -92,6 +93,12 @@ func main() {
if cfg.Debug {
svc.AddAdapter(adapter.NewDebugAdapter())
}

creds, err := newGRPCTransportCredentials(cfg)
if err != nil {
log.Fatalw("error getting tls credentials", "error", err)
}

// add given log adapter depends from mode
switch mode {

Expand Down Expand Up @@ -142,7 +149,7 @@ func main() {
})

g.Add(func() error {
return svc.RunGRPCServer(ctx)
return svc.RunGRPCServer(ctx, creds)
}, func(error) {
cancel()
})
Expand Down Expand Up @@ -179,3 +186,13 @@ func Must[T any](val T, err error) T {
}
return val
}

func newGRPCTransportCredentials(cfg *config.Config) (credentials.TransportCredentials, error) {
return logs.GetGrpcTransportCredentials(logs.GrpcConnectionConfig{
Secure: cfg.GrpcSecure,
ClientAuth: cfg.GrpcClientAuth,
CertFile: cfg.GrpcCertFile,
KeyFile: cfg.GrpcKeyFile,
ClientCAFile: cfg.GrpcClientCAFile,
})
}
7 changes: 6 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,14 @@ type Config struct {
EnableSecretsEndpoint bool `envconfig:"ENABLE_SECRETS_ENDPOINT" default:"false"`
DisableMongoMigrations bool `envconfig:"DISABLE_MONGO_MIGRATIONS" default:"false"`
Debug bool `envconfig:"DEBUG" default:"false"`
LogServerGrpcAddress string `envconfig:"LOG_SERVER_GRPC_ADDRESS" default:":9090"`
EnableImageDataPersistentCache bool `envconfig:"ENABLE_IMAGE_DATA_PERSISTENT_CACHE" default:"false"`
ImageDataPersistentCacheKey string `envconfig:"IMAGE_DATA_PERSISTENT_CACHE_KEY" default:"testkube-image-cache"`
LogServerGrpcAddress string `envconfig:"LOG_SERVER_GRPC_ADDRESS" default:":9090"`
LogServerSecure bool `envconfig:"LOG_SERVER_SECURE" default:"false"`
LogServerSkipVerify bool `envconfig:"LOG_SERVER_SKIP_VERIFY" default:"false"`
LogServerCertFile string `envconfig:"LOG_SERVER_CERT_FILE" default:""`
LogServerKeyFile string `envconfig:"LOG_SERVER_KEY_FILE" default:""`
LogServerCAFile string `envconfig:"LOG_SERVER_CA_FILE" default:""`

// DEPRECATED: Use TestkubeProAPIKey instead
TestkubeCloudAPIKey string `envconfig:"TESTKUBE_CLOUD_API_KEY" default:""`
Expand Down
65 changes: 63 additions & 2 deletions pkg/logs/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package client

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"os"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/kubeshop/testkube/pkg/log"
Expand All @@ -20,15 +25,17 @@ const (
)

// NewGrpcClient imlpements getter interface for log stream for given ID
func NewGrpcClient(address string) StreamGetter {
func NewGrpcClient(address string, creds credentials.TransportCredentials) StreamGetter {
return &GrpcClient{
log: log.DefaultLogger.With("service", "logs-grpc-client"),
creds: creds,
address: address,
}
}

type GrpcClient struct {
log *zap.SugaredLogger
creds credentials.TransportCredentials
address string
}

Expand All @@ -47,7 +54,12 @@ func (c GrpcClient) Get(ctx context.Context, id string) (chan events.LogResponse
defer close(ch)

// TODO add TLS to GRPC client
conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
creds := insecure.NewCredentials()
if c.creds != nil {
creds = c.creds
}

conn, err := grpc.Dial(c.address, grpc.WithTransportCredentials(creds))
if err != nil {
ch <- events.LogResponse{Error: err}
return
Expand Down Expand Up @@ -96,3 +108,52 @@ func (c GrpcClient) Get(ctx context.Context, id string) (chan events.LogResponse

return ch, nil
}

// GrpcConnectionConfig contains GRPC connection parameters
type GrpcConnectionConfig struct {
Secure bool
SkipVerify bool
CertFile string
KeyFile string
CAFile string
}

// GetGrpcTransportCredentials returns transport credentials for GRPC connection config
func GetGrpcTransportCredentials(cfg GrpcConnectionConfig) (credentials.TransportCredentials, error) {
var creds credentials.TransportCredentials

if cfg.Secure {
var tlsConfig tls.Config

if cfg.SkipVerify {
tlsConfig.InsecureSkipVerify = true
} else {
if cfg.CertFile != "" && cfg.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}

tlsConfig.Certificates = []tls.Certificate{cert}
}

if cfg.CAFile != "" {
caCertificate, err := os.ReadFile(cfg.CAFile)
if err != nil {
return nil, err
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCertificate) {
return nil, fmt.Errorf("failed to add server CA's certificate")
}

tlsConfig.RootCAs = certPool
}
}

creds = credentials.NewTLS(&tlsConfig)
}

return creds, nil
}
6 changes: 6 additions & 0 deletions pkg/logs/config/logs_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type Config struct {
GrpcAddress string `envconfig:"GRPC_ADDRESS" default:":9090"`
KVBucketName string `envconfig:"KV_BUCKET_NAME" default:"logsState"`

GrpcSecure bool `envconfig:"GRPC_SECURE" default:"false"`
GrpcClientAuth bool `envconfig:"GRPC_CLIENT_AUTH" default:"false"`
GrpcCertFile string `envconfig:"GRPC_CERT_FILE" default:""`
GrpcKeyFile string `envconfig:"GRPC_KEY_FILE" default:""`
GrpcClientCAFile string `envconfig:"GRPC_CLIENT_CA_FILE" default:""`

StorageEndpoint string `envconfig:"STORAGE_ENDPOINT" default:"localhost:9000"`
StorageBucket string `envconfig:"STORAGE_BUCKET" default:"testkube-logs"`
StorageExpiration int `envconfig:"STORAGE_EXPIRATION"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestLogsService_RunHealthcheckHandler(t *testing.T) {
svc := LogsService{log: log.DefaultLogger}
svc.WithRandomPort()
go svc.RunHealthCheckHandler(ctx)
go svc.RunGRPCServer(ctx)
go svc.RunGRPCServer(ctx, nil)
defer svc.Shutdown(ctx)

time.Sleep(100 * time.Millisecond)
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/logsserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func TestGRPC_Server(t *testing.T) {
WithLogsRepositoryFactory(LogsFactoryMock{}).
WithRandomPort()

go ls.RunGRPCServer(ctx)
go ls.RunGRPCServer(ctx, nil)

// allow server to splin up
time.Sleep(time.Millisecond * 100)

expectedCount := 0

stream := client.NewGrpcClient(ls.grpcAddress)
stream := client.NewGrpcClient(ls.grpcAddress, nil)
ch, err := stream.Get(ctx, "id1")
assert.NoError(t, err)

Expand Down
63 changes: 61 additions & 2 deletions pkg/logs/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ package logs

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"sync"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/logs/adapter"
Expand Down Expand Up @@ -121,13 +125,19 @@ func (ls *LogsService) Run(ctx context.Context) (err error) {
}

// TODO handle TLS
func (ls *LogsService) RunGRPCServer(ctx context.Context) error {
func (ls *LogsService) RunGRPCServer(ctx context.Context, creds credentials.TransportCredentials) error {
lis, err := net.Listen("tcp", ls.grpcAddress)
if err != nil {
return err
}

ls.grpcServer = grpc.NewServer()
var opts []grpc.ServerOption
if creds != nil {
opts = append(opts, grpc.Creds(creds))
}

ls.grpcServer = grpc.NewServer(opts...)

pb.RegisterLogsServiceServer(ls.grpcServer, NewLogsServer(ls.logsRepositoryFactory, ls.state))

ls.log.Infow("starting grpc server", "address", ls.grpcAddress)
Expand Down Expand Up @@ -176,3 +186,52 @@ func (ls *LogsService) WithLogsRepositoryFactory(f repository.Factory) *LogsServ
ls.logsRepositoryFactory = f
return ls
}

// GrpcConnectionConfig contains GRPC connection parameters
type GrpcConnectionConfig struct {
Secure bool
ClientAuth bool
CertFile string
KeyFile string
ClientCAFile string
}

// GetGrpcTransportCredentials returns transport credentials for GRPC connection config
func GetGrpcTransportCredentials(cfg GrpcConnectionConfig) (credentials.TransportCredentials, error) {
var creds credentials.TransportCredentials

if cfg.Secure {
var tlsConfig tls.Config
tlsConfig.ClientAuth = tls.NoClientCert
if cfg.ClientAuth {
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}

if cfg.CertFile != "" && cfg.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}

tlsConfig.Certificates = []tls.Certificate{cert}
}

if cfg.ClientCAFile != "" {
caCertificate, err := os.ReadFile(cfg.ClientCAFile)
if err != nil {
return nil, err
}

certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCertificate) {
return nil, fmt.Errorf("failed to add client CA's certificate")
}

tlsConfig.ClientCAs = certPool
}

creds = credentials.NewTLS(&tlsConfig)
}

return creds, nil
}

0 comments on commit 74e2526

Please sign in to comment.