From 76ea735aafe8cd3165a105b70ca427dce9d7aec4 Mon Sep 17 00:00:00 2001 From: Daniel Hiltgen Date: Sat, 3 May 2025 12:01:56 -0700 Subject: [PATCH] sched: logging improvements (#10550) This enhances our logging in the scheduler. The initial "waiting for server" log no longer claims an initial error state (now "not responding" which better reflects the actual state). Runners now have slog wiring to report more details about the runner, including PID. --- llm/server.go | 11 +++++++++ server/sched.go | 59 +++++++++++++++++++++++++++++++++----------- server/sched_test.go | 1 + 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/llm/server.go b/llm/server.go index 0f23fc4c5..528af71f9 100644 --- a/llm/server.go +++ b/llm/server.go @@ -44,6 +44,7 @@ type LlamaServer interface { EstimatedVRAM() uint64 // Total VRAM across all GPUs EstimatedTotal() uint64 EstimatedVRAMByGPU(gpuID string) uint64 + Pid() int } // llmServer is an instance of the llama.cpp server @@ -520,6 +521,9 @@ func (s *llmServer) getServerStatus(ctx context.Context) (ServerStatus, error) { if errors.Is(err, context.DeadlineExceeded) { return ServerStatusNotResponding, errors.New("server not responding") } + if strings.Contains(err.Error(), "connection refused") { + return ServerStatusNotResponding, errors.New("connection refused") + } return ServerStatusError, fmt.Errorf("health resp: %w", err) } defer resp.Body.Close() @@ -640,6 +644,13 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error { } } +func (s *llmServer) Pid() int { + if s.cmd != nil && s.cmd.Process != nil { + return s.cmd.Process.Pid + } + return -1 +} + var grammarJSON = ` root ::= object value ::= object | array | string | number | ("true" | "false" | "null") ws diff --git a/server/sched.go b/server/sched.go index 8513fbf4b..6fbfcd80e 100644 --- a/server/sched.go +++ b/server/sched.go @@ -147,6 +147,7 @@ func (s *Scheduler) processPending(ctx context.Context) { s.loadedMu.Unlock() if runner != nil { if runner.needsReload(ctx, pending) { + slog.Debug("reloading", "runner", runner) runnerToExpire = runner } else { // Runner is usable, return it @@ -282,7 +283,7 @@ func (s *Scheduler) processPending(ctx context.Context) { } // Trigger an expiration to unload once it's done runnerToExpire.refMu.Lock() - slog.Debug("resetting model to expire immediately to make room", "modelPath", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount) + slog.Debug("resetting model to expire immediately to make room", "runner", runnerToExpire, "refCount", runnerToExpire.refCount) if runnerToExpire.expireTimer != nil { runnerToExpire.expireTimer.Stop() runnerToExpire.expireTimer = nil @@ -331,16 +332,16 @@ func (s *Scheduler) processCompleted(ctx context.Context) { runner.refCount-- if runner.refCount <= 0 { if runner.sessionDuration <= 0 { - slog.Debug("runner with zero duration has gone idle, expiring to unload", "modelPath", runner.modelPath) + slog.Debug("runner with zero duration has gone idle, expiring to unload", "runner", runner) if runner.expireTimer != nil { runner.expireTimer.Stop() runner.expireTimer = nil } s.expiredCh <- runner } else if runner.expireTimer == nil { - slog.Debug("runner with non-zero duration has gone idle, adding timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration) + slog.Debug("runner with non-zero duration has gone idle, adding timer", "runner", runner, "duration", runner.sessionDuration) runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() { - slog.Debug("timer expired, expiring to unload", "modelPath", runner.modelPath) + slog.Debug("timer expired, expiring to unload", "runner", runner) runner.refMu.Lock() defer runner.refMu.Unlock() if runner.expireTimer != nil { @@ -351,18 +352,18 @@ func (s *Scheduler) processCompleted(ctx context.Context) { }) runner.expiresAt = time.Now().Add(runner.sessionDuration) } else { - slog.Debug("runner with non-zero duration has gone idle, resetting timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration) + slog.Debug("runner with non-zero duration has gone idle, resetting timer", "runner", runner, "duration", runner.sessionDuration) runner.expireTimer.Reset(runner.sessionDuration) runner.expiresAt = time.Now().Add(runner.sessionDuration) } } - slog.Debug("after processing request finished event", "modelPath", runner.modelPath, "refCount", runner.refCount) + slog.Debug("after processing request finished event", "runner", runner, "refCount", runner.refCount) runner.refMu.Unlock() case runner := <-s.expiredCh: - slog.Debug("runner expired event received", "modelPath", runner.modelPath) + slog.Debug("runner expired event received", "runner", runner) runner.refMu.Lock() if runner.refCount > 0 { - slog.Debug("expired event with positive ref count, retrying", "modelPath", runner.modelPath, "refCount", runner.refCount) + slog.Debug("expired event with positive ref count, retrying", "runner", runner, "refCount", runner.refCount) go func(runner *runnerRef) { // We can't unload yet, but want to as soon as the current request completes // So queue up another expired event @@ -374,16 +375,16 @@ func (s *Scheduler) processCompleted(ctx context.Context) { } s.loadedMu.Lock() - slog.Debug("got lock to unload", "modelPath", runner.modelPath) + slog.Debug("got lock to unload", "runner", runner) finished := runner.waitForVRAMRecovery() runner.unload() delete(s.loaded, runner.modelPath) s.loadedMu.Unlock() - slog.Debug("runner released", "modelPath", runner.modelPath) + slog.Debug("runner released", "runner", runner) runner.refMu.Unlock() <-finished - slog.Debug("sending an unloaded event", "modelPath", runner.modelPath) + slog.Debug("sending an unloaded event", "runner", runner) s.unloadedCh <- struct{}{} } } @@ -406,7 +407,7 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm pending.successCh <- runner go func() { <-pending.ctx.Done() - slog.Debug("context for request finished") + slog.Debug("context for request finished", "runner", runner) finished <- pending }() } @@ -441,6 +442,7 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis estimatedVRAM: llama.EstimatedVRAM(), estimatedTotal: llama.EstimatedTotal(), loading: true, + pid: llama.Pid(), } runner.numParallel = numParallel runner.refMu.Lock() // hold lock until running or aborted @@ -460,6 +462,9 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis return } slog.Debug("finished setting up runner", "model", req.model.ModelPath) + if runner.pid < 0 { + runner.pid = llama.Pid() + } runner.refCount++ runner.loading = false go func() { @@ -544,6 +549,7 @@ type runnerRef struct { refCount uint // prevent unloading if > 0 llama llm.LlamaServer + pid int loading bool // True only during initial load, then false forever gpus discover.GpuInfoList // Recorded at time of provisioning estimatedVRAM uint64 @@ -668,6 +674,31 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any { return finished } +func (runner *runnerRef) LogValue() slog.Value { + if runner == nil { + return slog.StringValue("nil") + } + attrs := []slog.Attr{} + if runner.model != nil { + attrs = append(attrs, slog.String("name", runner.model.Name)) + } + if len(runner.gpus) > 0 { + attrs = append(attrs, + slog.String("inference", runner.gpus[0].Library), + slog.Int("devices", len(runner.gpus)), + ) + } + attrs = append(attrs, + slog.String("size", format.HumanBytes2(runner.estimatedTotal)), + slog.String("vram", format.HumanBytes2(runner.estimatedVRAM)), + slog.Int("num_ctx", runner.NumCtx), + slog.Int("parallel", runner.numParallel), + slog.Int("pid", runner.pid), + slog.String("model", runner.modelPath), + ) + return slog.GroupValue(attrs...) +} + type ByDurationAndName []*runnerRef func (a ByDurationAndName) Len() int { return len(a) } @@ -790,12 +821,12 @@ func (s *Scheduler) findRunnerToUnload() *runnerRef { rc := runner.refCount runner.refMu.Unlock() if rc == 0 { - slog.Debug("found an idle runner to unload") + slog.Debug("found an idle runner to unload", "runner", runner) return runner } } // None appear idle, just wait for the one with the shortest duration - slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList)) + slog.Debug("no idle runners, picking the shortest duration", "runner_count", len(runnerList), "runner", runnerList[0]) return runnerList[0] } diff --git a/server/sched_test.go b/server/sched_test.go index 274e18cec..1e8e11372 100644 --- a/server/sched_test.go +++ b/server/sched_test.go @@ -792,3 +792,4 @@ func (s *mockLlm) Close() error { func (s *mockLlm) EstimatedVRAM() uint64 { return s.estimatedVRAM } func (s *mockLlm) EstimatedTotal() uint64 { return s.estimatedTotal } func (s *mockLlm) EstimatedVRAMByGPU(gpuid string) uint64 { return s.estimatedVRAMByGPU[gpuid] } +func (s *mockLlm) Pid() int { return -1 }