Go构建文件系统监控

在很多场景下我们需要用到文件系统监控,比如构建配置的热加载,词典的触发备份等,之前我写过一个基于轮训实现的文件系统监控 kwatcher,现在再看的话发现有很多地方不足

  • 效率低。对于不经常变动的文件,每隔几秒的检查是多余的
  • 时效性低。时延最大达轮训间隔时间。

fsnotify

查看业界常用的Go文件监控库fsnotify源码,以上几点都有解决,特此记录笔记。

1
fsnotify是Go的一款文件系统监控通知库,通过它可以方便的获取文件变更的消息通知

fsnotify在不同OS上的底层实现不同,在Linux上基于inotify实现。fsnotify在linux上实现要点

  • 基于事件驱动实现,效率远高于轮训且无时延
1
2
3
4
5
在 Linux 内核 2.6.13 (June 18, 2005)版本之后,Linux 内核新增了一批文件系统的扩展接口(API),其中之一就是inotify,inotify 提供了一种基于 inode 的监控文件系统事件的机制,可以监控文件系统的变化如文件修改、新增、删除等,并可以将相应的事件通知给应用程序。
inotify 既可以监控文件,也可以监控目录。当监控目录时,它可以同时监控目录本身以及及目录中的各文件的变化。此外,inotify 使用文件描述符作为接口,因而可以使用通常的文件I/O操作 select、poll 和 epoll 来监视文件系统的变化。
总之,简单来说就是:inotify 为我们从系统层面提供了一种可以监控文件变化的接口,我们可以利用它来监控文件或目录的变化

fsnotify工作流程

核心源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
//监控器
type Watcher struct {
mu sync.Mutex // Map access
fd int // File descriptor (as returned by the inotify_init() syscall)
watches map[string]*watch // Map of inotify watches (key: path)
fsnFlags map[string]uint32 // Map of watched files to flags used for filter
fsnmut sync.Mutex // Protects access to fsnFlags.
paths map[int]string // Map of watched paths (key: watch descriptor)
Error chan error // Errors are sent on this channel
internalEvent chan *FileEvent // Events are queued on this channel
Event chan *FileEvent // Events are returned on this channel
done chan bool // Channel for sending a "quit message" to the reader goroutine
isClosed bool // Set to true when Close() is first called
}
// NewWatcher creates and returns a new inotify instance using inotify_init(2)
func NewWatcher() (*Watcher, error) {
//创建inotify系统标识符
fd, errno := syscall.InotifyInit()
if fd == -1 {
return nil, os.NewSyscallError("inotify_init", errno)
}
w := &Watcher{
fd: fd,
watches: make(map[string]*watch),
fsnFlags: make(map[string]uint32),
paths: make(map[int]string),
internalEvent: make(chan *FileEvent),
Event: make(chan *FileEvent),
Error: make(chan error),
done: make(chan bool, 1),
}
//读取监控
go w.readEvents()
//过滤变更事件,最终用户通过w.Event获取用户关心的事件
go w.purgeEvents()
return w, nil
}
// The flags are interpreted as described in inotify_add_watch(2).
func (w *Watcher) addWatch(path string, flags uint32) error {
if w.isClosed {
return errors.New("inotify instance already closed")
}
w.mu.Lock()
watchEntry, found := w.watches[path]
w.mu.Unlock()
if found {
watchEntry.flags |= flags
flags |= syscall.IN_MASK_ADD
}
//向inotify系统添加监控目标
wd, errno := syscall.InotifyAddWatch(w.fd, path, flags)
if wd == -1 {
return errno
}
w.mu.Lock()
w.watches[path] = &watch{wd: uint32(wd), flags: flags}
w.paths[wd] = path
w.mu.Unlock()
return nil
}
// readEvents reads from the inotify file descriptor, converts the
// received events into Event objects and sends them via the Event channel
func (w *Watcher) readEvents() {
var (
buf [syscall.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events
n int // Number of bytes read with read()
errno error // Syscall errno
)
for {
n, errno = syscall.Read(w.fd, buf[:])
...
var offset uint32 = 0
// We don't know how many events we just read into the buffer
// While the offset points to at least one whole event...
for offset <= uint32(n-syscall.SizeofInotifyEvent) {
// Point "raw" to the event in the buffer
raw := (*syscall.InotifyEvent)(unsafe.Pointer(&buf[offset]))
event := new(FileEvent)
event.mask = uint32(raw.Mask) //mask描述事件类型
event.cookie = uint32(raw.Cookie) //特定事件使用(IN_MOVED_FROM and IN_MOVED_TO)
nameLen := uint32(raw.Len) //文件名长度(监控目录时用)
// If the event happened to the watched directory or the watched file, the kernel
// doesn't append the filename to the event, but we would like to always fill the
// the "Name" field with a valid filename. We retrieve the path of the watch from
// the "paths" map.
w.mu.Lock()
event.Name = w.paths[int(raw.Wd)] //raw.Wd 监控器的标识符
w.mu.Unlock()
watchedName := event.Name
//当监控的是目录,目录中的文件变更时nameLen>0,且文件名是strings.TrimRight(string(bytes[0:nameLen]), "\000")
if nameLen > 0 {
// Point "bytes" at the first byte of the filename
bytes := (*[syscall.PathMax]byte)(unsafe.Pointer(&buf[offset+syscall.SizeofInotifyEvent]))
// The filename is padded with NUL bytes. TrimRight() gets rid of those.
event.Name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000")
}
w.internalEvent <- event
// Move to the next event in the buffer
offset += syscall.SizeofInotifyEvent + nameLen
}
}
}
// Purge events from interal chan to external chan if passes filter
func (w *Watcher) purgeEvents() {
for ev := range w.internalEvent {
sendEvent := false
w.fsnmut.Lock()
fsnFlags := w.fsnFlags[ev.Name]
w.fsnmut.Unlock()
if (fsnFlags&FSN_CREATE == FSN_CREATE) && ev.IsCreate() {
sendEvent = true
}
if (fsnFlags&FSN_MODIFY == FSN_MODIFY) && ev.IsModify() {
sendEvent = true
}
if (fsnFlags&FSN_DELETE == FSN_DELETE) && ev.IsDelete() {
sendEvent = true
}
if (fsnFlags&FSN_RENAME == FSN_RENAME) && ev.IsRename() {
sendEvent = true
}
if sendEvent {
w.Event <- ev
}
// If there's no file, then no more events for user
// BSD must keep watch for internal use (watches DELETEs to keep track
// what files exist for create events)
if ev.IsDelete() {
w.fsnmut.Lock()
delete(w.fsnFlags, ev.Name)
w.fsnmut.Unlock()
}
}
close(w.Event)
}

消费示例

1
2
3
4
5
6
select {
case ev := <-watcher.Event:
t.Fatalf("We received event: %v\n", ev)
case <-time.After(500 * time.Millisecond):
t.Log("No event received, as expected.")
}

相关资料

基于inotify实现配置文件热更新

fsnotify源码

inotify介绍