diff --git a/ray-operator/test/e2e/create_detached_actor.py b/ray-operator/test/e2e/create_detached_actor.py
index 8f69b6ca39..fc4743ba07 100644
--- a/ray-operator/test/e2e/create_detached_actor.py
+++ b/ray-operator/test/e2e/create_detached_actor.py
@@ -1,11 +1,17 @@
 import ray
 import sys
+import argparse
 
+parser = argparse.ArgumentParser()
+parser.add_argument('name')
+parser.add_argument('--num-cpus', type=float, default=1)
+parser.add_argument('--num-gpus', type=float, default=0)
+args = parser.parse_args()
 
-@ray.remote(num_cpus=1)
+@ray.remote(num_cpus=args.num_cpus, num_gpus=args.num_gpus)
 class Actor:
     pass
 
 
 ray.init(namespace="default_namespace")
-Actor.options(name=sys.argv[1], lifetime="detached").remote()
+Actor.options(name=args.name, lifetime="detached").remote()
diff --git a/ray-operator/test/e2e/raycluster_autoscaler_test.go b/ray-operator/test/e2e/raycluster_autoscaler_test.go
index af9aa9330f..f89876f24e 100644
--- a/ray-operator/test/e2e/raycluster_autoscaler_test.go
+++ b/ray-operator/test/e2e/raycluster_autoscaler_test.go
@@ -75,3 +75,61 @@ func TestRayClusterAutoscaler(t *testing.T) {
 			Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0))))
 	})
 }
+
+func TestRayClusterAutoscalerWithFakeGPU(t *testing.T) {
+	test := With(t)
+
+	// Create a namespace
+	namespace := test.NewTestNamespace()
+	test.StreamKubeRayOperatorLogs()
+
+	// Scripts for creating and terminating detached actors to trigger autoscaling
+	scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "create_detached_actor.py", "terminate_detached_actor.py"))
+	scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
+	test.Expect(err).NotTo(HaveOccurred())
+	test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
+
+	test.T().Run("Create a RayCluster with autoscaling enabled", func(_ *testing.T) {
+		rayClusterSpecAC := rayv1ac.RayClusterSpec().
+			WithEnableInTreeAutoscaling(true).
+			WithRayVersion(GetRayVersion()).
+			WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
+				WithRayStartParams(map[string]string{"num-cpus": "0"}).
+				WithTemplate(headPodTemplateApplyConfiguration())).
+			WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
+				WithReplicas(0).
+				WithMinReplicas(0).
+				WithMaxReplicas(3).
+				WithGroupName("gpu-group").
+				WithRayStartParams(map[string]string{"num-cpus": "1", "num-gpus": "1"}).
+				WithTemplate(workerPodTemplateApplyConfiguration()))
+		rayClusterAC := rayv1ac.RayCluster("ray-cluster", namespace.Name).
+			WithSpec(apply(rayClusterSpecAC, mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))
+
+		rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
+		test.Expect(err).NotTo(HaveOccurred())
+		test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)
+
+		// Wait for the RayCluster to become ready and verify the number of available worker replicas.
+		test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
+			Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
+		rayCluster = GetRayCluster(test, rayCluster.Namespace, rayCluster.Name)
+		test.Expect(rayCluster.Status.DesiredWorkerReplicas).To(Equal(int32(0)))
+
+		headPod := GetHeadPod(test, rayCluster)
+		test.T().Logf("Found head pod %s/%s", headPod.Namespace, headPod.Name)
+
+		// Create a detached GPU actor; a worker in the "gpu-group" should be created.
+		ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/create_detached_actor.py", "gpu_actor", "--num-gpus=1"})
+		test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
+			Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(1))))
+		// This test doesn't use real Kubernetes GPU resources, so RayClusterDesiredGPU can't be verified.
+		// Check the Pod count of the "gpu-group" instead.
+		test.Expect(GetGroupPods(test, rayCluster, "gpu-group")).To(HaveLen(1))
+
+		// Terminate the detached GPU actor; the worker should be deleted.
+		ExecPodCmd(test, headPod, common.RayHeadContainer, []string{"python", "/home/ray/test_scripts/terminate_detached_actor.py", "gpu_actor"})
+		test.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium).
+			Should(WithTransform(RayClusterDesiredWorkerReplicas, Equal(int32(0))))
+	})
+}
diff --git a/ray-operator/test/support/ray.go b/ray-operator/test/support/ray.go
index 743f389ac3..e24f99a2c7 100644
--- a/ray-operator/test/support/ray.go
+++ b/ray-operator/test/support/ray.go
@@ -79,6 +79,16 @@ func GetHeadPod(t Test, rayCluster *rayv1.RayCluster) *corev1.Pod {
 	return &pods.Items[0]
 }
 
+func GetGroupPods(t Test, rayCluster *rayv1.RayCluster, group string) []corev1.Pod {
+	t.T().Helper()
+	pods, err := t.Client().Core().CoreV1().Pods(rayCluster.Namespace).List(
+		t.Ctx(),
+		common.RayClusterGroupPodsAssociationOptions(rayCluster, group).ToMetaV1ListOptions(),
+	)
+	t.Expect(err).NotTo(gomega.HaveOccurred())
+	return pods.Items
+}
+
 func RayService(t Test, namespace, name string) func(g gomega.Gomega) *rayv1.RayService {
 	return func(g gomega.Gomega) *rayv1.RayService {
 		service, err := t.Client().Ray().RayV1().RayServices(namespace).Get(t.Ctx(), name, metav1.GetOptions{})
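Note: the new test also runs terminate_detached_actor.py from the same ConfigMap, but that script is not part of this diff. For context only, a minimal sketch of what such a script could look like, assuming it just looks up the named detached actor and kills it with the standard ray.get_actor / ray.kill APIs (the actual script in the repository may differ):

# Hypothetical sketch of terminate_detached_actor.py; not taken from this diff.
import argparse

import ray

parser = argparse.ArgumentParser()
parser.add_argument('name')
args = parser.parse_args()

# Connect to the same Ray namespace used by create_detached_actor.py.
ray.init(namespace="default_namespace")

# Look up the detached actor by name and kill it, releasing its CPU/GPU
# reservation so the autoscaler can scale the worker group back down.
actor = ray.get_actor(args.name)
ray.kill(actor)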