Skip to content

Commit

Permalink
Implement support for script execution kill and timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo Dias <[email protected]>
  • Loading branch information
rjd15372 committed Dec 30, 2024
1 parent 12202cd commit 8ae2eb8
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 12 deletions.
16 changes: 16 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -13151,6 +13151,21 @@ int VM_UnregisterScriptingEngine(ValkeyModuleCtx *ctx, const char *engine_name)
return VALKEYMODULE_OK;
}

/* Returns the state of the current function being executed by the scripting
* engine.
*
* `server_ctx` is the server runtime context.
*
* It will return VMSE_STATE_KILLED if the function was already killed either by
* a `SCRIPT KILL`, or `FUNCTION KILL`.
*/
ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState(
ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) {
int ret = scriptInterrupt(server_ctx);
serverAssert(ret == SCRIPT_CONTINUE || ret == SCRIPT_KILL);
return ret == SCRIPT_CONTINUE ? VMSE_STATE_EXECUTING : VMSE_STATE_KILLED;
}

/* MODULE command.
*
* MODULE LIST
Expand Down Expand Up @@ -14023,4 +14038,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RdbSave);
REGISTER_API(RegisterScriptingEngine);
REGISTER_API(UnregisterScriptingEngine);
REGISTER_API(GetFunctionExecutionState);
}
8 changes: 8 additions & 0 deletions src/valkeymodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,11 @@ typedef enum ValkeyModuleScriptingEngineSubsystemType {
VMSE_ALL
} ValkeyModuleScriptingEngineSubsystemType;

typedef enum ValkeyModuleScriptingEngineExecutionState {
VMSE_STATE_EXECUTING,
VMSE_STATE_KILLED,
} ValkeyModuleScriptingEngineExecutionState;

typedef struct ValkeyModuleScriptingEngineCallableLazyEvalReset {
void *context;

Expand Down Expand Up @@ -1868,6 +1873,8 @@ VALKEYMODULE_API int (*ValkeyModule_RegisterScriptingEngine)(ValkeyModuleCtx *mo
VALKEYMODULE_API int (*ValkeyModule_UnregisterScriptingEngine)(ValkeyModuleCtx *module_ctx,
const char *engine_name) VALKEYMODULE_ATTR;

VALKEYMODULE_API ValkeyModuleScriptingEngineExecutionState (*ValkeyModule_GetFunctionExecutionState)(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) VALKEYMODULE_ATTR;

#define ValkeyModule_IsAOFClient(id) ((id) == UINT64_MAX)

/* This is included inline inside each Valkey module. */
Expand Down Expand Up @@ -2237,6 +2244,7 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in
VALKEYMODULE_GET_API(RdbSave);
VALKEYMODULE_GET_API(RegisterScriptingEngine);
VALKEYMODULE_GET_API(UnregisterScriptingEngine);
VALKEYMODULE_GET_API(GetFunctionExecutionState);

if (ValkeyModule_IsModuleNameBusy && ValkeyModule_IsModuleNameBusy(name)) return VALKEYMODULE_ERR;
ValkeyModule_SetModuleAttribs(ctx, name, ver, apiver);
Expand Down
84 changes: 72 additions & 12 deletions tests/modules/helloscripting.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ctype.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

/*
* This module implements a very simple stack based scripting language.
Expand All @@ -28,6 +29,15 @@
* CONSTI 432 # pushes the value 432 to the top of the stack
* RETURN # returns the current value on the top of the stack and marks
* # the end of the function declaration.
*
* FUNCTION sleep # declaration of function 'sleep'
* ARGS 0 # pushes the value in the first argument to the top of the
* # stack
* SLEEP # Pops the current value in the stack and sleeps for `value`
* # seconds
* CONSTI 0 # pushes the value 0 to the top of the stack
* RETURN # returns the current value on the top of the stack and marks
* # the end of the function declaration.
* ```
*/

Expand All @@ -38,6 +48,7 @@ typedef enum HelloInstKind {
FUNCTION = 0,
CONSTI,
ARGS,
SLEEP,
RETURN,
_NUM_INSTRUCTIONS, // Not a real instruction.
} HelloInstKind;
Expand All @@ -49,6 +60,7 @@ const char *HelloInstKindStr[] = {
"FUNCTION",
"CONSTI",
"ARGS",
"SLEEP",
"RETURN",
};

