mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
Fix async lua bugs (#1123)
Fix async lua bugs: - Not calling toupper before looking up command id - Wrong error reply context
This commit is contained in:
parent
c6d34678f3
commit
2d501111ea
2 changed files with 15 additions and 14 deletions
|
@ -1052,8 +1052,14 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
|
||||||
DCHECK(cntx->transaction);
|
DCHECK(cntx->transaction);
|
||||||
DVLOG(1) << "CallFromScript " << cntx->transaction->DebugId() << " " << ArgS(ca.args, 0);
|
DVLOG(1) << "CallFromScript " << cntx->transaction->DebugId() << " " << ArgS(ca.args, 0);
|
||||||
|
|
||||||
|
InterpreterReplier replier(ca.translator);
|
||||||
|
facade::SinkReplyBuilder* orig = cntx->Inject(&replier);
|
||||||
|
absl::Cleanup clean = [orig, cntx] { cntx->Inject(orig); };
|
||||||
|
|
||||||
if (ca.async) {
|
if (ca.async) {
|
||||||
auto& info = cntx->conn_state.script_info;
|
auto& info = cntx->conn_state.script_info;
|
||||||
|
|
||||||
|
ToUpper(&ca.args[0]);
|
||||||
auto* cid = registry_.Find(facade::ToSV(ca.args[0]));
|
auto* cid = registry_.Find(facade::ToSV(ca.args[0]));
|
||||||
|
|
||||||
if (!VerifyCommand(cid, ca.args, cntx))
|
if (!VerifyCommand(cid, ca.args, cntx))
|
||||||
|
@ -1064,8 +1070,6 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
|
||||||
info->async_cmds_heap_mem += info->async_cmds.back().UsedHeapMemory();
|
info->async_cmds_heap_mem += info->async_cmds.back().UsedHeapMemory();
|
||||||
}
|
}
|
||||||
|
|
||||||
InterpreterReplier replier(ca.translator);
|
|
||||||
|
|
||||||
if (auto err = FlushEvalAsyncCmds(cntx, !ca.async); err) {
|
if (auto err = FlushEvalAsyncCmds(cntx, !ca.async); err) {
|
||||||
CapturingReplyBuilder::Apply(move(*err), &replier); // forward error to lua
|
CapturingReplyBuilder::Apply(move(*err), &replier); // forward error to lua
|
||||||
*ca.requested_abort = true;
|
*ca.requested_abort = true;
|
||||||
|
@ -1075,11 +1079,7 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
|
||||||
if (ca.async)
|
if (ca.async)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
facade::SinkReplyBuilder* orig = cntx->Inject(&replier);
|
|
||||||
|
|
||||||
DispatchCommand(ca.args, cntx);
|
DispatchCommand(ca.args, cntx);
|
||||||
|
|
||||||
cntx->Inject(orig);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import time
|
||||||
import json
|
import json
|
||||||
import pytest
|
import pytest
|
||||||
import random
|
import random
|
||||||
|
import itertools
|
||||||
from . import dfly_args, dfly_multi_test_args
|
from . import dfly_args, dfly_multi_test_args
|
||||||
|
|
||||||
DJANGO_CACHEOPS_SCRIPT = """
|
DJANGO_CACHEOPS_SCRIPT = """
|
||||||
|
@ -49,12 +50,12 @@ end
|
||||||
for db_table, disj in pairs(dnfs) do
|
for db_table, disj in pairs(dnfs) do
|
||||||
for _, conj in ipairs(disj) do
|
for _, conj in ipairs(disj) do
|
||||||
-- Ensure scheme is known
|
-- Ensure scheme is known
|
||||||
redis.call('sadd', prefix .. 'schemes:' .. db_table, conj_schema(conj))
|
redis.acall('sadd', prefix .. 'schemes:' .. db_table, conj_schema(conj))
|
||||||
|
|
||||||
-- Add new cache_key to list of dependencies
|
-- Add new cache_key to list of dependencies
|
||||||
local conj_key = conj_cache_key(db_table, conj)
|
local conj_key = conj_cache_key(db_table, conj)
|
||||||
|
|
||||||
redis.call('sadd', conj_key, key)
|
redis.acall('sadd', conj_key, key)
|
||||||
-- NOTE: an invalidator should live longer than any key it references.
|
-- NOTE: an invalidator should live longer than any key it references.
|
||||||
-- So we update its ttl on every key if needed.
|
-- So we update its ttl on every key if needed.
|
||||||
-- NOTE: if CACHEOPS_LRU is True when invalidators should be left persistent,
|
-- NOTE: if CACHEOPS_LRU is True when invalidators should be left persistent,
|
||||||
|
@ -83,7 +84,6 @@ def DJANGO_CACHEOPS_SCHEMA(vs): return {
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Test the main caching script of https://github.com/Suor/django-cacheops.
|
Test the main caching script of https://github.com/Suor/django-cacheops.
|
||||||
The script accesses undeclared keys (that are built based on argument data),
|
The script accesses undeclared keys (that are built based on argument data),
|
||||||
|
@ -202,9 +202,10 @@ async def test_golang_asynq_script(async_pool, num_queues=10, num_tasks=100):
|
||||||
await job
|
await job
|
||||||
|
|
||||||
|
|
||||||
ERROR_CALL_SCRIPT_TEMPLATE = """
|
ERROR_CALL_SCRIPT_TEMPLATE = [
|
||||||
redis.{}('LTRIM', 'l', 'a', 'b')
|
"redis.{}('LTRIM', 'l', 'a', 'b')", # error only on evaluation
|
||||||
"""
|
"redis.{}('obviously wrong')" # error immediately on preprocessing
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 1})
|
@dfly_args({"proactor_threads": 1})
|
||||||
|
@ -212,10 +213,10 @@ redis.{}('LTRIM', 'l', 'a', 'b')
|
||||||
async def test_eval_error_propagation(async_client):
|
async def test_eval_error_propagation(async_client):
|
||||||
CMDS = ['call', 'pcall', 'acall', 'apcall']
|
CMDS = ['call', 'pcall', 'acall', 'apcall']
|
||||||
|
|
||||||
for cmd in CMDS:
|
for cmd, template in itertools.product(CMDS, ERROR_CALL_SCRIPT_TEMPLATE):
|
||||||
does_abort = 'p' not in cmd
|
does_abort = 'p' not in cmd
|
||||||
try:
|
try:
|
||||||
await async_client.eval(ERROR_CALL_SCRIPT_TEMPLATE.format(cmd), 1, 'l')
|
await async_client.eval(template.format(cmd), 1, 'l')
|
||||||
if does_abort:
|
if does_abort:
|
||||||
assert False, "Eval must have thrown an error: " + cmd
|
assert False, "Eval must have thrown an error: " + cmd
|
||||||
except aioredis.RedisError as e:
|
except aioredis.RedisError as e:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue