Skip to content

Commit

Permalink
Customize cleanup for multi node test
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Nov 17, 2022
1 parent d867cd7 commit 0504d7b
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity
import org.junit.AfterClass
import org.junit.Before
import org.junit.rules.DisableOnDebug
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.Response
import org.opensearch.client.RestClient
Expand All @@ -25,11 +26,15 @@ import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
import org.opensearch.rest.RestStatus
import java.io.IOException
import java.nio.file.Files
import java.util.*
import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashSet

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

Expand Down Expand Up @@ -63,7 +68,6 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {

protected val isDebuggingTest = DisableOnDebug(null).isDebugging
protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1

protected val isLocalTest = clusterName() == "integTest"
private fun clusterName(): String {
Expand Down Expand Up @@ -160,21 +164,15 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}

override fun preserveIndicesUponCompletion(): Boolean = true

companion object {

@JvmStatic
protected val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1
protected val defaultKeepIndexSet = setOf(".opendistro_security")
/**
* This clean up function can be use in @After or @AfterClass in the base test file
* of your feature test suite
* We override preserveIndicesUponCompletion to true and use this function to clean up indices
* Meant to be used in @After or @AfterClass of your feature test suite
*/
fun wipeAllIndices(client: RestClient = adminClient(), keepIndex: kotlin.collections.Set<String> = defaultKeepIndexSet) {
waitFor {
waitForRunningTasks(client)
waitForPendingTasks(client)
waitForThreadPools(client)
}
// Delete all data stream indices
try {
client.performRequest(Request("DELETE", "_data_stream/*"))
} catch (e: ResponseException) {
Expand All @@ -186,9 +184,7 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}

// Delete all indices
val response = client.performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
Expand All @@ -208,6 +204,72 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
}
}
}

waitFor {
if (!isMultiNode) {
waitForRunningTasks(client)
waitForPendingTasks(client)
waitForThreadPools(client)
} else {
// Multi node test is not suitable to waitFor
// We have seen long-running write task that fails the waitFor
// probably because of cluster manager - data node task not in sync
// So instead we just sleep 1s after wiping indices
Thread.sleep(1_000)
}
}
}

@JvmStatic
@Throws(IOException::class)
protected fun waitForRunningTasks(client: RestClient) {
val runningTasks: MutableSet<String> = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed")))
if (runningTasks.isEmpty()) {
return
}
val stillRunning = ArrayList<String>(runningTasks)
fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.")
}

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
// Ignore the task list API - it doesn't count against us
if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue
// runningTasks.add(task["action"].toString() + " | " + task["description"].toString())
runningTasks.add(task.toString())
}
}
return runningTasks
}

@JvmStatic
protected fun waitForThreadPools(client: RestClient) {
val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}

internal interface IProxy {
Expand Down
61 changes: 0 additions & 61 deletions src/test/kotlin/org/opensearch/indexmanagement/ODFERestTestCase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@
package org.opensearch.indexmanagement

import org.apache.http.HttpHost
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.opensearch.client.Request
import org.opensearch.client.Response
import org.opensearch.client.RestClient
import org.opensearch.common.io.PathUtils
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.DeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_ENABLED
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD
Expand All @@ -23,7 +17,6 @@ import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_PEMCE
import org.opensearch.commons.rest.SecureRestClientBuilder
import org.opensearch.test.rest.OpenSearchRestTestCase
import java.io.IOException
import java.util.Date

abstract class ODFERestTestCase : OpenSearchRestTestCase() {

Expand All @@ -33,60 +26,6 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() {

override fun getProtocol(): String = if (isHttps()) "https" else "http"

companion object {
@JvmStatic
@Throws(IOException::class)
protected fun waitForRunningTasks(client: RestClient) {
val runningTasks: MutableSet<String> = runningTasks(client.performRequest(Request("GET", "/_tasks?detailed")))
if (runningTasks.isEmpty()) {
return
}
val stillRunning = ArrayList<String>(runningTasks)
fail("${Date()}: There are still tasks running after this test that might break subsequent tests: \n${stillRunning.joinToString("\n")}.")
}

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
// Ignore the task list API - it doesn't count against us
if (task["action"] == ListTasksAction.NAME || task["action"] == ListTasksAction.NAME + "[n]") continue
// runningTasks.add(task["action"].toString() + " | " + task["description"].toString())
runningTasks.add(task.toString())
}
}
return runningTasks
}

@JvmStatic
protected fun waitForThreadPools(client: RestClient) {
val response = client.performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaType(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}
}

/**
* Returns the REST client settings used for super-admin actions like cleaning up after the test has completed.
*/
Expand Down

0 comments on commit 0504d7b

Please sign in to comment.