From 0504d7bd55bc2eb403614da89702940fd2396080 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 16 Nov 2022 17:43:49 -0800 Subject: [PATCH] Customize cleanup for multi node test Signed-off-by: bowenlan-amzn --- .../IndexManagementRestTestCase.kt | 88 ++++++++++++++++--- .../indexmanagement/ODFERestTestCase.kt | 61 ------------- 2 files changed, 75 insertions(+), 74 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 0919fb7db..eba7db734 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity import org.junit.AfterClass import org.junit.Before import org.junit.rules.DisableOnDebug +import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction import org.opensearch.client.Request import org.opensearch.client.Response import org.opensearch.client.RestClient @@ -25,11 +26,15 @@ import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.rest.RestStatus +import java.io.IOException import java.nio.file.Files +import java.util.* import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL +import kotlin.collections.ArrayList +import kotlin.collections.HashSet abstract class IndexManagementRestTestCase : ODFERestTestCase() { @@ -63,7 +68,6 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { protected val isDebuggingTest = DisableOnDebug(null).isDebugging protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean() - protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 protected val isLocalTest = clusterName() == "integTest" private fun clusterName(): String { @@ -160,21 +164,15 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { } override fun preserveIndicesUponCompletion(): Boolean = true - companion object { - + @JvmStatic + protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 protected val defaultKeepIndexSet = setOf(".opendistro_security") /** - * This clean up function can be use in @After or @AfterClass in the base test file - * of your feature test suite + * We override preserveIndicesUponCompletion to true and use this function to clean up indices + * Meant to be used in @After or @AfterClass of your feature test suite */ fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set = defaultKeepIndexSet) { - waitFor { - waitForRunningTasks(client) - waitForPendingTasks(client) - waitForThreadPools(client) - } - // Delete all data stream indices try { client.performRequest(Request("DELETE", "_data_stream/*")) } catch (e: ResponseException) { @@ -186,9 +184,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { } } - // Delete all indices val response = client.performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all")) - val xContentType = XContentType.fromMediaType(response.entity.contentType.value) xContentType.xContent().createParser( NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, @@ -208,6 +204,72 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { } } } + + waitFor { + if (!isMultiNode) { + waitForRunningTasks(client) + waitForPendingTasks(client) + waitForThreadPools(client) + } else { + // Multi node test is not suitable to waitFor + // We have seen long-running write task that fails the waitFor + // probably because of cluster manager - data node task not in sync + // So instead we just sleep 1s after wiping indices + Thread.sleep(1_000) + } + } + } + + @JvmStatic + @Throws(IOException::class) + protected fun waitForRunningTasks(client: RestClient) { + val runningTasks: MutableSet = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed"))) + if (runningTasks.isEmpty()) { + return + } + val stillRunning = ArrayList(runningTasks) + fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.") + } + + @Suppress("UNCHECKED_CAST") + @Throws(IOException::class) + private fun runningTasks(response: Response): MutableSet { + val runningTasks: MutableSet = HashSet() + val nodes = entityAsMap(response)["nodes"] as Map? + for ((_, value) in nodes!!) { + val nodeInfo = value as Map + val nodeTasks = nodeInfo["tasks"] as Map? + for ((_, value1) in nodeTasks!!) { + val task = value1 as Map + // Ignore the task list API - it doesn't count against us + if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue + // runningTasks.add(task["action"].toString() + " | " + task["description"].toString()) + runningTasks.add(task.toString()) + } + } + return runningTasks + } + + @JvmStatic + protected fun waitForThreadPools(client: RestClient) { + val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json")) + + val xContentType = XContentType.fromMediaType(response.entity.contentType.value) + xContentType.xContent().createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.entity.content + ).use { parser -> + for (index in parser.list()) { + val jsonObject: Map<*, *> = index as java.util.HashMap<*, *> + val active = (jsonObject["active"] as String).toInt() + val queue = (jsonObject["queue"] as String).toInt() + val name = jsonObject["name"] + val trueActive = if (name == "management") active - 1 else active + if (trueActive > 0 || queue > 0) { + fail("Still active threadpools in cluster: $jsonObject") + } + } + } } internal interface IProxy { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt index 59b57b668..fa1a77a92 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt @@ -6,15 +6,9 @@ package org.opensearch.indexmanagement import org.apache.http.HttpHost -import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction -import org.opensearch.client.Request -import org.opensearch.client.Response import org.opensearch.client.RestClient import org.opensearch.common.io.PathUtils import org.opensearch.common.settings.Settings -import org.opensearch.common.xcontent.DeprecationHandler -import org.opensearch.common.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_ENABLED import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD @@ -23,7 +17,6 @@ import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_PEMCE import org.opensearch.commons.rest.SecureRestClientBuilder import org.opensearch.test.rest.OpenSearchRestTestCase import java.io.IOException -import java.util.Date abstract class ODFERestTestCase : OpenSearchRestTestCase() { @@ -33,60 +26,6 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() { override fun getProtocol(): String = if (isHttps()) "https" else "http" - companion object { - @JvmStatic - @Throws(IOException::class) - protected fun waitForRunningTasks(client: RestClient) { - val runningTasks: MutableSet = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed"))) - if (runningTasks.isEmpty()) { - return - } - val stillRunning = ArrayList(runningTasks) - fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.") - } - - @Suppress("UNCHECKED_CAST") - @Throws(IOException::class) - private fun runningTasks(response: Response): MutableSet { - val runningTasks: MutableSet = HashSet() - val nodes = entityAsMap(response)["nodes"] as Map? - for ((_, value) in nodes!!) { - val nodeInfo = value as Map - val nodeTasks = nodeInfo["tasks"] as Map? - for ((_, value1) in nodeTasks!!) { - val task = value1 as Map - // Ignore the task list API - it doesn't count against us - if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue - // runningTasks.add(task["action"].toString() + " | " + task["description"].toString()) - runningTasks.add(task.toString()) - } - } - return runningTasks - } - - @JvmStatic - protected fun waitForThreadPools(client: RestClient) { - val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json")) - - val xContentType = XContentType.fromMediaType(response.entity.contentType.value) - xContentType.xContent().createParser( - NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, - response.entity.content - ).use { parser -> - for (index in parser.list()) { - val jsonObject: Map<*, *> = index as java.util.HashMap<*, *> - val active = (jsonObject["active"] as String).toInt() - val queue = (jsonObject["queue"] as String).toInt() - val name = jsonObject["name"] - val trueActive = if (name == "management") active - 1 else active - if (trueActive > 0 || queue > 0) { - fail("Still active threadpools in cluster: $jsonObject") - } - } - } - } - } - /** * Returns the REST client settings used for super-admin actions like cleaning up after the test has completed. */