Skip to content

Commit bbdbce7

Browse files
committed
feat(local): sequential fstatat(dirfd) for linux/amd64 listing
Replace per-entry path-based os.Lstat with fstatat(dirfd, name, AT_SYMLINK_NOFOLLOW) on linux/amd64. The directory fd is duplicated once per ReadDir batch via F_DUPFD_CLOEXEC through SyscallConn.Control, carried with the batch to the consumer, and closed asynchronously after stat completes.

Key design points:
- unix.Stat_t → syscall.Stat_t field copy preserves readDevice/readTime contract without unsafe.Pointer
- SyscallConn.Control for both fd duplication and stat calls prevents Fd()-induced deadline damage and fd-number reuse races
- Combined cancel-before-drain defer prevents pipeline deadlock on early consumer exit
- Nil-guard fallback to entry.Info() on clone failure or non-linux/amd64
- All existing behavioral contracts preserved: SkipRecent, ExcludeFile, TranslateSymlinks, timer patterns, errReopen reset

Reviewed across 3 rounds (24 independent review agents). One post-review fix: added explicit S_IFREG case to prevent regular files hitting the ModeIrregular default branch.

Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
1 parent a23c53a commit bbdbce7

File tree

5 files changed

+565
-27
lines changed

5 files changed

+565
-27
lines changed

backend/local/list.go

