Skip to content
2 changes: 1 addition & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ long long emptyData(int dbnum, int flags, void(callback)(hashtable *)) {
if (with_functions) {
serverAssert(dbnum == -1);
/* TODO: fix this callback incompatibility. The arg is not used. */
functionsLibCtxClearCurrent(async, (void (*)(dict *))callback);
functionReset(async, (void (*)(dict *))callback);
}

/* Also fire the end event. Note that this event will fire almost
Expand Down
7 changes: 3 additions & 4 deletions src/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbac
listIter *iter = listGetIterator(engine_callbacks, 0);
listNode *node = NULL;
while ((node = listNext(iter)) != NULL) {
callableLazyEvalReset *callback = listNodeValue(node);
callableLazyEnvReset *callback = listNodeValue(node);
if (callback != NULL) {
callback->engineLazyEvalResetCallback(callback->context);
callback->engineLazyEnvResetCallback(callback->context);
zfree(callback);
}
}
Expand All @@ -159,7 +159,7 @@ void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbac

static void resetEngineEvalEnvCallback(scriptingEngine *engine, void *context) {
int async = context != NULL;
callableLazyEvalReset *callback = scriptingEngineCallResetEvalEnvFunc(engine, async);
callableLazyEnvReset *callback = scriptingEngineCallResetEnvFunc(engine, VMSE_EVAL, async);

if (async) {
list *callbacks = context;
Expand All @@ -174,7 +174,6 @@ void evalRelease(int async) {
list *engine_callbacks = listCreate();
scriptingEngineManagerForEachEngine(resetEngineEvalEnvCallback, engine_callbacks);
freeEvalScriptsAsync(evalCtx.scripts, evalCtx.scripts_lru_list, engine_callbacks);

} else {
freeEvalScripts(evalCtx.scripts, evalCtx.scripts_lru_list, NULL);
scriptingEngineManagerForEachEngine(resetEngineEvalEnvCallback, NULL);
Expand Down
50 changes: 40 additions & 10 deletions src/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,32 +165,62 @@ void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)) {
lib_ctx->cache_memory = 0;
}

void functionsLibCtxClearCurrent(int async, void(callback)(dict *)) {
static void resetEngineFunctionEnvCallback(scriptingEngine *engine, void *context) {
int async = context != NULL;
callableLazyEnvReset *callback = scriptingEngineCallResetEnvFunc(engine, VMSE_FUNCTION, async);

if (async) {
list *callbacks = context;
listAddNodeTail(callbacks, callback);
}
}

void functionsLibCtxReleaseCurrent(int async, void(callback)(dict *)) {
if (async) {
functionsLibCtx *old_l_ctx = curr_functions_lib_ctx;
curr_functions_lib_ctx = functionsLibCtxCreate();
freeFunctionsAsync(old_l_ctx);
list *engine_callbacks = listCreate();
scriptingEngineManagerForEachEngine(resetEngineFunctionEnvCallback, engine_callbacks);
freeFunctionsAsync(curr_functions_lib_ctx, engine_callbacks);
} else {
functionsLibCtxClear(curr_functions_lib_ctx, callback);
functionsLibCtxFree(curr_functions_lib_ctx, callback, NULL);
scriptingEngineManagerForEachEngine(resetEngineFunctionEnvCallback, NULL);
}
}

/* Free the given functions ctx */
static void functionsLibCtxFreeGeneric(functionsLibCtx *functions_lib_ctx, int async) {
if (async) {
freeFunctionsAsync(functions_lib_ctx);
freeFunctionsAsync(functions_lib_ctx, NULL);
} else {
functionsLibCtxFree(functions_lib_ctx);
functionsLibCtxFree(functions_lib_ctx, NULL, NULL);
}
}

void functionReset(int async, void(callback)(dict *)) {
functionsLibCtxReleaseCurrent(async, callback);
functionsInit();
}

/* Free the given functions ctx */
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx) {
functionsLibCtxClear(functions_lib_ctx, NULL);
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx, void(callback)(dict *), list *engine_callbacks) {
functionsLibCtxClear(functions_lib_ctx, callback);
dictRelease(functions_lib_ctx->functions);
dictRelease(functions_lib_ctx->libraries);
dictRelease(functions_lib_ctx->engines_stats);
zfree(functions_lib_ctx);

if (engine_callbacks) {
listIter *iter = listGetIterator(engine_callbacks, 0);
listNode *node = NULL;
while ((node = listNext(iter)) != NULL) {
callableLazyEnvReset *engine_callback = listNodeValue(node);
if (engine_callback != NULL) {
engine_callback->engineLazyEnvResetCallback(engine_callback->context);
zfree(engine_callback);
}
}
listReleaseIterator(iter);
listRelease(engine_callbacks);
}
}

