Skip to content

Commit 2e362ef

Browse files
committed
perf(local): shared stat scheduler replacing per-entry timeout machinery
Replace the per-entry withTimeout/withTimeoutRetry goroutine+channel machinery and producer-consumer channel pipeline with a shared per-Fs stat scheduler using microbatch lease-gated publication.

Key changes:
- Sequential ReadDir(1024) → ProcessBatch → materialize loop
- Batch-scoped raw dirfd via Fd(), openat(readfd, ".") for stat fds
- StatScheduler with watchdog, retry queues, replacement workers
- Removed SyscallConn.Control per-entry overhead, errReopen, old pipeline, withTimeout, statPool, cloneDirForListStat
- Scheduler optimizations: pooled jobs, scratch reuse, in-place compaction, worker ID recycling, pre-sized result slices

Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
1 parent 4a7a2ea commit 2e362ef

File tree

11 files changed

+1814
-1376
lines changed

11 files changed

+1814
-1376
lines changed

backend/local/list.go

Lines changed: 73 additions & 620 deletions
Large diffs are not rendered by default.

backend/local/list_fstatat_linux.go

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
//go:build linux && amd64
22

3-
// This file implements the linux/amd64 fast-stat path: cloneDirForListStat
4-
// dup's the directory fd via F_DUPFD_CLOEXEC, and statDirEntry calls
5-
// fstatat(dirfd, name, AT_SYMLINK_NOFOLLOW) to stat each entry without
6-
// constructing an absolute path. On other platforms the build tag excludes
7-
// this file and list_fstatat_other.go provides stubs that fall back to
8-
// entry.Info().
3+
// This file implements the linux/amd64 fast-stat path used by the sequential
4+
// listing loop. openDirAtReadFD opens "." relative to the listing read fd to
5+
// create a batch-scoped stat handle, listStatDirFD snapshots that handle's raw
6+
// dirfd once per batch, and statDirEntry uses fstatat(dirfd, name,
7+
// AT_SYMLINK_NOFOLLOW) without constructing absolute paths. Other platforms use
8+
// list_fstatat_other.go and fall back to entry.Info().
99

1010
package local
1111

@@ -113,40 +113,48 @@ func fstatatNoFollow(dirfd int, name string) (os.FileInfo, error) {
113113
}, nil
114114
}
115115