Lines changed: 80 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const (
3131
// cachedDirEntry is a directory entry as it travels through the listing
// pipeline: the raw os.DirEntry plus data attached while filtering and
// stat'ing.
type cachedDirEntry struct {
	// DirEntry is the entry as returned by the directory read.
	os.DirEntry

	// newRemote is filled in by the pre-filter step.
	// NOTE(review): presumably the remote path computed for the entry —
	// confirm against the preFilter implementation.
	newRemote string

	// statDir, when non-nil, is the duplicated directory handle of the batch
	// this entry came from. statDirEntry uses it to stat the entry relative
	// to the directory; nil means "fall back to DirEntry.Info()".
	statDir *os.File
}
3536

3637
type statFileInfo struct {
@@ -85,7 +86,7 @@ type statPool struct {
8586
}
8687

8788
// errReopen is an internal sentinel that is never returned to callers.
88-
// It flows through batchCh between the producer and consumer in listFileInfos
89+
// It flows through batchCh between the producer and consumer in listCachedFileInfos
8990
// when the producer reopens the directory fd after a CephFS timeout.
9091
// The consumer must reset accumulated results because the new fd starts at
9192
// offset 0 and re-reads all entries.
@@ -94,6 +95,57 @@ var errReopen = errors.New("directory reopened after timeout")
9495
type readResult struct {
9596
entries []os.DirEntry
9697
err error
98+
statDir *os.File
99+
}
100+
101+
// closeReadResultStatDir releases the dup'd directory fd held by a read batch.
102+
// The close is async to avoid deadlocking the consumer: when called from the
103+
// deferred cleanup path, the SyscallConn.Control callback that owns the fd ref
104+
// may still be on the stack, so a synchronous close could block. The Close
105+
// error is discarded because the fd is a read-only dup used only for fstatat;
106+
// it holds no write state and has no dirty data to flush.
107+
func closeReadResultStatDir(batch *readResult) {
108+
if batch.statDir != nil {
109+
//nolint:errcheck // async close is intentional for current-batch cleanup.
110+
go batch.statDir.Close()
111+
batch.statDir = nil
112+
}
113+
}
114+
115+
func (f *Fs) statReadBatch(ctx context.Context, batch *readResult, entriesBuf []cachedDirEntry, preFilter func(os.DirEntry) (cachedDirEntry, bool), statFunc func(entry cachedDirEntry) os.FileInfo, pool **statPool) ([]cachedDirEntry, []statFileInfo, error) {
116+
defer closeReadResultStatDir(batch)
117+
118+
if len(batch.entries) == 0 {
119+
return entriesBuf[:0], nil, nil
120+
}
121+
122+
entries := entriesBuf[:0]
123+
if cap(entries) < len(batch.entries) {
124+
entries = make([]cachedDirEntry, 0, len(batch.entries))
125+
}
126+
if preFilter != nil {
127+
for _, entry := range batch.entries {
128+
filteredEntry, ok := preFilter(entry)
129+
if !ok {
130+
continue
131+
}
132+
filteredEntry.statDir = batch.statDir
133+
entries = append(entries, filteredEntry)
134+
}
135+
} else {
136+
for _, entry := range batch.entries {
137+
entries = append(entries, cachedDirEntry{DirEntry: entry, statDir: batch.statDir})
138+
}
139+
}
140+
if len(entries) == 0 {
141+
return entries, nil, nil
142+
}
143+
144+
fis, err := f.asyncStatCachedEntries(ctx, entries, statFunc, pool)
145+
if err != nil {
146+
return nil, nil, err
147+
}
148+
return entries, fis, nil
97149
}
98150

99151
func newStatPool(ctx context.Context, workers int, statFunc func(os.DirEntry) os.FileInfo) *statPool {
@@ -635,7 +687,16 @@ func (f *Fs) listCachedFileInfos(ctx context.Context, fd *os.File, openDir func(
635687
}
636688

637689
pipeCtx, pipeCancel := context.WithCancel(ctx)
638-
defer pipeCancel()
690+
var batchCh chan readResult
691+
defer func() {
692+
pipeCancel()
693+
if batchCh == nil {
694+
return
695+
}
696+
for remaining := range batchCh {
697+
closeReadResultStatDir(&remaining)
698+
}
699+
}()
639700

640701
if useReadDir {
641702
// Windows/Plan9: Readdir returns os.FileInfo directly, no separate stat needed.
@@ -697,7 +758,7 @@ func (f *Fs) listCachedFileInfos(ctx context.Context, fd *os.File, openDir func(
697758
}
698759

699760
// Pipeline: producer goroutine reads batches, consumer stats them.
700-
batchCh := make(chan readResult, 1)
761+
batchCh = make(chan readResult, 1)
701762

702763
go func() {
703764
defer close(batchCh)
@@ -744,9 +805,17 @@ func (f *Fs) listCachedFileInfos(ctx context.Context, fd *os.File, openDir func(
744805
return
745806
}
746807
} else if !errors.Is(result.err, os.ErrDeadlineExceeded) {
808+
if len(result.entries) > 0 {
809+
var cloneErr error
810+
result.statDir, cloneErr = cloneDirForListStat(readFd)
811+
if cloneErr != nil {
812+
fs.Debugf(f, "cloneDirForListStat: %v, falling back to entry.Info()", cloneErr)
813+
}
814+
}
747815
select {
748816
case batchCh <- result:
749817
case <-pipeCtx.Done():
818+
closeReadResultStatDir(&result)
750819
return
751820
}
752821
if result.err != nil {
@@ -781,33 +850,17 @@ func (f *Fs) listCachedFileInfos(ctx context.Context, fd *os.File, openDir func(
781850
var entries []cachedDirEntry
782851
for batch := range batchCh {
783852
if batch.err == errReopen {
853+
closeReadResultStatDir(&batch)
784854
allFis = allFis[:0]
785855
continue
786856
}
787-
if len(batch.entries) > 0 {
788-
entries = entries[:0]
789-
if cap(entries) < len(batch.entries) {
790-
entries = make([]cachedDirEntry, 0, len(batch.entries))
791-
}
792-
if preFilter != nil {
793-
for _, entry := range batch.entries {
794-
filteredEntry, ok := preFilter(entry)
795-
if !ok {
796-
continue
797-
}
798-
entries = append(entries, filteredEntry)
799-
}
800-
} else {
801-
for _, entry := range batch.entries {
802-
entries = append(entries, cachedDirEntry{DirEntry: entry})
803-
}
804-
}
805-
fis, statErr := f.asyncStatCachedEntries(ctx, entries, statFunc, pool)
806-
if statErr != nil {
807-
return nil, statErr
808-
}
809-
allFis = append(allFis, fis...)
857+
var fis []statFileInfo
858+
var statErr error
859+
entries, fis, statErr = f.statReadBatch(ctx, &batch, entries, preFilter, statFunc, pool)
860+
if statErr != nil {
861+
return nil, statErr
810862
}
863+
allFis = append(allFis, fis...)
811864
if batch.err == io.EOF {
812865
break
813866
}
@@ -895,7 +948,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
895948
return nil
896949
}
897950
}
898-
fi, fierr := entry.Info()
951+
fi, fierr := statDirEntry(entry)
899952
if os.IsNotExist(fierr) {
900953
return nil
901954
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
//go:build linux && amd64
2+
3+
// This file implements the linux/amd64 fast-stat path: cloneDirForListStat
4+
// dup's the directory fd via F_DUPFD_CLOEXEC, and statDirEntry calls
5+
// fstatat(dirfd, name, AT_SYMLINK_NOFOLLOW) to stat each entry without
6+
// constructing an absolute path. On other platforms the build tag excludes
7+
// this file and list_fstatat_other.go provides stubs that fall back to
8+
// entry.Info().
9+
10+
package local
11+
12+
import (
13+
"os"
14+
"syscall"
15+
"time"
16+
17+
"golang.org/x/sys/unix"
18+
)
19+
20+
// fstatatFileInfo is the os.FileInfo implementation returned by the fstatat
// fast path. All values are captured once when the entry is stat'ed.
type fstatatFileInfo struct {
	name    string         // entry name as passed to fstatat
	mode    os.FileMode    // Go file mode translated from the raw st_mode
	size    int64          // size reported by the stat call
	modTime time.Time      // modification time built from the stat mtime
	stat    syscall.Stat_t // raw stat record, exposed through Sys
}

// Name returns the entry name.
func (fi *fstatatFileInfo) Name() string {
	return fi.name
}

// Size returns the size recorded at stat time.
func (fi *fstatatFileInfo) Size() int64 {
	return fi.size
}

// Mode returns the translated file mode bits.
func (fi *fstatatFileInfo) Mode() os.FileMode {
	return fi.mode
}

// ModTime returns the modification time recorded at stat time.
func (fi *fstatatFileInfo) ModTime() time.Time {
	return fi.modTime
}

// IsDir reports whether the mode marks the entry as a directory.
func (fi *fstatatFileInfo) IsDir() bool {
	return fi.mode&os.ModeDir != 0
}

// Sys returns a pointer to the stored syscall.Stat_t.
func (fi *fstatatFileInfo) Sys() any {
	return &fi.stat
}
34+
35+
// fileModeFromStat translates a raw st_mode value into an os.FileMode.
//
// The low nine permission bits are copied through, the file-type field is
// mapped to the matching os.Mode* type bit(s), and the setuid, setgid and
// sticky bits are mapped to their os.FileMode counterparts. A regular file
// gets no type bit; a file-type value that matches none of the known types
// is reported as os.ModeIrregular.
func fileModeFromStat(m uint32) os.FileMode {
	result := os.FileMode(m & 0o777)

	switch fileType := m & syscall.S_IFMT; fileType {
	case syscall.S_IFREG:
		// Regular files carry no type bit in os.FileMode.
	case syscall.S_IFDIR:
		result |= os.ModeDir
	case syscall.S_IFLNK:
		result |= os.ModeSymlink
	case syscall.S_IFCHR:
		result |= os.ModeDevice | os.ModeCharDevice
	case syscall.S_IFBLK:
		result |= os.ModeDevice
	case syscall.S_IFIFO:
		result |= os.ModeNamedPipe
	case syscall.S_IFSOCK:
		result |= os.ModeSocket
	default:
		result |= os.ModeIrregular
	}

	// Special permission bits live outside the 0o777 mask in os.FileMode.
	special := [...]struct {
		raw  uint32
		mode os.FileMode
	}{
		{syscall.S_ISUID, os.ModeSetuid},
		{syscall.S_ISGID, os.ModeSetgid},
		{syscall.S_ISVTX, os.ModeSticky},
	}
	for _, flag := range special {
		if m&flag.raw != 0 {
			result |= flag.mode
		}
	}
	return result
}
66+
67+
func syscallTimespecFromUnix(t unix.Timespec) syscall.Timespec {
68+
return syscall.Timespec{
69+
Sec: t.Sec,
70+
Nsec: t.Nsec,
71+
}
72+
}
73+
74+
func syscallStatFromUnix(st unix.Stat_t) syscall.Stat_t {
75+
return syscall.Stat_t{
76+
Dev: st.Dev,
77+
Ino: st.Ino,
78+
Nlink: st.Nlink,
79+
Mode: st.Mode,
80+
Uid: st.Uid,
81+
Gid: st.Gid,
82+
Rdev: st.Rdev,
83+
Size: st.Size,
84+
Blksize: st.Blksize,
85+
Blocks: st.Blocks,
86+
Atim: syscallTimespecFromUnix(st.Atim),
87+
Mtim: syscallTimespecFromUnix(st.Mtim),
88+
Ctim: syscallTimespecFromUnix(st.Ctim),
89+
}
90+
}
91+
92+
func fstatatNoFollow(dirfd int, name string) (os.FileInfo, error) {
93+
var st unix.Stat_t
94+
for {
95+
err := unix.Fstatat(dirfd, name, &st, unix.AT_SYMLINK_NOFOLLOW)
96+
if err == syscall.EINTR {
97+
continue
98+
}
99+
if err != nil {
100+
return nil, err
101+
}
102+
break
103+
}
104+
// int64 casts are no-ops on amd64 but required for other architectures if the build tag is widened.
105+
//nolint:unconvert
106+
modTime := time.Unix(int64(st.Mtim.Sec), int64(st.Mtim.Nsec))
107+
return &fstatatFileInfo{
108+
name: name,
109+
mode: fileModeFromStat(st.Mode),
110+
size: st.Size,
111+
modTime: modTime,
112+
stat: syscallStatFromUnix(st),
113+
}, nil
114+
}
115+
116+
func cloneDirForListStat(fd *os.File) (*os.File, error) {
117+
sc, err := fd.SyscallConn()
118+
if err != nil {
119+
return nil, err
120+
}
121+
var dupFD int
122+
var dupErr error
123+
ctrlErr := sc.Control(func(rawfd uintptr) {
124+
dupFD, dupErr = unix.FcntlInt(rawfd, unix.F_DUPFD_CLOEXEC, 0)
125+
})
126+
if ctrlErr != nil {
127+
return nil, ctrlErr
128+
}
129+
if dupErr != nil {
130+
return nil, dupErr
131+
}
132+
return os.NewFile(uintptr(dupFD), fd.Name()), nil
133+
}
134+
135+
func statDirEntry(entry cachedDirEntry) (os.FileInfo, error) {
136+
if entry.statDir == nil {
137+
return entry.Info()
138+
}
139+
sc, err := entry.statDir.SyscallConn()
140+
if err != nil {
141+
return nil, err
142+
}
143+
var fi os.FileInfo
144+
var fiErr error
145+
ctrlErr := sc.Control(func(rawfd uintptr) {
146+
fi, fiErr = fstatatNoFollow(int(rawfd), entry.Name())
147+
})
148+
if ctrlErr != nil {
149+
return nil, ctrlErr
150+
}
151+
return fi, fiErr
152+
}

0 commit comments

Comments
 (0)