Expand Down Expand Up @@ -183,6 +195,10 @@ static int helloLangParseCode(const char *code,
ValkeyModule_Assert(currentFunc != NULL);
helloLangParseArgs(currentFunc);
break;
case SLEEP:
ValkeyModule_Assert(currentFunc != NULL);
currentFunc->num_instructions++;
break;
case RETURN:
ValkeyModule_Assert(currentFunc != NULL);
currentFunc->num_instructions++;
Expand All @@ -202,13 +218,40 @@ static int helloLangParseCode(const char *code,
return 0;
}

static ValkeyModuleScriptingEngineExecutionState executeSleepInst(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx,
uint32_t seconds) {
uint32_t elapsed_milliseconds = 0;
ValkeyModuleScriptingEngineExecutionState state = VMSE_STATE_EXECUTING;
while(1) {
state = ValkeyModule_GetFunctionExecutionState(server_ctx);
if (state != VMSE_STATE_EXECUTING) {
break;
}

if (elapsed_milliseconds >= (seconds * 1000)) {
break;
}

usleep(1000);
elapsed_milliseconds++;
}

return state;
}

/*
* Executes an HELLO function.
*/
static uint32_t executeHelloLangFunction(HelloFunc *func,
ValkeyModuleString **args, int nargs) {
static ValkeyModuleScriptingEngineExecutionState executeHelloLangFunction(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx,
HelloFunc *func,
ValkeyModuleString **args,
int nargs,
uint32_t *result) {
ValkeyModule_Assert(result != NULL);
uint32_t stack[64];
uint32_t val = 0;
int sp = 0;
ValkeyModuleScriptingEngineExecutionState state = VMSE_STATE_EXECUTING;

for (uint32_t pc = 0; pc < func->num_instructions; pc++) {
HelloInst instr = func->instructions[pc];
Expand All @@ -224,20 +267,26 @@ static uint32_t executeHelloLangFunction(HelloFunc *func,
uint32_t arg = str2int(argStr);
stack[sp++] = arg;
break;
}
}
case SLEEP: {
val = stack[--sp];
state = executeSleepInst(server_ctx, val);
break;
}
case RETURN: {
uint32_t val = stack[--sp];
val = stack[--sp];
ValkeyModule_Assert(sp == 0);
return val;
}
*result = val;
return state;
}
case FUNCTION:
default:
case _NUM_INSTRUCTIONS:
ValkeyModule_Assert(0);
}
}

ValkeyModule_Assert(0);
return 0;
return state;
}

static ValkeyModuleScriptingEngineMemoryInfo engineGetMemoryInfo(ValkeyModuleCtx *module_ctx,
Expand Down Expand Up @@ -340,19 +389,30 @@ static ValkeyModuleScriptingEngineCompiledFunction **createHelloLangEngine(Valke
static void
callHelloLangFunction(ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineFunctionCtx *func_ctx,
ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx,
ValkeyModuleScriptingEngineCompiledFunction *compiled_function,
ValkeyModuleScriptingEngineSubsystemType type,
ValkeyModuleString **keys, size_t nkeys,
ValkeyModuleString **args, size_t nargs) {
VALKEYMODULE_NOT_USED(engine_ctx);
VALKEYMODULE_NOT_USED(func_ctx);
VALKEYMODULE_NOT_USED(type);
VALKEYMODULE_NOT_USED(keys);
VALKEYMODULE_NOT_USED(nkeys);

ValkeyModule_Assert(type == VMSE_EVAL || type == VMSE_FUNCTION);

HelloFunc *func = (HelloFunc *)compiled_function->function;
uint32_t result = executeHelloLangFunction(func, args, nargs);
uint32_t result;
ValkeyModuleScriptingEngineExecutionState state = executeHelloLangFunction(server_ctx, func, args, nargs, &result);
ValkeyModule_Assert(state == VMSE_STATE_KILLED || state == VMSE_STATE_EXECUTING);

if (state == VMSE_STATE_KILLED) {
if (type == VMSE_EVAL) {
ValkeyModule_ReplyWithError(module_ctx, "ERR Script killed by user with SCRIPT KILL.");
}
if (type == VMSE_FUNCTION) {
ValkeyModule_ReplyWithError(module_ctx, "ERR Script killed by user with FUNCTION KILL");
}
}

ValkeyModule_ReplyWithLongLong(module_ctx, result);
}
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/moduleapi/scriptingengine.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@ start_server {tags {"modules"}} {
assert_equal $result 432
}

test {Test function kill} {
set rd [valkey_deferring_client]
r config set busy-reply-threshold 10
r function load REPLACE "#!hello name=mylib\nFUNCTION sleep\nARGS 0\nSLEEP\nARGS 0\nRETURN"
$rd fcall sleep 0 100
after 1000
catch {r ping} e
assert_match {BUSY*} $e
assert_match {running_script {name sleep command {fcall sleep 0 100} duration_ms *} engines {*}} [r FUNCTION STATS]
r function kill
after 1000 ;
assert_equal [r ping] "PONG"
assert_error {ERR Script killed by user with FUNCTION KILL*} {$rd read}
$rd close
}

test {Unload scripting engine module} {
set result [r module unload helloengine]
assert_equal $result "OK"
Expand Down

0 comments on commit 8ae2eb8

Please sign in to comment.