diff --git a/README.md b/README.md index a04ec3e..f803044 100644 --- a/README.md +++ b/README.md @@ -589,6 +589,88 @@ chrome: image: selenoid/vnc:chrome_86.0 ``` +## Custom UID and GID for browser pod +Browser pod can be run with custom UID and GID. To do so set runAs property for specific browser globally or per each browser version. +``` json +{ + "chrome": { + "defaultVersion": "85.0", + "path": "/", + "runAs": { + "uid": 1000, + "gid": 2000 + }, + "versions": { + "85.0": { + "image": "selenoid/vnc:chrome_85.0", + "runAs": { + "uid": 1001, + "gid": 2002 + } + }, + "86.0": { + "image": "selenoid/vnc:chrome_86.0" + } + } + } +} +``` + +``` yaml +--- +chrome: + defaultVersion: '85.0' + path: "/" + runAs: + uid: 1000 + gid: 2000 + versions: + '85.0': + image: selenoid/vnc:chrome_85.0 + runAs: + uid: 1001 + gid: 2002 + '86.0': + image: selenoid/vnc:chrome_86.0 +``` + +## Custom Kernel Capabilities +In some cases you may need to run browser container with custom Linux capabilities. To do so set kernelCaps property for specific browser globally or per each browser version.
+``` json +{ + "chrome": { + "defaultVersion": "85.0", + "path": "/", + "kernelCaps": ["SYS_ADMIN"], + "versions": { + "85.0": { + "image": "selenoid/vnc:chrome_85.0", + "kernelCaps": ["SYS_ADMIN"] + }, + "86.0": { + "image": "selenoid/vnc:chrome_86.0" + } + } + } +} +``` + +``` yaml +--- +chrome: + defaultVersion: '85.0' + path: "/" + kernelCaps: + - SYS_ADMIN + versions: + '85.0': + image: selenoid/vnc:chrome_85.0 + kernelCaps: + - SYS_ADMIN + '86.0': + image: selenoid/vnc:chrome_86.0 +``` + ## Deployment Files and steps required for selenosis deployment available in [selenosis-deploy](https://github.com/alcounit/selenosis-deploy) repository diff --git a/cmd/selenosis/main.go b/cmd/selenosis/main.go index aae4be2..0a9700e 100644 --- a/cmd/selenosis/main.go +++ b/cmd/selenosis/main.go @@ -91,7 +91,7 @@ func command() *cobra.Command { }) router := mux.NewRouter() - router.HandleFunc("/wd/hub/session", app.CheckLimit(app.HandleSession)).Methods(http.MethodPost) + router.HandleFunc("/wd/hub/session", app.HandleSession).Methods(http.MethodPost) router.PathPrefix("/wd/hub/session/{sessionId}").HandlerFunc(app.HandleProxy) router.HandleFunc("/wd/hub/status", app.HandleHubStatus).Methods(http.MethodGet) router.PathPrefix("/vnc/{sessionId}").Handler(websocket.Handler(app.HandleVNC())) diff --git a/config/config.go b/config/config.go index bb8582a..3b3aeae 100644 --- a/config/config.go +++ b/config/config.go @@ -23,6 +23,8 @@ type Layout struct { DefaultVersion string `yaml:"defaultVersion" json:"defaultVersion"` Versions map[string]*platform.BrowserSpec `yaml:"versions" json:"versions"` Volumes []apiv1.Volume `yaml:"volumes,omitempty" json:"volumes,omitempty"` + Capabilities []apiv1.Capability `yaml:"kernelCaps,omitempty" json:"kernelCaps,omitempty"` + RunAs platform.RunAsOptions `yaml:"runAs,omitempty" json:"runAs,omitempty"` } //BrowsersConfig ... 
@@ -134,10 +136,15 @@ func readConfig(configFile string) (map[string]*Layout, error) { container.Meta.Annotations = merge(container.Meta.Annotations, layout.Meta.Annotations) container.Meta.Labels = merge(container.Meta.Labels, layout.Meta.Labels) container.Volumes = layout.Volumes + container.Capabilities = append(container.Capabilities, layout.Capabilities...) if err := mergo.Merge(&container.Spec, spec); err != nil { return nil, fmt.Errorf("merge error %v", err) } + + if err := mergo.Merge(&container.RunAs, layout.RunAs); err != nil { + return nil, fmt.Errorf("merge error %v", err) + } } } return layouts, nil diff --git a/handlers.go b/handlers.go index fc82b53..6bdefc7 100644 --- a/handlers.go +++ b/handlers.go @@ -32,26 +32,6 @@ var ( } ) -//CheckLimit ... -func (app *App) CheckLimit(next http.HandlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - logger := app.logger.WithFields(logrus.Fields{ - "request_id": uuid.New(), - "request": fmt.Sprintf("%s %s", r.Method, r.URL.Path), - }) - - total := app.stats.Len() - - if total >= app.sessionLimit { - logger.Warnf("active session limit reached: total %d, limit %d", total, app.sessionLimit) - tools.JSONError(w, "session limit reached", http.StatusInternalServerError) - return - } - - next.ServeHTTP(w, r) - } -} - //HandleSession ... 
func (app *App) HandleSession(w http.ResponseWriter, r *http.Request) { start := time.Now() @@ -122,7 +102,7 @@ func (app *App) HandleSession(w http.ResponseWriter, r *http.Request) { logger.WithField("time_elapsed", tools.TimeElapsed(start)).Infof("starting browser from image: %s", template.Template.Image) - service, err := app.client.Create(template) + service, err := app.client.Service().Create(template) if err != nil { logger.WithField("time_elapsed", tools.TimeElapsed(start)).Errorf("failed to start browser: %v", err) tools.JSONError(w, err.Error(), http.StatusBadRequest) @@ -238,7 +218,7 @@ func (app *App) HandleHubStatus(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - active, pending := getSessionStats(app.stats.List()) + active, pending := getSessionStats(app.stats.Sessions().List()) total := len(active) + len(pending) json.NewEncoder(w).Encode( @@ -341,7 +321,7 @@ func (app *App) HandleLogs() websocket.Handler { }) logger.Infof("stream logs request: %s", fmt.Sprintf("%s.%s", sessionID, app.serviceName)) - conn, err := app.client.Logs(wsconn.Request().Context(), sessionID) + conn, err := app.client.Service().Logs(wsconn.Request().Context(), sessionID) if err != nil { logger.Errorf("stream logs error: %v", err) return @@ -378,7 +358,7 @@ func (app *App) HandleStatus(w http.ResponseWriter, r *http.Request) { Selenosis Status `json:"selenosis,omitempty"` } - active, pending := getSessionStats(app.stats.List()) + active, pending := getSessionStats(app.stats.Sessions().List()) json.NewEncoder(w).Encode( Response{ Status: http.StatusOK, diff --git a/handlers_test.go b/handlers_test.go index 45be399..47956dd 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -595,30 +595,74 @@ func NewPlatformMock(f *PlatformMock) platform.Platform { return f } -func (p *PlatformMock) Create(*platform.ServiceSpec) (*platform.Service, error) { +func (p *PlatformMock) Service() platform.ServiceInterface { + return &serviceMock{ + 
err: p.err, + service: p.service, + } +} + +func (p *PlatformMock) Quota() platform.QuotaInterface { + return &quotaMock{ + err: nil, + quota: &platform.Quota{ + Name: "test", + CurrentMaxLimit: 10, + }, + } +} + +func (p *PlatformMock) State() (platform.PlatformState, error) { + return platform.PlatformState{}, nil +} + +func (p *PlatformMock) Watch() <-chan platform.Event { + ch := make(chan platform.Event) + return ch +} + +func (p *PlatformMock) List() ([]*platform.Service, error) { + return nil, nil +} + +type serviceMock struct { + err error + service *platform.Service +} + +func (p *serviceMock) Create(*platform.ServiceSpec) (*platform.Service, error) { if p.err != nil { return nil, p.err } return p.service, nil } -func (p *PlatformMock) Delete(string) error { +func (p *serviceMock) Delete(string) error { if p.err != nil { return p.err } return nil } -func (p *PlatformMock) List() ([]*platform.Service, error) { + +func (p *serviceMock) Logs(ctx context.Context, name string) (io.ReadCloser, error) { return nil, nil } -func (p *PlatformMock) Watch() <-chan platform.Event { - ch := make(chan platform.Event) - return ch +type quotaMock struct { + err error + quota *platform.Quota } -func (p *PlatformMock) Logs(ctx context.Context, name string) (io.ReadCloser, error) { - return nil, nil +func (s *quotaMock) Create(int64) (*platform.Quota, error) { + return s.quota, nil +} + +func (s *quotaMock) Get() (*platform.Quota, error) { + return s.quota, nil +} + +func (s *quotaMock) Update(int64) (*platform.Quota, error) { + return s.quota, nil } type errReader int diff --git a/platform/kubernetes.go b/platform/kubernetes.go index 3ec46ce..fbbb651 100644 --- a/platform/kubernetes.go +++ b/platform/kubernetes.go @@ -9,10 +9,12 @@ import ( "net/http" "net/url" "path" + "strconv" "time" "github.com/alcounit/selenosis/tools" apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" 
"k8s.io/apimachinery/pkg/watch" @@ -25,6 +27,8 @@ import ( ) var ( + label = "selenosis.app.type" + quotaName = "selenosis-pod-limit" browserPorts = struct { selenium, vnc intstr.IntOrString }{ @@ -43,12 +47,12 @@ var ( timeZone: "TZ", } defaultLabels = struct { - serviceType, session string + serviceType, appType, session string }{ serviceType: "type", + appType: label, session: "session", } - label = "type=browser" ) //ClientConfig ... @@ -64,14 +68,12 @@ type ClientConfig struct { //Client ... type Client struct { - ns string - svc string - svcPort intstr.IntOrString - imagePullSecretName string - proxyImage string - readinessTimeout time.Duration - idleTimeout time.Duration - clientset kubernetes.Interface + ns string + svc string + svcPort intstr.IntOrString + clientset kubernetes.Interface + service ServiceInterface + quota QuotaInterface } //NewClient ... @@ -87,7 +89,7 @@ func NewClient(c ClientConfig) (Platform, error) { return nil, fmt.Errorf("failed to build client: %v", err) } - return &Client{ + service := &service{ ns: c.Namespace, clientset: clientset, svc: c.Service, @@ -96,32 +98,215 @@ func NewClient(c ClientConfig) (Platform, error) { proxyImage: c.ProxyImage, readinessTimeout: c.ReadinessTimeout, idleTimeout: c.IdleTimeout, + } + + quota := &quota{ + ns: c.Namespace, + clientset: clientset, + } + + return &Client{ + ns: c.Namespace, + clientset: clientset, + svc: c.Service, + svcPort: intstr.FromString(c.ServicePort), + service: service, + quota: quota, }, nil } -//NewDefaultClient ... -func NewDefaultClient(namespace string) (Platform, error) { +func (cl *Client) Service() ServiceInterface { + return cl.service +} + +func (cl *Client) Quota() QuotaInterface { + return cl.quota +} + +//List ... 
+func (cl *Client) State() (PlatformState, error) { + context := context.Background() + pods, err := cl.clientset.CoreV1().Pods(cl.ns).List(context, metav1.ListOptions{}) - conf, err := rest.InClusterConfig() if err != nil { - return nil, fmt.Errorf("failed to build cluster config: %v", err) + return PlatformState{}, fmt.Errorf("failed to get pods: %v", err) } - clientset, err := kubernetes.NewForConfig(conf) - if err != nil { - return nil, fmt.Errorf("failed to build client: %v", err) + var services []*Service + var workers []*Worker + + for _, pod := range pods.Items { + podName := pod.GetName() + creationTime := pod.CreationTimestamp.Time + + var status ServiceStatus + switch pod.Status.Phase { + case apiv1.PodRunning: + status = Running + case apiv1.PodPending: + status = Pending + default: + status = Unknown + } + + if application, ok := pod.GetLabels()[label]; ok { + switch application { + case "worker": + worker := &Worker{ + Name: podName, + Labels: pod.Labels, + Status: status, + Started: creationTime, + } + workers = append(workers, worker) + + case "browser": + service := &Service{ + SessionID: podName, + URL: &url.URL{ + Scheme: "http", + Host: tools.BuildHostPort(podName, cl.svc, cl.svcPort.StrVal), + }, + Labels: getRequestedCapabilities(pod.GetAnnotations()), + CancelFunc: func() { + deletePod(cl.clientset, cl.ns, podName) + }, + Status: status, + Started: creationTime, + } + + services = append(services, service) + } + } } - return &Client{ - ns: namespace, - clientset: clientset, + return PlatformState{ + Services: services, + Workers: workers, }, nil } +//Watch ... 
+func (cl *Client) Watch() <-chan Event { + ch := make(chan Event) + namespace := informers.WithNamespace(cl.ns) + labels := informers.WithTweakListOptions(func(list *metav1.ListOptions) { + list.LabelSelector = label + }) + + sharedIformer := informers.NewSharedInformerFactoryWithOptions(cl.clientset, 30*time.Second, namespace, labels) + + podEventFunc := func(obj interface{}, eventType EventType) { + if pod, ok := obj.(*apiv1.Pod); ok { + if application, ok := pod.GetLabels()[label]; ok { + podName := pod.GetName() + creationTime := pod.CreationTimestamp.Time + + var status ServiceStatus + switch pod.Status.Phase { + case apiv1.PodRunning: + status = Running + case apiv1.PodPending: + status = Pending + default: + status = Unknown + } + + switch application { + case "worker": + ch <- Event{ + Type: eventType, + PlatformObject: &Worker{ + Name: podName, + Labels: pod.Labels, + Status: status, + Started: creationTime, + }, + } + + case "browser": + ch <- Event{ + Type: eventType, + PlatformObject: &Service{ + SessionID: podName, + URL: &url.URL{ + Scheme: "http", + Host: tools.BuildHostPort(podName, cl.svc, cl.svcPort.StrVal), + }, + Labels: getRequestedCapabilities(pod.GetAnnotations()), + CancelFunc: func() { + deletePod(cl.clientset, cl.ns, podName) + }, + Status: status, + Started: creationTime, + }, + } + } + } + } + } + sharedIformer.Core().V1().Pods().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + podEventFunc(obj, Added) + }, + UpdateFunc: func(old interface{}, new interface{}) { + podEventFunc(new, Updated) + }, + DeleteFunc: func(obj interface{}) { + podEventFunc(obj, Deleted) + }, + }, + ) + + quotaEventFunc := func(obj interface{}, eventType EventType) { + if rq, ok := obj.(*apiv1.ResourceQuota); ok { + if _, ok := rq.GetLabels()[label]; ok { + rqName := rq.GetName() + ch <- Event{ + Type: eventType, + PlatformObject: &Quota{ + Name: rqName, + CurrentMaxLimit: rq.Spec.Hard.Pods().Value(), + }, + } + } 
+ } + } + sharedIformer.Core().V1().ResourceQuotas().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + quotaEventFunc(obj, Added) + }, + UpdateFunc: func(old interface{}, new interface{}) { + quotaEventFunc(new, Updated) + }, + DeleteFunc: func(obj interface{}) { + quotaEventFunc(obj, Deleted) + }, + }, + ) + + var neverStop <-chan struct{} = make(chan struct{}) + sharedIformer.Start(neverStop) + return ch +} + +type service struct { + ns string + svc string + svcPort intstr.IntOrString + imagePullSecretName string + proxyImage string + readinessTimeout time.Duration + idleTimeout time.Duration + clientset kubernetes.Interface +} + //Create ... -func (cl *Client) Create(layout *ServiceSpec) (*Service, error) { +func (cl *service) Create(layout *ServiceSpec) (*Service, error) { annontations := map[string]string{ defaultsAnnotations.browserName: layout.Template.BrowserName, defaultsAnnotations.browserVersion: layout.Template.BrowserVersion, @@ -130,6 +315,7 @@ func (cl *Client) Create(layout *ServiceSpec) (*Service, error) { labels := map[string]string{ defaultLabels.serviceType: "browser", + defaultLabels.appType: "browser", defaultLabels.session: layout.SessionID, } @@ -217,12 +403,8 @@ func (cl *Client) Create(layout *ServiceSpec) (*Service, error) { Name: "browser", Image: layout.Template.Image, SecurityContext: &apiv1.SecurityContext{ - Privileged: &layout.Template.Privileged, - Capabilities: &apiv1.Capabilities{ - Add: []apiv1.Capability{ - "SYS_ADMIN", - }, - }, + Privileged: layout.Template.Privileged, + Capabilities: getCapabilities(layout.Template.Capabilities), }, Env: layout.Template.Spec.EnvVars, Ports: getBrowserPorts(), @@ -244,10 +426,11 @@ func (cl *Client) Create(layout *ServiceSpec) (*Service, error) { NodeSelector: layout.Template.Spec.NodeSelector, HostAliases: layout.Template.Spec.HostAliases, RestartPolicy: apiv1.RestartPolicyNever, - Affinity: &layout.Template.Spec.Affinity, - DNSConfig: 
&layout.Template.Spec.DNSConfig, + Affinity: layout.Template.Spec.Affinity, + DNSConfig: layout.Template.Spec.DNSConfig, Tolerations: layout.Template.Spec.Tolerations, ImagePullSecrets: getImagePullSecretList(cl.imagePullSecretName), + SecurityContext: getSecurityContext(layout.Template.RunAs), }, } @@ -337,136 +520,98 @@ func (cl *Client) Create(layout *ServiceSpec) (*Service, error) { } //Delete ... -func (cl *Client) Delete(name string) error { - context := context.Background() +func (cl *service) Delete(name string) error { + return deletePod(cl.clientset, cl.ns, name) +} - return cl.clientset.CoreV1().Pods(cl.ns).Delete(context, name, metav1.DeleteOptions{ - GracePeriodSeconds: pointer.Int64Ptr(15), +//Logs ... +func (cl *service) Logs(ctx context.Context, name string) (io.ReadCloser, error) { + req := cl.clientset.CoreV1().Pods(cl.ns).GetLogs(name, &apiv1.PodLogOptions{ + Container: "browser", + Follow: true, + Previous: false, + Timestamps: false, }) + return req.Stream(ctx) } -//List ... -func (cl *Client) List() ([]*Service, error) { - context := context.Background() - pods, err := cl.clientset.CoreV1().Pods(cl.ns).List(context, metav1.ListOptions{ - LabelSelector: label, - }) +type quota struct { + ns string + clientset kubernetes.Interface +} +//Create ... 
+func (cl quota) Create(limit int64) (*Quota, error) { + context := context.Background() + quantity, err := resource.ParseQuantity(strconv.FormatInt(limit, 10)) if err != nil { - return nil, fmt.Errorf("failed to get pods: %v", err) + return nil, fmt.Errorf("failed to parse limit amount") } - - var services []*Service - - for _, pod := range pods.Items { - podName := pod.GetName() - - var status ServiceStatus - switch pod.Status.Phase { - case apiv1.PodRunning: - status = Running - case apiv1.PodPending: - status = Pending - default: - status = Unknown - } - - service := &Service{ - SessionID: podName, - URL: &url.URL{ - Scheme: "http", - Host: tools.BuildHostPort(podName, cl.svc, cl.svcPort.StrVal), - }, - Labels: getRequestedCapabilities(pod.GetAnnotations()), - CancelFunc: func() { - cl.Delete(podName) - }, - Status: status, - Started: pod.CreationTimestamp.Time, - } - services = append(services, service) + quota := &apiv1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: quotaName, + Labels: map[string]string{label: "quota"}, + }, + Spec: apiv1.ResourceQuotaSpec{ + Hard: map[apiv1.ResourceName]resource.Quantity{apiv1.ResourcePods: quantity}, + }, } - - return services, nil - + quota, err = cl.clientset.CoreV1().ResourceQuotas(cl.ns).Create(context, quota, metav1.CreateOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to create resourceQuota") + } + return &Quota{ + Name: quota.GetName(), + CurrentMaxLimit: quota.Spec.Hard.Pods().Value(), + }, nil } -//Watch ... 
-func (cl Client) Watch() <-chan Event { - ch := make(chan Event) - - convert := func(obj interface{}) *Service { - pod := obj.(*apiv1.Pod) - podName := pod.GetName() - - var status ServiceStatus - switch pod.Status.Phase { - case apiv1.PodRunning: - status = Running - case apiv1.PodPending: - status = Pending - default: - status = Unknown - } - - return &Service{ - SessionID: podName, - URL: &url.URL{ - Scheme: "http", - Host: tools.BuildHostPort(podName, cl.svc, cl.svcPort.StrVal), - }, - Labels: getRequestedCapabilities(pod.GetAnnotations()), - CancelFunc: func() { - cl.Delete(podName) - }, - Status: status, - Started: pod.CreationTimestamp.Time, - } +func (cl quota) Get() (*Quota, error) { + context := context.Background() + quota, err := cl.clientset.CoreV1().ResourceQuotas(cl.ns).Get(context, quotaName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("quota not found") } - namespace := informers.WithNamespace(cl.ns) - labels := informers.WithTweakListOptions(func(list *metav1.ListOptions) { - list.LabelSelector = label - }) + return &Quota{ + Name: quota.GetName(), + CurrentMaxLimit: quota.Spec.Hard.Pods().Value(), + }, nil +} - sharedIformer := informers.NewSharedInformerFactoryWithOptions(cl.clientset, 30*time.Second, namespace, labels) - sharedIformer.Core().V1().Pods().Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - ch <- Event{ - Type: Added, - Service: convert(obj), - } - }, - UpdateFunc: func(old interface{}, new interface{}) { - ch <- Event{ - Type: Updated, - Service: convert(new), - } - }, - DeleteFunc: func(obj interface{}) { - ch <- Event{ - Type: Deleted, - Service: convert(obj), - } - }, +//Update ... 
+func (cl quota) Update(limit int64) (*Quota, error) { + context := context.Background() + quantity, err := resource.ParseQuantity(strconv.FormatInt(limit, 10)) + if err != nil { + return nil, fmt.Errorf("failed to parse limit amount") + } + rq := &apiv1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: quotaName, + Labels: map[string]string{label: "quota"}, }, - ) + Spec: apiv1.ResourceQuotaSpec{ + Hard: map[apiv1.ResourceName]resource.Quantity{apiv1.ResourcePods: quantity}, + }, + } + quota, err := cl.clientset.CoreV1().ResourceQuotas(cl.ns).Update(context, rq, metav1.UpdateOptions{}) + if err != nil { + return nil, fmt.Errorf("resource quota update error: %v", err) + } - var neverStop <-chan struct{} = make(chan struct{}) - sharedIformer.Start(neverStop) - return ch + return &Quota{ + Name: quota.GetName(), + CurrentMaxLimit: rq.Spec.Hard.Pods().Value(), + }, err } -//Logs ... -func (cl *Client) Logs(ctx context.Context, name string) (io.ReadCloser, error) { - req := cl.clientset.CoreV1().Pods(cl.ns).GetLogs(name, &apiv1.PodLogOptions{ - Container: "browser", - Follow: true, - Previous: false, - Timestamps: false, +func deletePod(clientset kubernetes.Interface, namespace, name string) error { + context := context.Background() + + return clientset.CoreV1().Pods(namespace).Delete(context, name, metav1.DeleteOptions{ + GracePeriodSeconds: pointer.Int64Ptr(15), }) - return req.Stream(ctx) } func getBrowserPorts() []apiv1.ContainerPort { @@ -542,6 +687,24 @@ func getVolumes(volumes []apiv1.Volume) []apiv1.Volume { return v } +func getCapabilities(caps []apiv1.Capability) *apiv1.Capabilities { + if len(caps) > 0 { + return &apiv1.Capabilities{Add: caps} + } + return nil +} + +func getSecurityContext(runAsOptions RunAsOptions) *apiv1.PodSecurityContext { + secContext := &apiv1.PodSecurityContext{} + if runAsOptions.RunAsUser != nil { + secContext.RunAsUser = runAsOptions.RunAsUser + } + if runAsOptions.RunAsGroup != nil { + secContext.RunAsGroup = 
runAsOptions.RunAsGroup + } + return secContext +} + func waitForService(u url.URL, t time.Duration) error { up := make(chan struct{}) done := make(chan struct{}) diff --git a/platform/kubernetes_test.go b/platform/kubernetes_test.go index c5a9bbd..d34d7da 100644 --- a/platform/kubernetes_test.go +++ b/platform/kubernetes_test.go @@ -181,9 +181,13 @@ func TestErrorsOnServiceCreate(t *testing.T) { client := &Client{ ns: test.ns, clientset: mock, + service: &service{ + ns: test.ns, + clientset: mock, + }, } - _, err := client.Create(test.layout) + _, err := client.Service().Create(test.layout) assert.Equal(t, test.err.Error(), err.Error()) } @@ -224,6 +228,10 @@ func TestPodDelete(t *testing.T) { client := &Client{ ns: test.ns, clientset: mock, + service: &service{ + ns: test.ns, + clientset: mock, + }, } ctx := context.Background() @@ -248,7 +256,7 @@ func TestPodDelete(t *testing.T) { t.Fatalf("failed to create fake pod: %v", err) } - err = client.Delete(test.deletePod) + err = client.Service().Delete(test.deletePod) if err != nil { assert.Equal(t, test.err.Error(), err.Error()) @@ -282,7 +290,7 @@ func TestListPods(t *testing.T) { podNames: []string{"chrome-85-0-de44c3c4-1a35-412b-b526-f5da802144911", "chrome-85-0-de44c3c4-1a35-412b-b526-f5da802144912", "chrome-85-0-de44c3c4-1a35-412b-b526-f5da802144913"}, podPhase: []apiv1.PodPhase{apiv1.PodRunning, apiv1.PodPending, apiv1.PodFailed}, podStatus: []ServiceStatus{Running, Pending, Unknown}, - labels: map[string]string{"type": "browser"}, + labels: map[string]string{"selenosis.app.type": "browser"}, browserImage: "selenoid/vnc:chrome_85.0", proxyImage: "alcounit/seleniferous:latest", }, @@ -329,12 +337,12 @@ func TestListPods(t *testing.T) { } } - pods, err := client.List() + state, err := client.State() if err != nil { t.Fatalf("Failed to list pods %v", err) } - for i, pod := range pods { + for i, pod := range state.Services { assert.Equal(t, pod.SessionID, test.podNames[i]) u := &url.URL{ @@ -347,4 +355,4 @@ func 
TestListPods(t *testing.T) { assert.Equal(t, pod.Status, test.podStatus[i]) } } -} \ No newline at end of file +} diff --git a/platform/platform.go b/platform/platform.go index 082e66f..60c0a83 100644 --- a/platform/platform.go +++ b/platform/platform.go @@ -22,22 +22,28 @@ type Spec struct { HostAliases []apiv1.HostAlias `yaml:"hostAliases,omitempty" json:"hostAliases,omitempty"` EnvVars []apiv1.EnvVar `yaml:"env,omitempty" json:"env,omitempty"` NodeSelector map[string]string `yaml:"nodeSelector,omitempty" json:"nodeSelector,omitempty"` - Affinity apiv1.Affinity `yaml:"affinity,omitempty" json:"affinity,omitempty"` - DNSConfig apiv1.PodDNSConfig `yaml:"dnsConfig,omitempty" json:"dnsConfig,omitempty"` + Affinity *apiv1.Affinity `yaml:"affinity,omitempty" json:"affinity,omitempty"` + DNSConfig *apiv1.PodDNSConfig `yaml:"dnsConfig,omitempty" json:"dnsConfig,omitempty"` Tolerations []apiv1.Toleration `yaml:"tolerations,omitempty" json:"tolerations,omitempty"` VolumeMounts []apiv1.VolumeMount `yaml:"volumeMounts,omitempty" json:"volumeMounts,omitempty"` } +type RunAsOptions struct { + RunAsUser *int64 `yaml:"uid,omitempty" json:"uid,omitempty"` + RunAsGroup *int64 `yaml:"gid,omitempty" json:"gid,omitempty"` +} //BrowserSpec describes settings for Service type BrowserSpec struct { - BrowserName string `yaml:"-" json:"-"` - BrowserVersion string `yaml:"-" json:"-"` - Image string `yaml:"image" json:"image"` - Path string `yaml:"path" json:"path"` - Privileged bool `yaml:"privileged" json:"privileged"` - Meta Meta `yaml:"meta" json:"meta"` - Spec Spec `yaml:"spec" json:"spec"` - Volumes []apiv1.Volume `yaml:"volumes,omitempty" json:"volumes,omitempty"` + BrowserName string `yaml:"-" json:"-"` + BrowserVersion string `yaml:"-" json:"-"` + Image string `yaml:"image" json:"image"` + Path string `yaml:"path" json:"path"` + Privileged *bool `yaml:"privileged" json:"privileged"` + Meta Meta `yaml:"meta" json:"meta"` + Spec Spec `yaml:"spec" json:"spec"` + Volumes []apiv1.Volume 
`yaml:"volumes,omitempty" json:"volumes,omitempty"` + Capabilities []apiv1.Capability `yaml:"kernelCaps,omitempty" json:"kernelCaps,omitempty"` + RunAs RunAsOptions `yaml:"runAs,omitempty" json:"runAs,omitempty"` } //ServiceSpec describes data requred for creating service @@ -59,13 +65,31 @@ type Service struct { Uptime string `json:"uptime"` } +type Quota struct { + Name string `json:"name"` + CurrentMaxLimit int64 `json:"totalLimit"` +} + +type PlatformState struct { + Services []*Service + Workers []*Worker +} + +type Worker struct { + Name string `json:"name"` + Labels map[string]string `json:"labels"` + Status ServiceStatus `json:"-"` + Started time.Time `json:"started"` + Uptime string `json:"uptime"` +} + //ServiceStatus ... type ServiceStatus string //Event ... type Event struct { - Type EventType - Service *Service + Type EventType + PlatformObject interface{} } //EventType ... @@ -83,9 +107,20 @@ const ( //Platform ... type Platform interface { + Service() ServiceInterface + Quota() QuotaInterface + State() (PlatformState, error) + Watch() <-chan Event +} + +type ServiceInterface interface { Create(*ServiceSpec) (*Service, error) Delete(string) error - List() ([]*Service, error) - Watch() <-chan Event Logs(context.Context, string) (io.ReadCloser, error) } + +type QuotaInterface interface { + Create(int64) (*Quota, error) + Get() (*Quota, error) + Update(int64) (*Quota, error) +} diff --git a/selenosis.go b/selenosis.go index 04ec468..f463bdd 100644 --- a/selenosis.go +++ b/selenosis.go @@ -42,27 +42,113 @@ func New(logger *log.Logger, client platform.Platform, browsers *config.Browsers storage := storage.New() - services, err := client.List() + state, err := client.State() if err != nil { - logger.Errorf("failed to get list of active pods: %v", err) + logger.Errorf("failed to get cluster state: %v", err) } - for _, service := range services { - storage.Put(service.SessionID, service) + for _, service := range state.Services { + 
storage.Sessions().Put(service.SessionID, service) } + for _, worker := range state.Workers { + storage.Workers().Put(worker.Name, worker) + } + + limit := cfg.SessionLimit + currentTotal := func() int64 { + return int64(storage.Workers().Len() + limit) + } + var quota *platform.Quota + if quota, err = client.Quota().Get(); err != nil { + quota, err = client.Quota().Create(currentTotal()) + if err != nil { + logger.Errorf("failed to create quota resource: %v", err) + } + } + + if quota.CurrentMaxLimit != int64(currentTotal()) { + quota, err = client.Quota().Update(currentTotal()) + if err != nil { + logger.Warnf("failed to update quota resource %v:", err) + } + } + + storage.Quota().Put(quota) + + logger.Infof("current cluster state: sessions - %d, workers - %d, session limit - %d", storage.Sessions().Len(), storage.Workers().Len(), limit) + ch := client.Watch() go func() { for { select { case event := <-ch: - switch event.Type { - case platform.Added: - storage.Put(event.Service.SessionID, event.Service) - case platform.Updated: - storage.Put(event.Service.SessionID, event.Service) - case platform.Deleted: - storage.Delete(event.Service.SessionID) + switch event.PlatformObject.(type) { + case *platform.Service: + service := event.PlatformObject.(*platform.Service) + switch event.Type { + case platform.Added: + storage.Sessions().Put(service.SessionID, service) + case platform.Updated: + storage.Sessions().Put(service.SessionID, service) + case platform.Deleted: + storage.Sessions().Delete(service.SessionID) + } + + case *platform.Worker: + worker := event.PlatformObject.(*platform.Worker) + switch event.Type { + case platform.Added: + storage.Workers().Put(worker.Name, worker) + result, err := client.Quota().Update(currentTotal()) + if err != nil { + logger.Warnf("failed to update resource quota: %v", err) + break + } + storage.Quota().Put(result) + logger.Infof("selenosis worker: %s added, current namespace quota limit: %d", worker.Name, 
storage.Quota().Get().CurrentMaxLimit) + case platform.Deleted: + storage.Workers().Delete(worker.Name) + result, err := client.Quota().Update(currentTotal()) + if err != nil { + logger.Warnf("failed to update resource quota: %v", err) + break + } + storage.Quota().Put(result) + logger.Infof("selenosis worker: %s removed, current namespace quota limit: %d", worker.Name, storage.Quota().Get().CurrentMaxLimit) + } + + case *platform.Quota: + quota := event.PlatformObject.(*platform.Quota) + switch event.Type { + case platform.Added: + if quota.CurrentMaxLimit != currentTotal() { + quota, err = client.Quota().Update(currentTotal()) + if err != nil { + logger.Warnf("failed to update quota resource %v:", err) + break + } + storage.Quota().Put(quota) + } + case platform.Updated: + if quota.CurrentMaxLimit != currentTotal() { + quota, err = client.Quota().Update(currentTotal()) + if err != nil { + logger.Warnf("failed to update quota resource %v:", err) + break + } + storage.Quota().Put(quota) + } + case platform.Deleted: + quota, err = client.Quota().Create(currentTotal()) + if err != nil { + logger.Warnf("failed to update quota resource %v:", err) + break + } + storage.Quota().Put(quota) + } + default: + break } default: break diff --git a/storage/storage.go b/storage/storage.go index 8915ab6..bcc6fbf 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -7,54 +7,126 @@ import ( "github.com/alcounit/selenosis/tools" ) -//Storage ... -type Storage struct { - sessions map[string]*platform.Service - sync.RWMutex -} - -//New ... -func New() *Storage { - return &Storage{ - sessions: make(map[string]*platform.Service), - } +type sessions struct { + m map[string]*platform.Service } //Put ... -func (s *Storage) Put(sessionID string, service *platform.Service) { - s.Lock() - defer s.Unlock() +func (s *sessions) Put(sessionID string, service *platform.Service) { if sessionID != "" { - s.sessions[sessionID] = service + s.m[sessionID] = service } } //Delete ... 
-func (s *Storage) Delete(sessionID string) { - s.Lock() - defer s.Unlock() - delete(s.sessions, sessionID) +func (s *sessions) Delete(sessionID string) { + delete(s.m, sessionID) } //List ... -func (s *Storage) List() []platform.Service { - s.Lock() - defer s.Unlock() +func (s *sessions) List() []platform.Service { var l []platform.Service + for _, p := range s.m { + c := *p + c.Uptime = tools.TimeElapsed(c.Started) + l = append(l, c) + } + + return l + +} + +//Len ... +func (s *sessions) Len() int { + return len(s.m) +} + +type workers struct { + m map[string]*platform.Worker +} + +//Put ... +func (w *workers) Put(name string, worker *platform.Worker) { + if name != "" { + w.m[name] = worker + } +} + +//Delete ... +func (w *workers) Delete(name string) { + delete(w.m, name) +} - for _, p := range s.sessions { +//List ... +func (w *workers) List() []platform.Worker { + var l []platform.Worker + for _, p := range w.m { c := *p c.Uptime = tools.TimeElapsed(c.Started) l = append(l, c) } + return l } //Len ... -func (s *Storage) Len() int { +func (s *workers) Len() int { + return len(s.m) +} + +type quota struct { + w *workers + q *platform.Quota +} + +//Put ... +func (q *quota) Put(quota *platform.Quota) { + q.q = quota +} + +//Get ... +func (q *quota) Get() *platform.Quota { + return q.q +} + +//Storage ... +type Storage struct { + sessions *sessions + workers *workers + quota *quota + sync.RWMutex +} + +//New ... +func New() *Storage { + sessions := &sessions{m: make(map[string]*platform.Service)} + workers := &workers{m: make(map[string]*platform.Worker)} + quota := &quota{w: workers} + return &Storage{ + sessions: sessions, + workers: workers, + quota: quota, + } +} + +//Sessions ... +func (s *Storage) Sessions() *sessions { + s.Lock() + defer s.Unlock() + return s.sessions +} + +//Workers ... +func (s *Storage) Workers() *workers { s.Lock() defer s.Unlock() + return s.workers +} - return len(s.sessions) +//Quota ... 
+func (s *Storage) Quota() *quota { + s.Lock() + defer s.Unlock() + return s.quota } diff --git a/storage/storage_test.go b/storage/storage_test.go index 9b5b002..0685b58 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -11,7 +11,7 @@ func TestNew(t *testing.T) { tests := map[string]struct { len int }{ - "Verify session storage is empty on creation": { + "Verify storage is empty on creation": { len: 0, }, } @@ -21,7 +21,8 @@ func TestNew(t *testing.T) { strg := New() - assert.Equal(t, strg.Len(), test.len) + assert.Equal(t, strg.Sessions().Len(), test.len) + assert.Equal(t, strg.Workers().Len(), test.len) } } @@ -49,9 +50,9 @@ func TestPut(t *testing.T) { for name, test := range tests { t.Logf("TC: %s", name) - test.strg.Put(test.session, test.service) + test.strg.Sessions().Put(test.session, test.service) - assert.Equal(t, test.strg.Len(), test.len) + assert.Equal(t, test.strg.Sessions().Len(), test.len) } } @@ -85,13 +86,13 @@ func TestDelete(t *testing.T) { for name, test := range tests { t.Logf("TC: %s", name) - test.strg.Put(test.sessionToAdd, test.service) + test.strg.Sessions().Put(test.sessionToAdd, test.service) - assert.Equal(t, test.strg.Len(), test.lenOnAdd) + assert.Equal(t, test.strg.Sessions().Len(), test.lenOnAdd) - test.strg.Delete(test.sessionToDelete) + test.strg.Sessions().Delete(test.sessionToDelete) - assert.Equal(t, test.strg.Len(), test.lenOnDelete) + assert.Equal(t, test.strg.Sessions().Len(), test.lenOnDelete) } } @@ -115,12 +116,12 @@ func TestList(t *testing.T) { for name, test := range tests { t.Logf("TC: %s", name) - test.strg.Put(test.session, test.service) + test.strg.Sessions().Put(test.session, test.service) - for _, svc := range test.strg.List() { + for _, svc := range test.strg.Sessions().List() { assert.Equal(t, svc.SessionID, test.service.SessionID) } - assert.Equal(t, test.strg.Len(), test.len) + assert.Equal(t, test.strg.Sessions().Len(), test.len) } }