116-
func cloneDirForListStat(fd *os.File) (*os.File, error) {
117-
sc, err := fd.SyscallConn()
118-
if err != nil {
119-
return nil, err
116+
// openDirAtReadFD opens "." relative to the active listing fd so one ReadDir
117+
// batch gets its own directory handle for fstatat work.
118+
func openDirAtReadFD(fd *os.File) (*os.File, error) {
119+
if fd == nil {
120+
return nil, os.ErrClosed
120121
}
121-
var dupFD int
122-
var dupErr error
123-
ctrlErr := sc.Control(func(rawfd uintptr) {
124-
dupFD, dupErr = unix.FcntlInt(rawfd, unix.F_DUPFD_CLOEXEC, 0)
125-
})
126-
if ctrlErr != nil {
127-
return nil, ctrlErr
122+
123+
rawfd := fd.Fd()
124+
if rawfd == ^uintptr(0) {
125+
return nil, os.ErrClosed
128126
}
129-
if dupErr != nil {
130-
return nil, dupErr
127+
128+
for {
129+
statFD, err := unix.Openat(int(rawfd), ".", unix.O_RDONLY|unix.O_DIRECTORY|unix.O_CLOEXEC, 0)
130+
if err == syscall.EINTR {
131+
continue
132+
}
133+
if err != nil {
134+
return nil, err
135+
}
136+
return os.NewFile(uintptr(statFD), fd.Name()), nil
131137
}
132-
return os.NewFile(uintptr(dupFD), fd.Name()), nil
133138
}
134139

135-
func statDirEntry(entry cachedDirEntry) (os.FileInfo, error) {
136-
if entry.statDir == nil {
137-
return entry.Info()
140+
// listStatDirFD snapshots the raw dirfd from the batch-owned stat handle so the
141+
// batch can reuse it for every entry without reacquiring it per stat call.
142+
func listStatDirFD(fd *os.File) (int, bool) {
143+
if fd == nil {
144+
return 0, false
138145
}
139-
sc, err := entry.statDir.SyscallConn()
140-
if err != nil {
141-
return nil, err
146+
rawfd := fd.Fd()
147+
if rawfd == ^uintptr(0) {
148+
return 0, false
142149
}
143-
var fi os.FileInfo
144-
var fiErr error
145-
ctrlErr := sc.Control(func(rawfd uintptr) {
146-
fi, fiErr = fstatatNoFollow(int(rawfd), entry.Name())
147-
})
148-
if ctrlErr != nil {
149-
return nil, ctrlErr
150+
return int(rawfd), true
151+
}
152+
153+
// statDirEntry stats one cached entry through the batch-scoped dirfd when
154+
// available, otherwise it falls back to the DirEntry.Info path.
155+
func statDirEntry(entry *cachedDirEntry) (os.FileInfo, error) {
156+
if entry.useStatFD {
157+
return fstatatNoFollow(entry.statDirFD, entry.Name())
150158
}
151-
return fi, fiErr
159+
return entry.Info()
152160
}

backend/local/list_fstatat_linux_test.go

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestFstatatNoFollow_ENOENT(t *testing.T) {
135135
}()
136136

137137
fi, err := fstatatNoFollow(int(fd.Fd()), "missing.txt")
138-
require.Nil(t, fi)
138+
assert.Nil(t, fi)
139139
require.True(t, os.IsNotExist(err), "expected not-exist error, got %v", err)
140140
}
141141

@@ -172,7 +172,30 @@ func TestList_FstatatPath_TranslateSymlinksFilterParity(t *testing.T) {
172172
assert.Equal(t, []string{"alias.txt" + fs.LinkSuffix}, dirEntryRemotes(entries))
173173
}
174174

175-
func TestStatDirEntry_ClosedStatDirReturnsErrClosed(t *testing.T) {
175+
func TestListStatDirFD_ClosedFileReturnsFalse(t *testing.T) {
176+
root := t.TempDir()
177+
fd, err := os.Open(root)
178+
require.NoError(t, err)
179+
180+
require.NoError(t, fd.Close())
181+
182+
rawfd, ok := listStatDirFD(fd)
183+
assert.Zero(t, rawfd)
184+
assert.False(t, ok)
185+
}
186+
187+
func TestOpenDirAtReadFD_ClosedFileReturnsErrClosed(t *testing.T) {
188+
root := t.TempDir()
189+
fd, err := os.Open(root)
190+
require.NoError(t, err)
191+
require.NoError(t, fd.Close())
192+
193+
statDir, err := openDirAtReadFD(fd)
194+
require.Nil(t, statDir)
195+
require.True(t, errors.Is(err, os.ErrClosed), "expected os.ErrClosed, got %v", err)
196+
}
197+
198+
func TestStatDirEntry_UsesBatchScopedDirFD(t *testing.T) {
176199
root := t.TempDir()
177200
require.NoError(t, os.WriteFile(filepath.Join(root, "file.txt"), []byte("hello"), 0o600))
178201

@@ -189,36 +212,28 @@ func TestStatDirEntry_ClosedStatDirReturnsErrClosed(t *testing.T) {
189212
}
190213
require.NotNil(t, entry)
191214

192-
fd, err := os.Open(root)
215+
statDir, err := os.Open(root)
193216
require.NoError(t, err)
194217
defer func() {
195-
require.NoError(t, fd.Close())
218+
require.NoError(t, statDir.Close())
196219
}()
197220

198-
statDir, err := cloneDirForListStat(fd)
199-
require.NoError(t, err)
200-
require.NotNil(t, statDir)
221+
statDirFD, ok := listStatDirFD(statDir)
222+
require.True(t, ok)
201223

202-
closedFD := int(statDir.Fd())
203-
require.NoError(t, statDir.Close())
204-
205-
var reused *os.File
206-
for i := 0; i < 32; i++ {
207-
probe, err := os.Open(root)
208-
require.NoError(t, err)
209-
if int(probe.Fd()) == closedFD {
210-
reused = probe
211-
break
212-
}
213-
require.NoError(t, probe.Close())
214-
}
215-
if reused != nil {
216-
defer func() {
217-
require.NoError(t, reused.Close())
218-
}()
224+
cachedEntry := cachedDirEntry{
225+
DirEntry: entry,
226+
statDirFD: statDirFD,
227+
useStatFD: true,
219228
}
229+
fi, err := statDirEntry(&cachedEntry)
230+
require.NoError(t, err)
220231

221-
fi, err := statDirEntry(cachedDirEntry{DirEntry: entry, statDir: statDir})
222-
require.Nil(t, fi)
223-
require.True(t, errors.Is(err, os.ErrClosed), "expected os.ErrClosed, got %v", err)
232+
want, err := os.Lstat(filepath.Join(root, "file.txt"))
233+
require.NoError(t, err)
234+
235+
assert.Equal(t, want.Name(), fi.Name())
236+
assert.Equal(t, want.Size(), fi.Size())
237+
assert.Equal(t, want.Mode(), fi.Mode())
238+
assert.True(t, fi.ModTime().Equal(want.ModTime()), "expected modtime %v, got %v", want.ModTime(), fi.ModTime())
224239
}
backend/local/list_fstatat_other.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,27 @@
11
//go:build !(linux && amd64)
22

3-
// This file is the fallback for all platforms other than linux/amd64.
4-
// It provides no-op stubs for cloneDirForListStat and statDirEntry so that
5-
// list.go compiles everywhere; statDirEntry simply delegates to entry.Info()
6-
// which uses the DirEntry cached from ReadDir without an extra syscall.
3+
// This file is the fallback for all platforms other than linux/amd64. The
4+
// sequential listing loop still reads one ReadDir batch at a time, but there is
5+
// no batch-scoped fstatat helper on these targets, so statDirEntry delegates to
6+
// entry.Info().
77

88
package local
99

1010
import "os"
1111

12-
func cloneDirForListStat(fd *os.File) (*os.File, error) {
12+
// openDirAtReadFD is a no-op stub: this platform has no batch-scoped dirfd fast
13+
// path, so it returns a nil handle. The caller detects nil and falls back to
14+
// entry.Info().
15+
func openDirAtReadFD(fd *os.File) (*os.File, error) {
1316
return nil, nil
1417
}
1518

16-
func statDirEntry(entry cachedDirEntry) (os.FileInfo, error) {
19+
// listStatDirFD reports that there is no raw dirfd to reuse on this platform.
20+
func listStatDirFD(fd *os.File) (int, bool) {
21+
return 0, false
22+
}
23+
24+
// statDirEntry falls back to the cached DirEntry.Info implementation.
25+
func statDirEntry(entry *cachedDirEntry) (os.FileInfo, error) {
1726
return entry.Info()
1827
}

backend/local/list_pipeline_cleanup_test.go

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,57 +11,60 @@ import (
1111
"github.com/stretchr/testify/require"
1212
)
1313

14-
func TestListFileInfos_PipelineCancellationBufferedBatch(t *testing.T) {
14+
func TestListFileInfos_CancellationLargeDirectory(t *testing.T) {
1515
f, root := newTestLocalFs(t)
1616
for i := 0; i < 2050; i++ {
1717
writeTestFile(t, root, fmt.Sprintf("item-%04d.txt", i))
1818
}
1919

20-
baseCtx := newAsyncTestContext(4)
21-
ctx, cancel := context.WithCancel(baseCtx)
22-
defer cancel()
20+
ctx, cancel := context.WithCancel(context.Background())
21+
f.statScheduler.Close()
22+
f.statScheduler = newStatScheduler(f, statSchedulerOptions{
23+
Workers: 1,
24+
MaxWorkers: 1,
25+
QueueDepth: 16,
26+
LeaseTimeout: time.Second,
27+
RetryBackoff: time.Millisecond,
28+
WatchdogInterval: time.Second,
29+
WarnAfter: time.Second,
30+
ReplaceAfter: time.Second,
31+
})
32+
t.Cleanup(f.statScheduler.Close)
2333

2434
fd, err := os.Open(root)
2535
require.NoError(t, err)
2636

27-
openDir := func() (*os.File, error) {
28-
return os.Open(root)
29-
}
30-
3137
started := make(chan struct{}, 1)
32-
statFunc := func(entry os.DirEntry) os.FileInfo {
33-
select {
34-
case started <- struct{}{}:
35-
default:
36-
}
37-
<-ctx.Done()
38-
return nil
39-
}
40-
4138
done := make(chan error, 1)
4239
go func() {
43-
_, err := f.listFileInfos(ctx, fd, openDir, nil, statFunc, nil)
40+
_, err := f.listFileInfos(ctx, fd, nil, nil, func(entry os.DirEntry) os.FileInfo {
41+
select {
42+
case started <- struct{}{}:
43+
default:
44+
}
45+
<-ctx.Done()
46+
return nil
47+
})
4448
done <- err
4549
}()
4650

4751
select {
4852
case <-started:
4953
case <-time.After(2 * time.Second):
50-
t.Fatal("timed out waiting for pipeline stat work to start")
54+
t.Fatal("timed out waiting for scheduled stat work to start")
5155
}
5256

53-
time.Sleep(50 * time.Millisecond)
5457
cancel()
5558

5659
select {
5760
case err := <-done:
5861
require.True(t, errors.Is(err, context.Canceled), "expected context cancellation, got %v", err)
5962
case <-time.After(2 * time.Second):
60-
t.Fatal("timed out waiting for buffered pipeline cancellation")
63+
t.Fatal("timed out waiting for large-directory cancellation")
6164
}
6265
}
6366

64-
func TestStatReadBatch_ClosesStatDirOnPanic(t *testing.T) {
67+
func TestListController_ProcessBatch_ClosesStatDirOnPanic(t *testing.T) {
6568
dir := t.TempDir()
6669
statDir, err := os.Open(dir)
6770
require.NoError(t, err)
@@ -70,22 +73,24 @@ func TestStatReadBatch_ClosesStatDirOnPanic(t *testing.T) {
7073
entries: fakeEntries("panic.txt"),
7174
statDir: statDir,
7275
}
76+
controller := newListController(nil, nil, listControllerOptions{})
77+
defer controller.Close()
7378

7479
var recovered any
7580
func() {
7681
defer func() {
7782
recovered = recover()
7883
}()
79-
_, _, _ = (&Fs{}).statReadBatch(context.Background(), &batch, nil, func(os.DirEntry) (cachedDirEntry, bool) {
84+
_, _, _ = controller.ProcessBatch(context.Background(), &batch, nil, func(os.DirEntry) (cachedDirEntry, bool) {
8085
panic("boom")
81-
}, nil, nil)
86+
}, func(entry *cachedDirEntry) (os.FileInfo, error) {
87+
t.Fatalf("unexpected stat call for %q", entry.Name())
88+
return nil, nil
89+
})
8290
}()
8391

8492
require.NotNil(t, recovered)
8593
require.Nil(t, batch.statDir)
86-
87-
require.Eventually(t, func() bool {
88-
_, err := statDir.Stat()
89-
return errors.Is(err, os.ErrClosed)
90-
}, time.Second, 10*time.Millisecond, "expected statDir to be closed after panic cleanup")
94+
_, err = statDir.Stat()
95+
require.True(t, errors.Is(err, os.ErrClosed), "expected statDir to be closed after panic cleanup, got %v", err)
9196
}

0 commit comments

Comments
 (0)