/* Swap the current functions ctx with the given one.
Expand Down Expand Up @@ -824,7 +854,7 @@ void functionFlushCommand(client *c) {
return;
}

functionsLibCtxClearCurrent(async, NULL);
functionReset(async, NULL);

/* Indicate that the command changed the data so it will be replicated and
* counted as a data change (for persistence configuration) */
Expand Down
4 changes: 2 additions & 2 deletions src/functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ dict *functionsLibGet(void);
size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx);
functionsLibCtx *functionsLibCtxGetCurrent(void);
functionsLibCtx *functionsLibCtxCreate(void);
void functionsLibCtxClearCurrent(int async, void(callback)(dict *));
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx);
void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx, void(callback)(dict *), list *engine_callbacks);
void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *));
void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async);
void functionReset(int async, void(callback)(dict *));

void functionsRemoveLibFromEngine(scriptingEngine *engine);

Expand Down
9 changes: 5 additions & 4 deletions src/lazyfree.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ void lazyFreeEvalScripts(void *args[]) {
/* Release the functions ctx. */
void lazyFreeFunctionsCtx(void *args[]) {
functionsLibCtx *functions_lib_ctx = args[0];
list *engine_callbacks = args[1];
size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx);
functionsLibCtxFree(functions_lib_ctx);
functionsLibCtxFree(functions_lib_ctx, NULL, engine_callbacks);
atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed);
}
Expand Down Expand Up @@ -236,13 +237,13 @@ void freeEvalScriptsAsync(dict *scripts, list *scripts_lru_list, list *engine_ca
}

/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) {
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, list *engine_callbacks) {
if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
atomic_fetch_add_explicit(&lazyfree_objects, functionsLibCtxFunctionsLen(functions_lib_ctx),
memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 1, functions_lib_ctx);
bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 2, functions_lib_ctx, engine_callbacks);
} else {
functionsLibCtxFree(functions_lib_ctx);
functionsLibCtxFree(functions_lib_ctx, NULL, engine_callbacks);
}
}

Expand Down
57 changes: 30 additions & 27 deletions src/lua/engine_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,10 @@ static void luaEngineFunctionCall(ValkeyModuleCtx *module_ctx,
lua_pop(lua, 1); /* Remove the error handler. */
}

