sched: logging improvements (#10550)

This enhances logging in the scheduler. The initial "waiting for server" log no longer reports an error state; the status is now "not responding", which better reflects the actual state. Runners now have slog wiring (a LogValue method) so log lines report more detail about the runner, including its PID.
Daniel Hiltgen 2025-05-03 12:01:56 -07:00 committed by GitHub
parent dd1d4e99e7
commit 76ea735aaf
3 changed files with 57 additions and 14 deletions
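
For background: the per-runner detail logging below relies on Go's log/slog LogValuer interface. Any value whose type has a LogValue() slog.Value method is expanded into structured attributes at log time, which is why call sites can switch from passing "modelPath", runner.modelPath to simply "runner", runner. A minimal, self-contained sketch of the mechanism (the worker type and field names here are illustrative, not from this commit):

package main

import (
	"log/slog"
	"os"
)

// worker is a stand-in type; this commit applies the same pattern to runnerRef.
type worker struct {
	name string
	pid  int
}

// LogValue makes *worker satisfy slog.LogValuer, so handlers render it
// as a group of attributes instead of a raw struct dump.
func (w *worker) LogValue() slog.Value {
	if w == nil {
		return slog.StringValue("nil")
	}
	return slog.GroupValue(
		slog.String("name", w.name),
		slog.Int("pid", w.pid),
	)
}

func main() {
	h := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})
	logger := slog.New(h)
	// Renders roughly: level=DEBUG msg=loaded worker.name=example worker.pid=12345
	logger.Debug("loaded", "worker", &worker{name: "example", pid: 12345})
}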

llm/server.go

@@ -44,6 +44,7 @@ type LlamaServer interface {
 	EstimatedVRAM() uint64 // Total VRAM across all GPUs
 	EstimatedTotal() uint64
 	EstimatedVRAMByGPU(gpuID string) uint64
+	Pid() int
 }
 
 // llmServer is an instance of the llama.cpp server
@@ -520,6 +521,9 @@ func (s *llmServer) getServerStatus(ctx context.Context) (ServerStatus, error) {
 		if errors.Is(err, context.DeadlineExceeded) {
 			return ServerStatusNotResponding, errors.New("server not responding")
 		}
+		if strings.Contains(err.Error(), "connection refused") {
+			return ServerStatusNotResponding, errors.New("connection refused")
+		}
 		return ServerStatusError, fmt.Errorf("health resp: %w", err)
 	}
 	defer resp.Body.Close()
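
A note on the new "connection refused" branch: the health-check failure from the HTTP client typically arrives wrapped (for example in a *url.Error), so the code matches on the error text. A stricter alternative would unwrap against the errno; a sketch, not what this commit does:

// Hypothetical alternative using errors.Is; syscall.ECONNREFUSED is
// Unix-specific, which may be one reason the commit matches on text instead.
if errors.Is(err, syscall.ECONNREFUSED) {
	return ServerStatusNotResponding, errors.New("connection refused")
}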
@@ -640,6 +644,13 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 	}
 }
 
+func (s *llmServer) Pid() int {
+	if s.cmd != nil && s.cmd.Process != nil {
+		return s.cmd.Process.Pid
+	}
+	return -1
+}
+
 var grammarJSON = `
 root ::= object
 value ::= object | array | string | number | ("true" | "false" | "null") ws

server/sched.go

@@ -147,6 +147,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
 				s.loadedMu.Unlock()
 				if runner != nil {
 					if runner.needsReload(ctx, pending) {
+						slog.Debug("reloading", "runner", runner)
 						runnerToExpire = runner
 					} else {
 						// Runner is usable, return it
@@ -282,7 +283,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
 			}
 			// Trigger an expiration to unload once it's done
 			runnerToExpire.refMu.Lock()
-			slog.Debug("resetting model to expire immediately to make room", "modelPath", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount)
+			slog.Debug("resetting model to expire immediately to make room", "runner", runnerToExpire, "refCount", runnerToExpire.refCount)
 			if runnerToExpire.expireTimer != nil {
 				runnerToExpire.expireTimer.Stop()
 				runnerToExpire.expireTimer = nil
@@ -331,16 +332,16 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
 			runner.refCount--
 			if runner.refCount <= 0 {
 				if runner.sessionDuration <= 0 {
-					slog.Debug("runner with zero duration has gone idle, expiring to unload", "modelPath", runner.modelPath)
+					slog.Debug("runner with zero duration has gone idle, expiring to unload", "runner", runner)
 					if runner.expireTimer != nil {
 						runner.expireTimer.Stop()
 						runner.expireTimer = nil
 					}
 					s.expiredCh <- runner
 				} else if runner.expireTimer == nil {
-					slog.Debug("runner with non-zero duration has gone idle, adding timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
+					slog.Debug("runner with non-zero duration has gone idle, adding timer", "runner", runner, "duration", runner.sessionDuration)
 					runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() {
-						slog.Debug("timer expired, expiring to unload", "modelPath", runner.modelPath)
+						slog.Debug("timer expired, expiring to unload", "runner", runner)
 						runner.refMu.Lock()
 						defer runner.refMu.Unlock()
 						if runner.expireTimer != nil {
@@ -351,18 +352,18 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
 					})
 					runner.expiresAt = time.Now().Add(runner.sessionDuration)
 				} else {
-					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration)
+					slog.Debug("runner with non-zero duration has gone idle, resetting timer", "runner", runner, "duration", runner.sessionDuration)
 					runner.expireTimer.Reset(runner.sessionDuration)
 					runner.expiresAt = time.Now().Add(runner.sessionDuration)
 				}
 			}
-			slog.Debug("after processing request finished event", "modelPath", runner.modelPath, "refCount", runner.refCount)
+			slog.Debug("after processing request finished event", "runner", runner, "refCount", runner.refCount)
 			runner.refMu.Unlock()
 		case runner := <-s.expiredCh:
-			slog.Debug("runner expired event received", "modelPath", runner.modelPath)
+			slog.Debug("runner expired event received", "runner", runner)
 			runner.refMu.Lock()
 			if runner.refCount > 0 {
-				slog.Debug("expired event with positive ref count, retrying", "modelPath", runner.modelPath, "refCount", runner.refCount)
+				slog.Debug("expired event with positive ref count, retrying", "runner", runner, "refCount", runner.refCount)
 				go func(runner *runnerRef) {
 					// We can't unload yet, but want to as soon as the current request completes
 					// So queue up another expired event
@@ -374,16 +375,16 @@ func (s *Scheduler) processCompleted(ctx context.Context) {
 			}
 
 			s.loadedMu.Lock()
-			slog.Debug("got lock to unload", "modelPath", runner.modelPath)
+			slog.Debug("got lock to unload", "runner", runner)
 			finished := runner.waitForVRAMRecovery()
 			runner.unload()
 			delete(s.loaded, runner.modelPath)
 			s.loadedMu.Unlock()
-			slog.Debug("runner released", "modelPath", runner.modelPath)
+			slog.Debug("runner released", "runner", runner)
 			runner.refMu.Unlock()
 			<-finished
-			slog.Debug("sending an unloaded event", "modelPath", runner.modelPath)
+			slog.Debug("sending an unloaded event", "runner", runner)
 			s.unloadedCh <- struct{}{}
 		}
 	}
@@ -406,7 +407,7 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm
 	pending.successCh <- runner
 	go func() {
 		<-pending.ctx.Done()
-		slog.Debug("context for request finished")
+		slog.Debug("context for request finished", "runner", runner)
 		finished <- pending
 	}()
 }
@@ -441,6 +442,7 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
 		estimatedVRAM:  llama.EstimatedVRAM(),
 		estimatedTotal: llama.EstimatedTotal(),
 		loading:        true,
+		pid:            llama.Pid(),
 	}
 	runner.numParallel = numParallel
 	runner.refMu.Lock() // hold lock until running or aborted
@@ -460,6 +462,9 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis
 			return
 		}
 		slog.Debug("finished setting up runner", "model", req.model.ModelPath)
+		if runner.pid < 0 {
+			runner.pid = llama.Pid()
+		}
 		runner.refCount++
 		runner.loading = false
 		go func() {
@@ -544,6 +549,7 @@ type runnerRef struct {
 	refCount uint // prevent unloading if > 0
 	llama    llm.LlamaServer
+	pid      int
 	loading       bool                 // True only during initial load, then false forever
 	gpus          discover.GpuInfoList // Recorded at time of provisioning
 	estimatedVRAM uint64
@@ -668,6 +674,31 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any {
 	return finished
 }
 
+func (runner *runnerRef) LogValue() slog.Value {
+	if runner == nil {
+		return slog.StringValue("nil")
+	}
+	attrs := []slog.Attr{}
+	if runner.model != nil {
+		attrs = append(attrs, slog.String("name", runner.model.Name))
+	}
+	if len(runner.gpus) > 0 {
+		attrs = append(attrs,
+			slog.String("inference", runner.gpus[0].Library),
+			slog.Int("devices", len(runner.gpus)),
+		)
+	}
+	attrs = append(attrs,
+		slog.String("size", format.HumanBytes2(runner.estimatedTotal)),
+		slog.String("vram", format.HumanBytes2(runner.estimatedVRAM)),
+		slog.Int("num_ctx", runner.NumCtx),
+		slog.Int("parallel", runner.numParallel),
+		slog.Int("pid", runner.pid),
+		slog.String("model", runner.modelPath),
+	)
+	return slog.GroupValue(attrs...)
+}
+
 type ByDurationAndName []*runnerRef
 
 func (a ByDurationAndName) Len() int { return len(a) }
@@ -790,12 +821,12 @@ func (s *Scheduler) findRunnerToUnload() *runnerRef {
 		rc := runner.refCount
 		runner.refMu.Unlock()
 		if rc == 0 {
-			slog.Debug("found an idle runner to unload")
+			slog.Debug("found an idle runner to unload", "runner", runner)
 			return runner
 		}
 	}
 
 	// None appear idle, just wait for the one with the shortest duration
-	slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList))
+	slog.Debug("no idle runners, picking the shortest duration", "runner_count", len(runnerList), "runner", runnerList[0])
 	return runnerList[0]
 }

server/sched_test.go

@@ -792,3 +792,4 @@ func (s *mockLlm) Close() error {
 func (s *mockLlm) EstimatedVRAM() uint64                  { return s.estimatedVRAM }
 func (s *mockLlm) EstimatedTotal() uint64                 { return s.estimatedTotal }
 func (s *mockLlm) EstimatedVRAMByGPU(gpuid string) uint64 { return s.estimatedVRAMByGPU[gpuid] }
+func (s *mockLlm) Pid() int                               { return -1 }
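
Since mockLlm has to track every method added to the LlamaServer interface (such as the new Pid()), a compile-time assertion in the test file would catch a missing method immediately; a suggested sketch, not part of this commit:

// Fails to compile if mockLlm ever stops satisfying llm.LlamaServer.
var _ llm.LlamaServer = (*mockLlm)(nil)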