diff --git a/llm/server.go b/llm/server.go index 0f23fc4c5..528af71f9 100644 --- a/llm/server.go +++ b/llm/server.go @@ -44,6 +44,7 @@ type LlamaServer interface { EstimatedVRAM() uint64 // Total VRAM across all GPUs EstimatedTotal() uint64 EstimatedVRAMByGPU(gpuID string) uint64 + Pid() int } // llmServer is an instance of the llama.cpp server @@ -520,6 +521,9 @@ func (s *llmServer) getServerStatus(ctx context.Context) (ServerStatus, error) { if errors.Is(err, context.DeadlineExceeded) { return ServerStatusNotResponding, errors.New("server not responding") } + if strings.Contains(err.Error(), "connection refused") { + return ServerStatusNotResponding, errors.New("connection refused") + } return ServerStatusError, fmt.Errorf("health resp: %w", err) } defer resp.Body.Close() @@ -640,6 +644,13 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error { } } +func (s *llmServer) Pid() int { + if s.cmd != nil && s.cmd.Process != nil { + return s.cmd.Process.Pid + } + return -1 +} + var grammarJSON = ` root ::= object value ::= object | array | string | number | ("true" | "false" | "null") ws diff --git a/server/sched.go b/server/sched.go index 8513fbf4b..6fbfcd80e 100644 --- a/server/sched.go +++ b/server/sched.go @@ -147,6 +147,7 @@ func (s *Scheduler) processPending(ctx context.Context) { s.loadedMu.Unlock() if runner != nil { if runner.needsReload(ctx, pending) { + slog.Debug("reloading", "runner", runner) runnerToExpire = runner } else { // Runner is usable, return it @@ -282,7 +283,7 @@ func (s *Scheduler) processPending(ctx context.Context) { } // Trigger an expiration to unload once it's done runnerToExpire.refMu.Lock() - slog.Debug("resetting model to expire immediately to make room", "modelPath", runnerToExpire.modelPath, "refCount", runnerToExpire.refCount) + slog.Debug("resetting model to expire immediately to make room", "runner", runnerToExpire, "refCount", runnerToExpire.refCount) if runnerToExpire.expireTimer != nil { runnerToExpire.expireTimer.Stop() runnerToExpire.expireTimer = nil @@ -331,16 +332,16 @@ func (s *Scheduler) processCompleted(ctx context.Context) { runner.refCount-- if runner.refCount <= 0 { if runner.sessionDuration <= 0 { - slog.Debug("runner with zero duration has gone idle, expiring to unload", "modelPath", runner.modelPath) + slog.Debug("runner with zero duration has gone idle, expiring to unload", "runner", runner) if runner.expireTimer != nil { runner.expireTimer.Stop() runner.expireTimer = nil } s.expiredCh <- runner } else if runner.expireTimer == nil { - slog.Debug("runner with non-zero duration has gone idle, adding timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration) + slog.Debug("runner with non-zero duration has gone idle, adding timer", "runner", runner, "duration", runner.sessionDuration) runner.expireTimer = time.AfterFunc(runner.sessionDuration, func() { - slog.Debug("timer expired, expiring to unload", "modelPath", runner.modelPath) + slog.Debug("timer expired, expiring to unload", "runner", runner) runner.refMu.Lock() defer runner.refMu.Unlock() if runner.expireTimer != nil { @@ -351,18 +352,18 @@ func (s *Scheduler) processCompleted(ctx context.Context) { }) runner.expiresAt = time.Now().Add(runner.sessionDuration) } else { - slog.Debug("runner with non-zero duration has gone idle, resetting timer", "modelPath", runner.modelPath, "duration", runner.sessionDuration) + slog.Debug("runner with non-zero duration has gone idle, resetting timer", "runner", runner, "duration", runner.sessionDuration) runner.expireTimer.Reset(runner.sessionDuration) runner.expiresAt = time.Now().Add(runner.sessionDuration) } } - slog.Debug("after processing request finished event", "modelPath", runner.modelPath, "refCount", runner.refCount) + slog.Debug("after processing request finished event", "runner", runner, "refCount", runner.refCount) runner.refMu.Unlock() case runner := <-s.expiredCh: - slog.Debug("runner expired event received", "modelPath", runner.modelPath) + slog.Debug("runner expired event received", "runner", runner) runner.refMu.Lock() if runner.refCount > 0 { - slog.Debug("expired event with positive ref count, retrying", "modelPath", runner.modelPath, "refCount", runner.refCount) + slog.Debug("expired event with positive ref count, retrying", "runner", runner, "refCount", runner.refCount) go func(runner *runnerRef) { // We can't unload yet, but want to as soon as the current request completes // So queue up another expired event @@ -374,16 +375,16 @@ func (s *Scheduler) processCompleted(ctx context.Context) { } s.loadedMu.Lock() - slog.Debug("got lock to unload", "modelPath", runner.modelPath) + slog.Debug("got lock to unload", "runner", runner) finished := runner.waitForVRAMRecovery() runner.unload() delete(s.loaded, runner.modelPath) s.loadedMu.Unlock() - slog.Debug("runner released", "modelPath", runner.modelPath) + slog.Debug("runner released", "runner", runner) runner.refMu.Unlock() <-finished - slog.Debug("sending an unloaded event", "modelPath", runner.modelPath) + slog.Debug("sending an unloaded event", "runner", runner) s.unloadedCh <- struct{}{} } } @@ -406,7 +407,7 @@ func (pending *LlmRequest) useLoadedRunner(runner *runnerRef, finished chan *Llm pending.successCh <- runner go func() { <-pending.ctx.Done() - slog.Debug("context for request finished") + slog.Debug("context for request finished", "runner", runner) finished <- pending }() } @@ -441,6 +442,7 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis estimatedVRAM: llama.EstimatedVRAM(), estimatedTotal: llama.EstimatedTotal(), loading: true, + pid: llama.Pid(), } runner.numParallel = numParallel runner.refMu.Lock() // hold lock until running or aborted @@ -460,6 +462,9 @@ func (s *Scheduler) load(req *LlmRequest, f *ggml.GGML, gpus discover.GpuInfoLis return } slog.Debug("finished setting up runner", "model", req.model.ModelPath) + if runner.pid < 0 { + runner.pid = llama.Pid() + } runner.refCount++ runner.loading = false go func() { @@ -544,6 +549,7 @@ type runnerRef struct { refCount uint // prevent unloading if > 0 llama llm.LlamaServer + pid int loading bool // True only during initial load, then false forever gpus discover.GpuInfoList // Recorded at time of provisioning estimatedVRAM uint64 @@ -668,6 +674,31 @@ func (runner *runnerRef) waitForVRAMRecovery() chan any { return finished } +func (runner *runnerRef) LogValue() slog.Value { + if runner == nil { + return slog.StringValue("nil") + } + attrs := []slog.Attr{} + if runner.model != nil { + attrs = append(attrs, slog.String("name", runner.model.Name)) + } + if len(runner.gpus) > 0 { + attrs = append(attrs, + slog.String("inference", runner.gpus[0].Library), + slog.Int("devices", len(runner.gpus)), + ) + } + attrs = append(attrs, + slog.String("size", format.HumanBytes2(runner.estimatedTotal)), + slog.String("vram", format.HumanBytes2(runner.estimatedVRAM)), + slog.Int("num_ctx", runner.NumCtx), + slog.Int("parallel", runner.numParallel), + slog.Int("pid", runner.pid), + slog.String("model", runner.modelPath), + ) + return slog.GroupValue(attrs...) +} + type ByDurationAndName []*runnerRef func (a ByDurationAndName) Len() int { return len(a) } @@ -790,12 +821,12 @@ func (s *Scheduler) findRunnerToUnload() *runnerRef { rc := runner.refCount runner.refMu.Unlock() if rc == 0 { - slog.Debug("found an idle runner to unload") + slog.Debug("found an idle runner to unload", "runner", runner) return runner } } // None appear idle, just wait for the one with the shortest duration - slog.Debug("no idle runners, picking the shortest duration", "count", len(runnerList)) + slog.Debug("no idle runners, picking the shortest duration", "runner_count", len(runnerList), "runner", runnerList[0]) return runnerList[0] } diff --git a/server/sched_test.go b/server/sched_test.go index 274e18cec..1e8e11372 100644 --- a/server/sched_test.go +++ b/server/sched_test.go @@ -792,3 +792,4 @@ func (s *mockLlm) Close() error { func (s *mockLlm) EstimatedVRAM() uint64 { return s.estimatedVRAM } func (s *mockLlm) EstimatedTotal() uint64 { return s.estimatedTotal } func (s *mockLlm) EstimatedVRAMByGPU(gpuid string) uint64 { return s.estimatedVRAMByGPU[gpuid] } +func (s *mockLlm) Pid() int { return -1 }