static void resetEvalContext(void *context) {
lua_State *eval_lua = context;
lua_gc(eval_lua, LUA_GCCOLLECT, 0);
lua_close(eval_lua);
static void resetLuaContext(void *context) {
lua_State *lua = context;
lua_gc(lua, LUA_GCCOLLECT, 0);
lua_close(lua);

#if !defined(USE_LIBC)
/* The lua interpreter may hold a lot of memory internally, and lua is
Expand All @@ -305,27 +305,30 @@ static void resetEvalContext(void *context) {
#endif
}

static callableLazyEvalReset *luaEngineResetEvalEnv(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
int async) {
static callableLazyEnvReset *luaEngineResetEvalEnv(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
subsystemType type,
int async) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);

luaEngineCtx *lua_engine_ctx = (luaEngineCtx *)engine_ctx;
serverAssert(lua_engine_ctx->eval_lua);
callableLazyEvalReset *callback = NULL;
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);
lua_State *lua = type == VMSE_EVAL ? lua_engine_ctx->eval_lua : lua_engine_ctx->function_lua;
serverAssert(lua);
callableLazyEnvReset *callback = NULL;

if (async) {
callback = zcalloc(sizeof(*callback));
*callback = (callableLazyEvalReset){
.context = lua_engine_ctx->eval_lua,
.engineLazyEvalResetCallback = resetEvalContext,
*callback = (callableLazyEnvReset){
.context = lua,
.engineLazyEnvResetCallback = resetLuaContext,
};
} else {
resetEvalContext(lua_engine_ctx->eval_lua);
resetLuaContext(lua);
}

initializeLuaState(lua_engine_ctx, VMSE_EVAL);
initializeLuaState(lua_engine_ctx, type);

return callback;
}
Expand All @@ -347,21 +350,21 @@ static void luaEngineFreeFunction(ValkeyModuleCtx *module_ctx,
compiledFunction *compiled_function) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);

luaEngineCtx *lua_engine_ctx = engine_ctx;
if (type == VMSE_EVAL) {
luaFunction *script = (luaFunction *)compiled_function->function;
if (lua_engine_ctx->eval_lua == script->lua) {
/* The lua context is still the same, which means that we're not
* resetting the whole eval context, and therefore, we need to
* delete the function from the lua context.
*/
lua_unref(lua_engine_ctx->eval_lua, script->function_ref);
}
zfree(script);
} else {
luaFunctionFreeFunction(lua_engine_ctx->function_lua, compiled_function->function);
lua_State *lua = type == VMSE_EVAL ? lua_engine_ctx->eval_lua : lua_engine_ctx->function_lua;
serverAssert(lua);

luaFunction *script = (luaFunction *)compiled_function->function;
if (lua == script->lua) {
/* The lua context is still the same, which means that we're not
* resetting the whole eval context, and therefore, we need to
* delete the function from the lua context.
*/
lua_unref(lua, script->function_ref);
}
zfree(script);

if (compiled_function->name) {
decrRefCount(compiled_function->name);
Expand All @@ -380,7 +383,7 @@ int luaEngineInitEngine(void) {
.free_function = luaEngineFreeFunction,
.call_function = luaEngineFunctionCall,
.get_function_memory_overhead = luaEngineFunctionMemoryOverhead,
.reset_eval_env = luaEngineResetEvalEnv,
.reset_env = luaEngineResetEvalEnv,
.get_memory_info = luaEngineGetMemoryInfo,
};

Expand Down
2 changes: 1 addition & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2083,7 +2083,7 @@ functionsLibCtx *disklessLoadFunctionsLibCtxCreate(void) {
/* Helper function to discard our temp function lib context
* when the loading succeeded or failed. */
void disklessLoadDiscardFunctionsLibCtx(functionsLibCtx *temp_functions_lib_ctx) {
freeFunctionsAsync(temp_functions_lib_ctx);
freeFunctionsAsync(temp_functions_lib_ctx, NULL);
}

/* If we know we got an entirely different data set from our primary
Expand Down
8 changes: 5 additions & 3 deletions src/scripting_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,14 @@ size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine,
return mem;
}

callableLazyEvalReset *scriptingEngineCallResetEvalEnvFunc(scriptingEngine *engine,
int async) {
callableLazyEnvReset *scriptingEngineCallResetEnvFunc(scriptingEngine *engine,
subsystemType type,
int async) {
engineSetupModuleCtx(engine, NULL);
callableLazyEvalReset *callback = engine->impl.methods.reset_eval_env(
callableLazyEnvReset *callback = engine->impl.methods.reset_env(
engine->module_ctx,
engine->impl.ctx,
type,
async);
engineTeardownModuleCtx(engine);
return callback;
Expand Down
7 changes: 4 additions & 3 deletions src/scripting_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ typedef ValkeyModuleScriptingEngineServerRuntimeCtx serverRuntimeCtx;
typedef ValkeyModuleScriptingEngineCompiledFunction compiledFunction;
typedef ValkeyModuleScriptingEngineSubsystemType subsystemType;
typedef ValkeyModuleScriptingEngineMemoryInfo engineMemoryInfo;
typedef ValkeyModuleScriptingEngineCallableLazyEvalReset callableLazyEvalReset;
typedef ValkeyModuleScriptingEngineCallableLazyEnvReset callableLazyEnvReset;
typedef ValkeyModuleScriptingEngineMethods engineMethods;

/*
Expand Down Expand Up @@ -76,8 +76,9 @@ void scriptingEngineCallFunction(scriptingEngine *engine,
size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine,
compiledFunction *compiled_function);

callableLazyEvalReset *scriptingEngineCallResetEvalEnvFunc(scriptingEngine *engine,
int async);
callableLazyEnvReset *scriptingEngineCallResetEnvFunc(scriptingEngine *engine,
subsystemType type,
int async);

engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine,
subsystemType type);
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3508,7 +3508,7 @@ int redis_check_aof_main(int argc, char **argv);
/* Scripting */
void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbacks);
void freeEvalScriptsAsync(dict *scripts, list *scripts_lru_list, list *engine_callbacks);
void freeFunctionsAsync(functionsLibCtx *lib_ctx);
void freeFunctionsAsync(functionsLibCtx *lib_ctx, list *engine_callbacks);
void sha1hex(char *digest, char *script, size_t len);
unsigned long evalMemory(void);
dict *evalScriptsDict(void);
Expand Down
23 changes: 13 additions & 10 deletions src/valkeymodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -846,20 +846,20 @@ typedef enum ValkeyModuleScriptingEngineExecutionState {
VMSE_STATE_KILLED,
} ValkeyModuleScriptingEngineExecutionState;

typedef struct ValkeyModuleScriptingEngineCallableLazyEvalReset {
typedef struct ValkeyModuleScriptingEngineCallableLazyEnvReset {
void *context;

/*
* Callback function used for resetting the EVAL context implemented by an
* Callback function used for resetting the EVAL/FUNCTION context implemented by an
* engine. This callback will be called by a background thread when it's
* ready for resetting the context.
*
* - `context`: a generic pointer to a context object, stored in the
* callableLazyEvalReset struct.
* callableLazyEnvReset struct.
*
*/
void (*engineLazyEvalResetCallback)(void *context);
} ValkeyModuleScriptingEngineCallableLazyEvalReset;
void (*engineLazyEnvResetCallback)(void *context);
} ValkeyModuleScriptingEngineCallableLazyEnvReset;

/* The callback function called when either `EVAL`, `SCRIPT LOAD`, or
* `FUNCTION LOAD` command is called to compile the code.
Expand Down Expand Up @@ -956,19 +956,22 @@ typedef size_t (*ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCompiledFunction *compiled_function);

/* The callback function called when `SCRIPT FLUSH` command is called. The
* engine should reset the runtime environment used for EVAL scripts.
/* The callback function called when `SCRIPT FLUSH` or `FUNCTION FLUSH` command is called.
* The engine should reset the runtime environment used for EVAL scripts or FUNCTION SCRIPTS.
*
* - `module_ctx`: the module runtime context.
*
* - `engine_ctx`: the scripting engine runtime context.
*
* - `type`: the subsystem type.
*
* - `async`: if has value 1 then the reset is done asynchronously through
* the callback structure returned by this function.
*/
typedef ValkeyModuleScriptingEngineCallableLazyEvalReset *(*ValkeyModuleScriptingEngineResetEvalEnvFunc)(
typedef ValkeyModuleScriptingEngineCallableLazyEnvReset *(*ValkeyModuleScriptingEngineResetEnvFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineSubsystemType type,
int async);

/* Return the current used memory by the engine.
Expand Down Expand Up @@ -1006,8 +1009,8 @@ typedef struct ValkeyModuleScriptingEngineMethods {
ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead;

/* The callback function used to reset the runtime environment used
* by the scripting engine for EVAL scripts. */
ValkeyModuleScriptingEngineResetEvalEnvFunc reset_eval_env;
* by the scripting engine for EVAL scripts or FUNCTION scripts. */
ValkeyModuleScriptingEngineResetEnvFunc reset_env;

/* Function callback to get the used memory by the engine. */
ValkeyModuleScriptingEngineGetMemoryInfoFunc get_memory_info;
Expand Down
Loading
Loading