Skip to content

Commit db185b1

Browse files
committed
perf(local): lease pooling and ReadDir pipelining for listing pipeline
Split stat_scheduler_core.go into core/lease/batch modules and replace per-microbatch time.AfterFunc timers (688K goroutines/run) with a single watchdog sweep over sync.Pool'd activeLease structs. The active lease set is bounded by MaxWorkers (16), not by microbatch count, eliminating ~3.44M heap allocations per run.

Add prefetchReader, which overlaps ReadDir(N+1) with stat processing of batch N for multi-batch directories (>1024 entries). The first batch is always inlined — no goroutine overhead for the 90%+ of directories that are single-batch. Uses a cap-1 channel for hand-off, sync.Once for fd ownership, and a 5s close timeout to prevent indefinite hangs on stuck mounts.

Hardening: sweepLeases timeoutBuf accesses moved inside mu for race safety; prefetchReader panic recovery now logs via fs.Errorf; includeDirFn errors are logged at Infof instead of being silently discarded.

Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
1 parent 34887d8 commit db185b1

File tree

8 files changed

+1030
-732
lines changed

8 files changed

+1030
-732
lines changed

backend/local/list.go

Lines changed: 60 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (f *Fs) runTimedOperation(ctx context.Context, timer *time.Timer, operation
9494
}
9595
}
9696

