sched: fix race leading to orphaned runners (#10599)
If a model is loading, the request context is canceled mid-load by a client closing the connection, and another request for the same model arrives with a different configuration (context size, etc.) that requires a reload, two unload events can end up in flight. The first shuts down the original model load, but the second drops the reference to the new, reloading runner, orphaning it and leaking its resources. The primary fix is to detect the duplicate unload and ignore the second event. The load routine is also hardened so that clobbering an already-present runner is detected and the old runner is unloaded with a warning.
Parent: 392de84031
Commit: 5e380c3b42
2 changed files with 40 additions and 20 deletions
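For illustration, here is a minimal, self-contained Go sketch of the guard this commit introduces (simplified stand-ins only; the scheduler, runner, and handleExpired names below are hypothetical, not the actual ollama types): before unloading, the expired-event handler re-checks the loaded map under its lock, and if the entry is already gone it treats the event as a duplicate and ignores it, so a second unload cannot drop the reference to a freshly reloaded runner.

package main

import (
	"fmt"
	"sync"
)

// runner is a simplified stand-in for the scheduler's runner reference.
type runner struct {
	modelPath string
}

// scheduler is a simplified stand-in holding the map of loaded runners.
type scheduler struct {
	mu     sync.Mutex
	loaded map[string]*runner
}

// handleExpired mimics the guarded unload path: if the map no longer holds a
// runner for this model path, a previous event already unloaded it and this
// duplicate event is ignored instead of clobbering a newer runner.
func (s *scheduler) handleExpired(modelPath string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.loaded[modelPath] == nil {
		fmt.Println("duplicate expired event, ignoring:", modelPath)
		return
	}
	delete(s.loaded, modelPath)
	fmt.Println("unloaded:", modelPath)
}

func main() {
	s := &scheduler{loaded: map[string]*runner{"llama3": {modelPath: "llama3"}}}
	s.handleExpired("llama3") // first event unloads the runner
	s.handleExpired("llama3") // duplicate event is detected and ignored
}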
@@ -1010,17 +1010,17 @@ func (s *llmServer) Close() error {
 	s.llamaModelLock.Unlock()
 
 	if s.cmd != nil {
-		slog.Debug("stopping llama server")
+		slog.Debug("stopping llama server", "pid", s.Pid())
 		if err := s.cmd.Process.Kill(); err != nil {
 			return err
 		}
 		// if ProcessState is already populated, Wait already completed, no need to wait again
 		if s.cmd.ProcessState == nil {
-			slog.Debug("waiting for llama server to exit")
+			slog.Debug("waiting for llama server to exit", "pid", s.Pid())
 			<-s.done
 		}
 
-		slog.Debug("llama server stopped")
+		slog.Debug("llama server stopped", "pid", s.Pid())
 	}
 
 	return nil
@@ -296,13 +296,13 @@ func (s *Scheduler) processPending(ctx context.Context) {
 				// Wait for the unload to happen
 				// Note: at this point we're queueing up all incoming requests, even if they were for
 				// a different model that's loaded and not scheduled to be removed.
-				slog.Debug("waiting for pending requests to complete and unload to occur", "modelPath", runnerToExpire.modelPath)
+				slog.Debug("waiting for pending requests to complete and unload to occur", "runner", runnerToExpire)
 				select {
 				case <-ctx.Done():
 					slog.Debug("shutting down scheduler pending loop")
 					return
 				case <-s.unloadedCh:
-					slog.Debug("unload completed", "modelPath", runnerToExpire.modelPath)
+					slog.Debug("unload completed", "runner", runnerToExpire)
 					continue
 				}
 			}
@@ -375,17 +375,29 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
 			}
 
 			s.loadedMu.Lock()
-			slog.Debug("got lock to unload", "runner", runner)
-			finished := runner.waitForVRAMRecovery()
-			runner.unload()
-			delete(s.loaded, runner.modelPath)
-			s.loadedMu.Unlock()
-			slog.Debug("runner released", "runner", runner)
-			runner.refMu.Unlock()
-
-			<-finished
-			slog.Debug("sending an unloaded event", "runner", runner)
-			s.unloadedCh <- struct{}{}
+			slog.Debug("got lock to unload expired event", "runner", runner)
+			runnerToUnload := s.loaded[runner.modelPath]
+			if runnerToUnload == nil {
+				// If runnerToUnload is nil, we already processed an event and
+				// unloaded it. This double unload can happen if the initial
+				// request is canceled and we're trying to load another model
+				// that requires this one to be evicted, or the settings change
+				// and require a reload
+				s.loadedMu.Unlock()
+				runner.refMu.Unlock()
+				slog.Debug("duplicate expired event, ignoring", "runner", runner)
+			} else {
+				slog.Debug("starting background wait for VRAM recovery", "runner", runner)
+				finished := runner.waitForVRAMRecovery()
+				runner.unload()
+				delete(s.loaded, runner.modelPath)
+				s.loadedMu.Unlock()
+				slog.Debug("runner terminated and removed from list, blocking for VRAM recovery", "runner", runner)
+				<-finished
+				runner.refMu.Unlock()
+				slog.Debug("sending an unloaded event", "runner", runner)
+				s.unloadedCh <- struct{}{}
+			}
 		}
 	}
 }
@@ -448,6 +460,13 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoList
 	runner.refMu.Lock() // hold lock until running or aborted
 
 	s.loadedMu.Lock()
+	if oldRunner, ok := s.loaded[req.model.ModelPath]; ok {
+		// Shouldn't happen, but safeguard against leaking a runner
+		slog.Warn("model was still loaded", "old_runner", oldRunner, "new_runner", runner)
+		oldRunner.refMu.Lock()
+		oldRunner.unload()
+		oldRunner.refMu.Unlock()
+	}
 	s.loaded[req.model.ModelPath] = runner
 	slog.Info("loaded runners", "count", len(s.loaded))
 	s.loadedMu.Unlock()
@@ -457,11 +476,11 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoList
 		if err = llama.WaitUntilRunning(req.ctx); err != nil {
 			slog.Error("error loading llama server", "error", err)
 			req.errCh <- err
-			slog.Debug("triggering expiration for failed load", "model", runner.modelPath)
+			slog.Debug("triggering expiration for failed load", "runner", runner)
 			s.expiredCh <- runner
 			return
 		}
-		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
+		slog.Debug("finished setting up", "runner", runner)
 		if runner.pid < 0 {
 			runner.pid = llama.Pid()
 		}
@@ -634,6 +653,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 		(len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) ||
 		(runtime.GOOS == "windows" && runner.gpus[0].Library != "cuda") {
 		finished <- struct{}{}
+		slog.Debug("no need to wait for VRAM recovery", "runner", runner)
 		return finished
 	}
 	start := time.Now()
@@ -652,7 +672,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 		for {
 			<-ticker.C
 			if time.Now().After(expiresAt) {
-				slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "model", runner.modelPath)
+				slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "runner", runner)
 				finished <- struct{}{}
 			}
 
@@ -665,7 +685,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 			}
 			// If we're within ~80% of the estimated memory usage recovered, bail out
 			if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 {
-				slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "model", runner.modelPath)
+				slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "runner", runner)
 				finished <- struct{}{}
 				return
 			}
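To make the ~80% convergence check above concrete, here is a small arithmetic sketch (the 8 GiB / 2 GiB / 9 GiB figures are hypothetical, not from the commit): with an estimated VRAM footprint of 8 GiB, the wait loop can stop early once free VRAM has grown by more than 0.8 * 8 GiB = 6.4 GiB relative to the reading taken when the unload started.

package main

import "fmt"

func main() {
	// Hypothetical numbers for illustration only.
	const estimatedVRAM uint64 = 8 << 30 // runner's estimated VRAM use: 8 GiB
	freeBefore := uint64(2 << 30)        // free VRAM sampled when the unload began
	freeNow := uint64(9 << 30)           // free VRAM observed on a later tick

	recovered := float32(freeNow - freeBefore)        // 7 GiB recovered so far
	threshold := float32(estimatedVRAM) * 0.8         // 6.4 GiB convergence threshold
	fmt.Println("converged:", recovered > threshold)  // true: stop waiting early
}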