Mirror of https://github.com/dragonflydb/dragonfly.git
chore: lock keys when going through fast-path execution (#2491)
This is needed if we want to allow asynchronous transactional operations during the callback execution. Also update actions versions.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
parent 675b3889a4
commit 3ebb32df3f

2 changed files with 19 additions and 19 deletions
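Before the per-file diffs, here is a minimal, self-contained sketch of the scheduling change described above, assuming a simple reference-counted lock table as a stand-in for DbSlice's key locks: the key locks are now acquired before the fast-path run, released only if the transaction concludes there, and kept when the transaction falls through to the queue. KeyLockTable, ScheduleSingleShard, RunStatus, and quick_cb are hypothetical names for this sketch, not Dragonfly's actual types.

// Sketch only: approximates the new fast-path locking flow, not Dragonfly's real classes.
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

class KeyLockTable {
 public:
  // Returns true if none of the keys were already held (the diff's keys_unlocked).
  bool Acquire(const std::vector<std::string>& keys) {
    bool uncontended = true;
    for (const auto& k : keys) {
      if (counts_[k]++ > 0)
        uncontended = false;  // someone already holds this key
    }
    return uncontended;
  }

  void Release(const std::vector<std::string>& keys) {
    for (const auto& k : keys) {
      if (--counts_[k] == 0)
        counts_.erase(k);
    }
  }

 private:
  std::unordered_map<std::string, unsigned> counts_;
};

enum class RunStatus { kConcluded, kAvoidConcluding };

// Mirrors the reworked single-shard scheduling flow from the diff below.
bool ScheduleSingleShard(KeyLockTable& locks, const std::vector<std::string>& keys,
                         RunStatus (*quick_cb)()) {
  bool keys_unlocked = locks.Acquire(keys);  // locks are taken up front now
  bool quick_run = keys_unlocked;
  bool continue_scheduling = !quick_run;

  if (quick_run) {
    // The callback runs while the keys stay locked, so it may start
    // asynchronous transactional operations against those same keys.
    if (quick_cb() == RunStatus::kAvoidConcluding) {
      continue_scheduling = true;  // keep the locks and fall through to the queue
    } else {
      locks.Release(keys);  // concluded on the fast path: give the keys back
    }
  }

  if (continue_scheduling) {
    // Slow path: the transaction keeps its key locks and is queued instead.
    std::cout << "queued while holding " << keys.size() << " key locks\n";
  }
  return quick_run;
}

int main() {
  KeyLockTable locks;
  ScheduleSingleShard(locks, {"key:1", "key:2"}, [] { return RunStatus::kConcluded; });
}

The design point mirrored here is that the quick run no longer merely checks that the keys are free; it holds them for the duration of the callback, which is what makes asynchronous transactional work safe during that callback.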
.github/workflows/ci.yml (vendored): 10 changed lines

@@ -11,7 +11,7 @@ jobs:
   pre-commit:
     runs-on: ubuntu-latest
     steps:
-    - uses: actions/checkout@v3
+    - uses: actions/checkout@v4
       with:
         fetch-depth: 2
     - uses: actions/setup-python@v3
@@ -19,7 +19,7 @@ jobs:
       run: |
         python -m pip install pre-commit
         python -m pip freeze --local
-    - uses: actions/cache@v3
+    - uses: actions/cache@v4
       with:
         path: ~/.cache/pre-commit
         key: pre-commit|${{ env.pythonLocation }}|${{ hashFiles('.pre-commit-config.yaml') }}
@@ -159,7 +159,7 @@ jobs:
     - name: Set up Helm
       uses: azure/setup-helm@v3

-    - uses: actions/setup-python@v4
+    - uses: actions/setup-python@v5
       with:
         python-version: "3.9"
         check-latest: true
@@ -169,7 +169,7 @@ jobs:
        go test -v ./contrib/charts/dragonfly/...

    - name: Set up chart-testing
-      uses: helm/chart-testing-action@v2.3.1
+      uses: helm/chart-testing-action@v2.6.1

    - name: Run chart-testing (list-changed)
      id: list-changed
@@ -188,7 +188,7 @@ jobs:

    - if: steps.list-changed.outputs.changed == 'true' || github.event_name == 'workflow_dispatch'
      name: Create kind cluster
-      uses: helm/kind-action@v1.5.0
+      uses: helm/kind-action@v1.8.0

    - if: steps.list-changed.outputs.changed == 'true' || github.event_name == 'workflow_dispatch'
      name: Getting cluster ready
src/server/transaction.cc

@@ -959,7 +959,7 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
   DCHECK_EQ(0u, txid_);

   auto& sd = shard_data_[SidToId(unique_shard_id_)];
-  DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER));
+  DCHECK_EQ(0, sd.local_mask & OUT_OF_ORDER);

   DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
   DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
@@ -1054,40 +1054,40 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
   auto& sd = shard_data_[SidToId(unique_shard_id_)];
   DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);

-  bool unlocked_keys =
-      shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode);
-  bool quick_run = unlocked_keys;
+  bool shard_unlocked = shard->shard_lock()->Check(mode);
+  bool keys_unlocked = shard->db_slice().Acquire(mode, lock_args);
+  bool quick_run = shard_unlocked && keys_unlocked;
+  bool continue_scheduling = !quick_run;

+  sd.local_mask |= KEYLOCK_ACQUIRED;

   // Fast path. If none of the keys are locked, we can run briefly atomically on the thread
   // without acquiring them at all.
   if (quick_run) {
-    // TBD add acquire lock here
     auto result = RunQuickie(shard);
     local_result_ = result.status;

     if (result.flags & RunnableResult::AVOID_CONCLUDING) {
-      // If we want to run again, we have to actually acquire keys, but keep ourselves disarmed
+      // If we want to run again, we have to actually schedule this transaction
       DCHECK_EQ(sd.is_armed, false);
-      unlocked_keys = false;
+      continue_scheduling = true;
     } else {
       LogAutoJournalOnShard(shard, result);
+      shard->db_slice().Release(mode, lock_args);
+      sd.local_mask &= ~KEYLOCK_ACQUIRED;
     }
   }

   // Slow path. Some of the keys are locked, so we schedule on the transaction queue.
-  if (!unlocked_keys) {
+  if (continue_scheduling) {
     coordinator_state_ |= COORD_SCHED;  // safe because single shard
     txid_ = op_seq.fetch_add(1, memory_order_relaxed);  // -
     sd.pq_pos = shard->txq()->Insert(this);

-    DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0);
-    shard->db_slice().Acquire(mode, lock_args);
-    sd.local_mask |= KEYLOCK_ACQUIRED;

     DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size();

-    // If there are blocked transactons waiting for this tx keys, we will add this transaction
-    // to the tx-queue (these keys will be contended). This will happen even if the queue was empty.
+    // If there are blocked transactons waiting for these tx keys, we add this transaction
+    // to the tx-queue (these keys will be contended). This happen even if the queue is empty.
     // In that case we must poll the queue, because there will be no other callback trigerring the
     // queue before us.
     shard->PollExecution("schedule_unique", nullptr);
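The RunQuickie() hunk above relaxes its precondition because, after this change, ScheduleUniqueShard() sets KEYLOCK_ACQUIRED before invoking the quick run, so only OUT_OF_ORDER can still be asserted absent. A tiny self-contained illustration of that flag bookkeeping follows; the bit values and the standalone main() are assumptions made for the sketch, only the flag names come from the diff.

// Sketch only: hypothetical bit values, just to show the local_mask lifecycle.
#include <cassert>
#include <cstdint>

enum LocalMask : uint16_t {
  KEYLOCK_ACQUIRED = 1 << 0,
  OUT_OF_ORDER = 1 << 1,
};

int main() {
  uint16_t local_mask = 0;

  // ScheduleUniqueShard() now sets the flag before the quick run...
  local_mask |= KEYLOCK_ACQUIRED;

  // ...so RunQuickie() can only insist that OUT_OF_ORDER is absent.
  assert((local_mask & OUT_OF_ORDER) == 0);

  // Concluding on the fast path clears the flag again, mirroring the new
  // sd.local_mask &= ~KEYLOCK_ACQUIRED; line in the else-branch.
  local_mask &= ~KEYLOCK_ACQUIRED;
  assert(local_mask == 0);
  return 0;
}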