From 4e320b8b90b8a698fc3c057a3f54cbabe59b543a Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Fri, 14 Mar 2025 08:57:59 -0700 Subject: [PATCH] server/internal/chunks: remove chunks package (#9755) --- server/internal/cache/blob/chunked.go | 13 ++- server/internal/chunks/chunks.go | 81 ------------- server/internal/chunks/chunks_test.go | 65 ----------- server/internal/client/ollama/registry.go | 25 +++- .../internal/client/ollama/registry_test.go | 51 --------- server/internal/cmd/oppbench/oppbench.go | 11 -- server/internal/cmd/oppbench/oppbench_test.go | 107 ------------------ 7 files changed, 32 insertions(+), 321 deletions(-) delete mode 100644 server/internal/chunks/chunks.go delete mode 100644 server/internal/chunks/chunks_test.go delete mode 100644 server/internal/cmd/oppbench/oppbench.go delete mode 100644 server/internal/cmd/oppbench/oppbench_test.go diff --git a/server/internal/cache/blob/chunked.go b/server/internal/cache/blob/chunked.go index 5faea84f6..3f62127a3 100644 --- a/server/internal/cache/blob/chunked.go +++ b/server/internal/cache/blob/chunked.go @@ -5,11 +5,18 @@ import ( "errors" "io" "os" - - "github.com/ollama/ollama/server/internal/chunks" ) -type Chunk = chunks.Chunk // TODO: move chunks here? +// Chunk represents a range of bytes in a blob. +type Chunk struct { + Start int64 + End int64 +} + +// Size returns end minus start plus one. +func (c Chunk) Size() int64 { + return c.End - c.Start + 1 +} // Chunker writes to a blob in chunks. // Its zero value is invalid. Use [DiskCache.Chunked] to create a new Chunker. diff --git a/server/internal/chunks/chunks.go b/server/internal/chunks/chunks.go deleted file mode 100644 index 7bb4e99a5..000000000 --- a/server/internal/chunks/chunks.go +++ /dev/null @@ -1,81 +0,0 @@ -package chunks - -import ( - "fmt" - "iter" - "strconv" - "strings" -) - -type Chunk struct { - Start, End int64 -} - -func New(start, end int64) Chunk { - return Chunk{start, end} -} - -// ParseRange parses a string in the form "unit=range" where unit is a string -// and range is a string in the form "start-end". It returns the unit and the -// range as a Chunk. -func ParseRange(s string) (unit string, _ Chunk, _ error) { - unit, r, _ := strings.Cut(s, "=") - if r == "" { - return unit, Chunk{}, nil - } - c, err := Parse(r) - if err != nil { - return "", Chunk{}, err - } - return unit, c, err -} - -// Parse parses a string in the form "start-end" and returns the Chunk. -func Parse[S ~string | ~[]byte](s S) (Chunk, error) { - startPart, endPart, found := strings.Cut(string(s), "-") - if !found { - return Chunk{}, fmt.Errorf("chunks: invalid range %q: missing '-'", s) - } - start, err := strconv.ParseInt(startPart, 10, 64) - if err != nil { - return Chunk{}, fmt.Errorf("chunks: invalid start to %q: %v", s, err) - } - end, err := strconv.ParseInt(endPart, 10, 64) - if err != nil { - return Chunk{}, fmt.Errorf("chunks: invalid end to %q: %v", s, err) - } - if start > end { - return Chunk{}, fmt.Errorf("chunks: invalid range %q: start > end", s) - } - return Chunk{start, end}, nil -} - -// Of returns a sequence of contiguous Chunks of size chunkSize that cover -// the range [0, size), in order. -func Of(size, chunkSize int64) iter.Seq[Chunk] { - return func(yield func(Chunk) bool) { - for start := int64(0); start < size; start += chunkSize { - end := min(start+chunkSize-1, size-1) - if !yield(Chunk{start, end}) { - break - } - } - } -} - -// Count returns the number of Chunks of size chunkSize needed to cover the -// range [0, size). -func Count(size, chunkSize int64) int64 { - return (size + chunkSize - 1) / chunkSize -} - -// Size returns end minus start plus one. -func (c Chunk) Size() int64 { - return c.End - c.Start + 1 -} - -// String returns the string representation of the Chunk in the form -// "{start}-{end}". -func (c Chunk) String() string { - return fmt.Sprintf("%d-%d", c.Start, c.End) -} diff --git a/server/internal/chunks/chunks_test.go b/server/internal/chunks/chunks_test.go deleted file mode 100644 index c23e0de8e..000000000 --- a/server/internal/chunks/chunks_test.go +++ /dev/null @@ -1,65 +0,0 @@ -package chunks - -import ( - "slices" - "testing" -) - -func TestOf(t *testing.T) { - cases := []struct { - total int64 - chunkSize int64 - want []Chunk - }{ - {0, 1, nil}, - {1, 1, []Chunk{{0, 0}}}, - {1, 2, []Chunk{{0, 0}}}, - {2, 1, []Chunk{{0, 0}, {1, 1}}}, - {10, 9, []Chunk{{0, 8}, {9, 9}}}, - } - - for _, tt := range cases { - got := slices.Collect(Of(tt.total, tt.chunkSize)) - if !slices.Equal(got, tt.want) { - t.Errorf("[%d/%d]: got %v; want %v", tt.total, tt.chunkSize, got, tt.want) - } - } -} - -func TestSize(t *testing.T) { - cases := []struct { - c Chunk - want int64 - }{ - {Chunk{0, 0}, 1}, - {Chunk{0, 1}, 2}, - {Chunk{3, 4}, 2}, - } - - for _, tt := range cases { - got := tt.c.Size() - if got != tt.want { - t.Errorf("%v: got %d; want %d", tt.c, got, tt.want) - } - } -} - -func TestCount(t *testing.T) { - cases := []struct { - total int64 - chunkSize int64 - want int64 - }{ - {0, 1, 0}, - {1, 1, 1}, - {1, 2, 1}, - {2, 1, 2}, - {10, 9, 2}, - } - for _, tt := range cases { - got := Count(tt.total, tt.chunkSize) - if got != tt.want { - t.Errorf("[%d/%d]: got %d; want %d", tt.total, tt.chunkSize, got, tt.want) - } - } -} diff --git a/server/internal/client/ollama/registry.go b/server/internal/client/ollama/registry.go index baf42262b..cf05f79ae 100644 --- a/server/internal/client/ollama/registry.go +++ b/server/internal/client/ollama/registry.go @@ -36,7 +36,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/ollama/ollama/server/internal/cache/blob" - "github.com/ollama/ollama/server/internal/chunks" "github.com/ollama/ollama/server/internal/internal/backoff" "github.com/ollama/ollama/server/internal/internal/names" @@ -500,7 +499,7 @@ func (r *Registry) Pull(ctx context.Context, name string) error { if err != nil { return err } - req.Header.Set("Range", fmt.Sprintf("bytes=%s", cs.Chunk)) + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", cs.Chunk.Start, cs.Chunk.End)) res, err := sendRequest(r.client(), req) if err != nil { return err @@ -794,7 +793,7 @@ func (r *Registry) chunksums(ctx context.Context, name string, l *Layer) iter.Se yield(chunksum{}, err) return } - chunk, err := chunks.Parse(s.Bytes()) + chunk, err := parseChunk(s.Bytes()) if err != nil { yield(chunksum{}, fmt.Errorf("invalid chunk range for digest %s: %q", d, s.Bytes())) return @@ -1059,3 +1058,23 @@ func splitExtended(s string) (scheme, name, digest string) { } return scheme, s, digest } + +// parseChunk parses a string in the form "start-end" and returns the Chunk. +func parseChunk[S ~string | ~[]byte](s S) (blob.Chunk, error) { + startPart, endPart, found := strings.Cut(string(s), "-") + if !found { + return blob.Chunk{}, fmt.Errorf("chunks: invalid range %q: missing '-'", s) + } + start, err := strconv.ParseInt(startPart, 10, 64) + if err != nil { + return blob.Chunk{}, fmt.Errorf("chunks: invalid start to %q: %v", s, err) + } + end, err := strconv.ParseInt(endPart, 10, 64) + if err != nil { + return blob.Chunk{}, fmt.Errorf("chunks: invalid end to %q: %v", s, err) + } + if start > end { + return blob.Chunk{}, fmt.Errorf("chunks: invalid range %q: start > end", s) + } + return blob.Chunk{Start: start, End: end}, nil +} diff --git a/server/internal/client/ollama/registry_test.go b/server/internal/client/ollama/registry_test.go index ecfc63264..30fb58ab7 100644 --- a/server/internal/client/ollama/registry_test.go +++ b/server/internal/client/ollama/registry_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/ollama/ollama/server/internal/cache/blob" - "github.com/ollama/ollama/server/internal/chunks" "github.com/ollama/ollama/server/internal/testutil" ) @@ -531,56 +530,6 @@ func TestRegistryPullMixedCachedNotCached(t *testing.T) { } } -func TestRegistryPullChunking(t *testing.T) { - t.Skip("TODO: BRING BACK BEFORE LANDING") - - rc, _ := newClient(t, func(w http.ResponseWriter, r *http.Request) { - t.Log("request:", r.URL.Host, r.Method, r.URL.Path, r.Header.Get("Range")) - if r.URL.Host != "blob.store" { - // The production registry redirects to the blob store. - http.Redirect(w, r, "http://blob.store"+r.URL.Path, http.StatusFound) - return - } - if strings.Contains(r.URL.Path, "/blobs/") { - rng := r.Header.Get("Range") - if rng == "" { - http.Error(w, "missing range", http.StatusBadRequest) - return - } - _, c, err := chunks.ParseRange(r.Header.Get("Range")) - if err != nil { - panic(err) - } - io.WriteString(w, "remote"[c.Start:c.End+1]) - return - } - fmt.Fprintf(w, `{"layers":[{"digest":%q,"size":6}]}`, blob.DigestFromBytes("remote")) - }) - - // Force chunking by setting the threshold to less than the size of the - // layer. - rc.ChunkingThreshold = 3 - rc.MaxChunkSize = 3 - - var reads []int64 - ctx := WithTrace(t.Context(), &Trace{ - Update: func(d *Layer, n int64, err error) { - if err != nil { - t.Errorf("update %v %d %v", d, n, err) - } - reads = append(reads, n) - }, - }) - - err := rc.Pull(ctx, "remote") - testutil.Check(t, err) - - want := []int64{0, 3, 6} - if !slices.Equal(reads, want) { - t.Errorf("reads = %v; want %v", reads, want) - } -} - func TestRegistryResolveByDigest(t *testing.T) { check := testutil.Checker(t) diff --git a/server/internal/cmd/oppbench/oppbench.go b/server/internal/cmd/oppbench/oppbench.go deleted file mode 100644 index 7a5305947..000000000 --- a/server/internal/cmd/oppbench/oppbench.go +++ /dev/null @@ -1,11 +0,0 @@ -package main - -import ( - "fmt" - "os" -) - -func main() { - fmt.Println("Run as 'go test -bench=.' to run the benchmarks") - os.Exit(1) -} diff --git a/server/internal/cmd/oppbench/oppbench_test.go b/server/internal/cmd/oppbench/oppbench_test.go deleted file mode 100644 index c71d6cded..000000000 --- a/server/internal/cmd/oppbench/oppbench_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package main - -import ( - "bytes" - "context" - "fmt" - "io" - "net/http" - "os" - "path/filepath" - "runtime" - "sync/atomic" - "testing" - "time" - - "github.com/ollama/ollama/server/internal/chunks" - "golang.org/x/sync/errgroup" -) - -func BenchmarkDownload(b *testing.B) { - run := func(fileSize, chunkSize int64) { - name := fmt.Sprintf("size=%d/chunksize=%d", fileSize, chunkSize) - b.Run(name, func(b *testing.B) { benchmarkDownload(b, fileSize, chunkSize) }) - } - - run(100<<20, 8<<20) - run(100<<20, 16<<20) - run(100<<20, 32<<20) - run(100<<20, 64<<20) - run(100<<20, 128<<20) // 1 chunk -} - -func run(ctx context.Context, c *http.Client, chunk chunks.Chunk) error { - const blobURL = "https://ollama.com/v2/x/x/blobs/sha256-4824460d29f2058aaf6e1118a63a7a197a09bed509f0e7d4e2efb1ee273b447d" - req, err := http.NewRequestWithContext(ctx, "GET", blobURL, nil) - if err != nil { - return err - } - req.Header.Set("Range", fmt.Sprintf("bytes=%s", chunk)) - res, err := c.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - - _, err = io.CopyN(io.Discard, res.Body, chunk.Size()) // will io.EOF on short read - return err -} - -var sleepTime atomic.Int64 - -func benchmarkDownload(b *testing.B, fileSize, chunkSize int64) { - client := &http.Client{ - Transport: func() http.RoundTripper { - tr := http.DefaultTransport.(*http.Transport).Clone() - tr.DisableKeepAlives = true - return tr - }(), - } - defer client.CloseIdleConnections() - - // warm up the client - run(context.Background(), client, chunks.New(0, 1<<20)) - - b.SetBytes(fileSize) - b.ReportAllocs() - - // Give our CDN a min to breathe between benchmarks. - time.Sleep(time.Duration(sleepTime.Swap(3))) - - for b.Loop() { - g, ctx := errgroup.WithContext(b.Context()) - g.SetLimit(runtime.GOMAXPROCS(0)) - for chunk := range chunks.Of(fileSize, chunkSize) { - g.Go(func() error { return run(ctx, client, chunk) }) - } - if err := g.Wait(); err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkWrite(b *testing.B) { - b.Run("chunksize=1MiB", func(b *testing.B) { benchmarkWrite(b, 1<<20) }) -} - -func benchmarkWrite(b *testing.B, chunkSize int) { - b.ReportAllocs() - - dir := b.TempDir() - f, err := os.Create(filepath.Join(dir, "write-single")) - if err != nil { - b.Fatal(err) - } - defer f.Close() - - data := make([]byte, chunkSize) - b.SetBytes(int64(chunkSize)) - r := bytes.NewReader(data) - for b.Loop() { - r.Reset(data) - _, err := io.Copy(f, r) - if err != nil { - b.Fatal(err) - } - } -}