forked from microsoft/hdfs-mount
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathFile.go
179 lines (156 loc) · 4.97 KB
/
File.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package main
import (
"bazil.org/fuse"
"bazil.org/fuse/fs"
"fmt"
"golang.org/x/net/context"
"os/user"
"path"
"sync"
"time"
)
// File is the FUSE node for a single file. It caches the file's attributes and
// keeps track of the file handles currently opened on it.
type File struct {
	FileSystem *FileSystem // Pointer to the FileSystem which owns this file
	Attrs Attrs // Cache of file attributes (refreshed by Attr once Attrs.Expires has passed) // TODO: implement TTL
	Parent *Dir // Pointer to the parent directory (allows computing fully-qualified paths on demand)
	activeHandles []*FileHandle // List of opened file handles
	activeHandlesMutex sync.Mutex // Mutex guarding activeHandles
}
// Compile-time checks: *File must implement the necessary FUSE node
// interfaces, and it also serves as a factory for ReadSeekCloser objects.
var (
	_ fs.Node               = (*File)(nil)
	_ fs.NodeOpener         = (*File)(nil)
	_ fs.NodeFsyncer        = (*File)(nil)
	_ ReadSeekCloserFactory = (*File)(nil)
)
// AbsolutePath returns the absolute path of the file in the HDFS namespace,
// built by joining the parent directory's absolute path with the file name.
func (this *File) AbsolutePath() string {
	parentPath := this.Parent.AbsolutePath()
	return path.Join(parentPath, this.Attrs.Name)
}
// Attr responds to the FUSE file attribute request. If the cached attributes
// have expired, they are first re-read through the parent directory; a failed
// refresh is returned to the caller. The reply is then produced from the
// cache via Attrs.Attr.
func (this *File) Attr(ctx context.Context, a *fuse.Attr) error {
	now := this.FileSystem.Clock.Now()
	if expired := now.After(this.Attrs.Expires); expired {
		if err := this.Parent.LookupAttrs(this.Attrs.Name, &this.Attrs); err != nil {
			return err
		}
	}
	return this.Attrs.Attr(a)
}
// Open responds to the FUSE file open request by creating a new file handle.
// Reading is enabled up front for read-only and read-write opens; writing is
// enabled up front only for write-only opens. On success the handle is
// registered with the file and returned.
func (this *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
	flags := req.Flags
	Info.Println("Open: ", this.AbsolutePath(), flags)
	handle := NewFileHandle(this)

	if wantsRead := flags.IsReadOnly() || flags.IsReadWrite(); wantsRead {
		if err := handle.EnableRead(); err != nil {
			return nil, err
		}
	}

	if flags.IsWriteOnly() {
		// Write is enabled eagerly only in write-only mode. In the read+write
		// scenario, write will be enabled in a lazy manner (on first write).
		// newFile is true unless the append flag is set.
		newFile := flags&fuse.OpenAppend != fuse.OpenAppend
		if err := handle.EnableWrite(newFile); err != nil {
			return nil, err
		}
	}

	this.AddHandle(handle)
	return handle, nil
}
// OpenRead opens the file for reading (read-only flags) and returns the
// resulting file handle wrapped as a ReadSeekCloser. Any error from Open is
// returned unchanged.
func (this *File) OpenRead() (ReadSeekCloser, error) {
	// No caller context is available here; pass context.TODO() rather than a
	// nil Context. The response argument stays nil, as Open does not use it.
	request := &fuse.OpenRequest{Flags: fuse.OpenReadOnly}
	handle, err := this.Open(context.TODO(), request, nil)
	if err != nil {
		return nil, err
	}
	// On success Open returns the *FileHandle it created.
	return NewFileHandleAsReadSeekCloser(handle.(*FileHandle)), nil
}
// AddHandle registers an opened file handle by appending it to the list of
// active handles, under the activeHandles mutex.
func (this *File) AddHandle(handle *FileHandle) {
	mu := &this.activeHandlesMutex
	mu.Lock()
	defer mu.Unlock()

	this.activeHandles = append(this.activeHandles, handle)
}
// RemoveHandle unregisters an opened file handle. Only the first matching
// entry is removed; a handle that is not registered is ignored. The relative
// order of the remaining handles is preserved.
func (this *File) RemoveHandle(handle *FileHandle) {
	this.activeHandlesMutex.Lock()
	defer this.activeHandlesMutex.Unlock()

	for i, h := range this.activeHandles {
		if h != handle {
			continue
		}
		last := len(this.activeHandles) - 1
		// Shift the tail left by one over the removed entry.
		copy(this.activeHandles[i:], this.activeHandles[i+1:])
		// Clear the vacated tail slot so the backing array does not keep a
		// stale *FileHandle reachable after it has been unregistered.
		this.activeHandles[last] = nil
		this.activeHandles = this.activeHandles[:last]
		return
	}
}
// GetActiveHandles returns a snapshot of the opened file handles. The result
// is a copy taken under the activeHandles mutex, so the caller may iterate
// over it or modify it without touching the file's own list.
func (this *File) GetActiveHandles() []*FileHandle {
	this.activeHandlesMutex.Lock()
	defer this.activeHandlesMutex.Unlock()

	handles := make([]*FileHandle, 0, len(this.activeHandles))
	handles = append(handles, this.activeHandles...)
	return handles
}
// Fsync responds to the FUSE Fsync request by forwarding it to every file
// handle currently open on this file. Every handle is attempted even if an
// earlier one fails; the error of the last failing handle (nil if none
// failed) is returned.
func (this *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
	// Take one snapshot so the logged count matches the handles actually
	// synced, and the list is locked and copied only once.
	handles := this.GetActiveHandles()
	Info.Println("Dispatching fsync request to open handles: ", len(handles))

	var retErr error
	for _, handle := range handles {
		if err := handle.Fsync(ctx, req); err != nil {
			retErr = err
		}
	}
	return retErr
}
// InvalidateMetadataCache invalidates the metadata cache, so the next ls or
// stat gives up-to-date file attributes. It does so by moving the cache
// expiry one second before the file system clock's current time.
func (this *File) InvalidateMetadataCache() {
	now := this.FileSystem.Clock.Now()
	this.Attrs.Expires = now.Add(-time.Second)
}
// Setattr responds to the FUSE Setattr request. Only a mode change (chmod)
// and an owner change (chown) are handled here; other fields of the request
// are not examined. The cached attributes are updated only for an operation
// that succeeded in HDFS. When both operations are requested, both are
// attempted, and the first failure (if any) is returned.
func (this *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
	// Get the file path, so chmod/chown in HDFS can work
	filePath := this.AbsolutePath()
	var retErr error

	if req.Valid.Mode() {
		Info.Println("Chmod [", filePath, "] to [", req.Mode, "]")
		if err := this.FileSystem.HdfsAccessor.Chmod(filePath, req.Mode); err != nil {
			Error.Println("Chmod failed with error: ", err)
			retErr = err
		} else {
			this.Attrs.Mode = req.Mode
		}
	}

	if req.Valid.Uid() {
		// owner and group are only used in the log line below.
		owner := fmt.Sprint(req.Uid)
		group := fmt.Sprint(req.Gid)
		// The lookup error is kept in its own variable so that it can never
		// mask, or be mistaken for, the result of the Chown call.
		if u, lookupErr := user.LookupId(owner); lookupErr != nil {
			Error.Println("Chown: username for uid", req.Uid, "not found, use uid/gid instead")
		} else {
			owner = u.Username
			group = owner // hardcoded the group same as owner
		}

		Info.Println("Chown [", filePath, "] to [", owner, ":", group, "]")
		// NOTE(review): the numeric uid/gid are passed to HDFS even when the
		// user name was resolved, although the log line above shows the
		// resolved name — confirm which form HdfsAccessor.Chown expects.
		if err := this.FileSystem.HdfsAccessor.Chown(filePath, fmt.Sprint(req.Uid), fmt.Sprint(req.Gid)); err != nil {
			Error.Println("Chown failed with error:", err)
			if retErr == nil {
				retErr = err
			}
		} else {
			this.Attrs.Uid = req.Uid
			this.Attrs.Gid = req.Gid
		}
	}

	return retErr
}