97-
// retryTimedOperation reruns a timed operation when the timeout expires.
97+
// retryTimedOperation reruns operation indefinitely on DeadlineExceeded, backing off exponentially from 1s to 15 min, returning only on context cancellation or a non-timeout error.
9898
func (f *Fs) retryTimedOperation(ctx context.Context, name string, timer *time.Timer, operation func() error) error {
9999
const maxBackoff = 15 * time.Minute
100100
backoff := 1 * time.Second
@@ -202,11 +202,14 @@ func (f *Fs) listFileInfos(ctx context.Context, fd *os.File, openDir func() (*os
202202
return allFis, nil
203203
}
204204

205-
// listCachedFileInfos reads one directory sequentially. Each iteration reads one
206-
// ReadDir(1024) batch, filters it, stats the filtered entries through the
207-
// shared per-Fs scheduler, materializes the results, then advances to the next
208-
// ReadDir batch.
205+
// listCachedFileInfos reads one directory. The first ReadDir(1024) batch stays
206+
// inline so single-batch directories avoid prefetch overhead; if that batch
207+
// returns exactly readDirBatchSize entries (indicating more may follow), later
208+
// batches are read through prefetchReader so ReadDir for batch N+1 overlaps
209+
// stat work for batch N.
209210
func (f *Fs) listCachedFileInfos(ctx context.Context, fd *os.File, openDir func() (*os.File, error), preFilter func(os.DirEntry) (cachedDirEntry, bool), statFunc func(entry *cachedDirEntry) (os.FileInfo, error)) (allFis []statFileInfo, err error) {
211+
const readDirBatchSize = 1024
212+
210213
defer func() {
211214
if r := recover(); r != nil {
212215
allFis = nil
@@ -286,37 +289,64 @@ func (f *Fs) listCachedFileInfos(ctx context.Context, fd *os.File, openDir func(
286289
}
287290

288291
controller := newListController(f, f.statScheduler, listControllerOptions{})
289-
defer controller.Close()
292+
fdOwned := true
290293
defer func() {
291-
if fd != nil {
294+
if fdOwned && fd != nil {
292295
_ = fd.Close()
293296
}
294297
}()
295298

299+
if err := ctx.Err(); err != nil {
300+
return nil, err
301+
}
302+
296303
var entries []cachedDirEntry
297-
for {
298-
if err := ctx.Err(); err != nil {
299-
return nil, err
304+
batch := readResult{}
305+
batch.entries, batch.err = fd.ReadDir(readDirBatchSize)
306+
if len(batch.entries) > 0 {
307+
// Give this ReadDir batch its own stat handle tied to the same opened
308+
// directory so fstatat work can reuse one batch-scoped dirfd.
309+
statDir, openErr := openDirAtReadFD(fd)
310+
if openErr != nil {
311+
fs.Debugf(f, "openDirAtReadFD: %v, falling back to entry.Info()", openErr)
312+
} else {
313+
batch.statDir = statDir
300314
}
315+
}
301316

302-
batch := readResult{}
303-
batch.entries, batch.err = fd.ReadDir(1024)
304-
if len(batch.entries) > 0 {
305-
// Give this ReadDir batch its own stat handle tied to the same opened
306-
// directory so fstatat work can reuse one batch-scoped dirfd.
307-
statDir, openErr := openDirAtReadFD(fd)
308-
if openErr != nil {
309-
fs.Debugf(f, "openDirAtReadFD: %v, falling back to entry.Info()", openErr)
310-
} else {
311-
batch.statDir = statDir
317+
var fis []statFileInfo
318+
entries, fis, err = controller.ProcessBatch(ctx, &batch, entries, preFilter, statFunc)
319+
if err != nil {
320+
return nil, err
321+
}
322+
allFis = append(allFis, fis...)
323+
324+
if batch.err == io.EOF {
325+
return allFis, nil
326+
}
327+
if batch.err != nil {
328+
return nil, batch.err
329+
}
330+
if len(batch.entries) != readDirBatchSize {
331+
return allFis, nil
332+
}
333+
334+
reader := newPrefetchReader(ctx, f, fd)
335+
fdOwned = false
336+
defer reader.Close()
337+
338+
for {
339+
batch, ok := reader.Next(ctx)
340+
if !ok {
341+
if err := ctx.Err(); err != nil {
342+
return nil, err
312343
}
344+
return allFis, nil
313345
}
314346

315-
var fis []statFileInfo
316-
var statErr error
317-
entries, fis, statErr = controller.ProcessBatch(ctx, &batch, entries, preFilter, statFunc)
318-
if statErr != nil {
319-
return nil, statErr
347+
entries, fis, err = controller.ProcessBatch(ctx, &batch, entries, preFilter, statFunc)
348+
if err != nil {
349+
return nil, err
320350
}
321351
allFis = append(allFis, fis...)
322352

@@ -402,7 +432,10 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
402432
statFunc := func(entry *cachedDirEntry) (os.FileInfo, error) {
403433
entryType := entry.Type()
404434
if includeDirFn != nil && entryType.IsDir() {
405-
include, _ := includeDirFn(entry.newRemote)
435+
include, inclErr := includeDirFn(entry.newRemote)
436+
if inclErr != nil {
437+
fs.Infof(entry.newRemote, "directory exclusion check failed: %v", inclErr)
438+
}
406439
if !include {
407440
return nil, nil
408441
}
@@ -493,6 +526,6 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
493526
}
494527
}
495528
// Uncomment to signal pre-filtered entries:
496-
// ctx = filter.SetPreFiltered(ctx)
529+
// FIXME: enable SetPreFiltered once preFilter in List correctly excludes all entries that IncludeRemote would reject
497530
return entries, nil
498531
}

backend/local/list_pipeline_cleanup_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ func TestListController_ProcessBatch_ClosesStatDirOnPanic(t *testing.T) {
7474
statDir: statDir,
7575
}
7676
controller := newListController(nil, nil, listControllerOptions{})
77-
defer controller.Close()
7877

7978
var recovered any
8079
func() {

backend/local/list_prefetch.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package local
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"sync"
8+
"time"
9+
10+
"github.com/rclone/rclone/fs"
11+
)
12+
13+
// prefetchReader overlaps ReadDir I/O for batch N+1 with foreground stat work
// on batch N. It takes sole ownership of the read fd until Close returns.
//
// Lifecycle: newPrefetchReader starts the background run loop; the foreground
// consumes batches via Next; Close cancels, closes the fd, drains, and waits.
type prefetchReader struct {
	ch     chan readResult    // cap-1 hand-off buffer from the run loop to Next
	done   chan struct{}      // closed last by run's defers, once fully shut down
	cancel context.CancelFunc // stops the run loop; invoked by Close

	closeFD sync.Once // ensures fd is closed exactly once (run's defer vs Close)
	fd      *os.File  // directory read fd; owned exclusively by this reader
}
23+
24+
// newPrefetchReader starts the background ReadDir loop and transfers sole
25+
// ownership of fd to the returned reader.
26+
func newPrefetchReader(ctx context.Context, owner *Fs, fd *os.File) *prefetchReader {
27+
runCtx, cancel := context.WithCancel(ctx)
28+
pr := &prefetchReader{
29+
ch: make(chan readResult, 1),
30+
done: make(chan struct{}),
31+
cancel: cancel,
32+
fd: fd,
33+
}
34+
go pr.run(runCtx, owner, fd)
35+
return pr
36+
}
37+
38+
// run reads directory batches, opens an optional batch-scoped stat fd, and
// hands completed readResult values to the foreground through a 1-deep buffer.
//
// Ownership invariants:
//   - pending always holds the batch (and its statDir, if any) that has been
//     produced but not yet handed to the foreground, so the panic-recovery
//     defer can close that statDir exactly once.
//   - Defers run LIFO: the recover func fires first (while pr.ch is still
//     open, so the panic error can be delivered), then the read fd is closed,
//     then pr.ch is closed, and pr.done is closed last. Close relies on this
//     ordering (ch closes before done).
func (pr *prefetchReader) run(ctx context.Context, owner *Fs, fd *os.File) {
	var pending readResult

	defer close(pr.done)
	defer close(pr.ch)
	defer pr.closeFD.Do(func() {
		if pr.fd != nil {
			_ = pr.fd.Close()
		}
	})
	defer func() {
		if r := recover(); r != nil {
			// Release the stat fd of any batch that never reached the
			// foreground, then try to surface the panic as an error batch.
			closeReadResultStatDir(&pending)

			var panicErr error
			if e, ok := r.(error); ok {
				panicErr = fmt.Errorf("panic reading directory: %w", e)
			} else {
				panicErr = fmt.Errorf("panic reading directory: %v", r)
			}

			fs.Errorf(owner, "prefetchReader: %v", panicErr)
			// If the foreground is gone (ctx canceled), drop the error
			// rather than blocking forever on the send.
			select {
			case pr.ch <- readResult{err: panicErr}:
			case <-ctx.Done():
			}
		}
	}()

	for {
		// Cheap cancellation check before paying for another ReadDir.
		select {
		case <-ctx.Done():
			return
		default:
		}

		batch := readResult{}
		batch.entries, batch.err = fd.ReadDir(1024)
		pending = batch

		if len(batch.entries) > 0 {
			// Best-effort batch-scoped stat handle; on failure the
			// foreground falls back to entry.Info().
			statDir, err := openDirAtReadFD(fd)
			if err != nil {
				fs.Debugf(owner, "openDirAtReadFD: %v, falling back to entry.Info()", err)
			} else {
				batch.statDir = statDir
				// Keep pending in sync so the recover defer closes
				// statDir if we panic before the hand-off below.
				pending.statDir = statDir
			}
		}

		if ctx.Err() != nil {
			closeReadResultStatDir(&batch)
			return
		}

		select {
		case pr.ch <- batch:
			// Ownership of batch (including statDir) now belongs to the
			// foreground; nothing left for the recover defer to clean up.
			pending = readResult{}
		case <-ctx.Done():
			closeReadResultStatDir(&batch)
			return
		}

		if batch.err != nil {
			// io.EOF or a real error: either way the stream is finished.
			return
		}
	}
}
108+
109+
// Next blocks until the next prefetched batch is available or the caller
110+
// context is canceled.
111+
func (pr *prefetchReader) Next(ctx context.Context) (readResult, bool) {
112+
select {
113+
case batch, ok := <-pr.ch:
114+
return batch, ok
115+
case <-ctx.Done():
116+
return readResult{}, false
117+
}
118+
}
119+
120+
// Close cancels the background reader, closes the read fd to interrupt any
// blocked ReadDir, drains buffered batches, and waits for shutdown.
//
// The single 5s timer bounds the total wait: if the background goroutine is
// wedged in ReadDir on a stuck mount, Close logs and returns rather than
// hanging the caller indefinitely. Any statDir fds still buffered in pr.ch
// are closed here because the foreground will never consume them.
func (pr *prefetchReader) Close() {
	pr.cancel()
	// Pairs with the identical Once in run's defer, so the fd is closed
	// exactly once regardless of which side gets here first. Closing the fd
	// is what unblocks a ReadDir stuck in the kernel.
	pr.closeFD.Do(func() {
		if pr.fd != nil {
			_ = pr.fd.Close()
		}
	})

	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()

	for {
		select {
		case batch, ok := <-pr.ch:
			if !ok {
				// Channel closed: run's defers have started; wait (bounded)
				// for pr.done, which run closes last, to confirm full exit.
				select {
				case <-pr.done:
				case <-timer.C:
					fs.Errorf(nil, "prefetchReader: background goroutine did not exit after close")
				}
				return
			}
			// Undelivered batch: release its stat fd and keep draining.
			closeReadResultStatDir(&batch)
		case <-pr.done:
			// Goroutine already exited; non-blocking drain of whatever is
			// left in the 1-deep buffer, then return.
			for {
				select {
				case batch, ok := <-pr.ch:
					if !ok {
						return
					}
					closeReadResultStatDir(&batch)
				default:
					return
				}
			}
		case <-timer.C:
			fs.Errorf(nil, "prefetchReader: background goroutine did not exit after close")
			return
		}
	}
}

0 commit comments

Comments
 (0)