forked from microsoft/hdfs-mount
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathHdfsAccessor.go
372 lines (338 loc) · 11.6 KB
/
HdfsAccessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package main
import (
"bazil.org/fuse"
"errors"
"fmt"
"github.com/colinmarc/hdfs"
"github.com/colinmarc/hdfs/protocol/hadoop_hdfs"
"io"
"os"
"os/user"
"strconv"
"strings"
"sync"
"time"
)
// HdfsAccessor is the interface for accessing HDFS.
// Concurrency: thread safe: handles unlimited number of concurrent requests.
type HdfsAccessor interface {
	// OpenRead opens an HDFS file for reading.
	OpenRead(path string) (ReadSeekCloser, error)
	// CreateFile opens an HDFS file for writing.
	CreateFile(path string, mode os.FileMode) (HdfsWriter, error)
	// ReadDir enumerates an HDFS directory.
	ReadDir(path string) ([]Attrs, error)
	// Stat retrieves file/directory attributes.
	Stat(path string) (Attrs, error)
	// StatFs retrieves HDFS usage.
	StatFs() (FsInfo, error)
	// Mkdir creates a directory.
	Mkdir(path string, mode os.FileMode) error
	// Remove removes a file or directory.
	Remove(path string) error
	// Rename renames a file or directory.
	Rename(oldPath, newPath string) error
	// EnsureConnected ensures the HDFS accessor is connected to the HDFS name node.
	EnsureConnected() error
	// Chown changes the owner and group of the file.
	Chown(path, owner, group string) error
	// Chmod changes the mode of the file.
	Chmod(path string, mode os.FileMode) error
	// Close closes the current metadata connection if needed.
	Close() error
}
// hdfsAccessorImpl is the HdfsAccessor implementation backed by a single
// metadata client that is connected lazily.
type hdfsAccessorImpl struct {
	// Clock is the interface used to get wall clock time.
	Clock Clock
	// NameNodeAddresses is the array of "Address:port" strings for the name nodes.
	NameNodeAddresses []string
	// MetadataClient is the HDFS client used for metadata operations;
	// nil while no connection is established.
	MetadataClient *hdfs.Client
	// MetadataClientMutex serializes all metadata operations for simplicity (for now).
	// TODO: allow N concurrent operations
	MetadataClientMutex sync.Mutex
	// UserNameToUidCache is the cache for converting usernames to UIDs.
	UserNameToUidCache map[string]UidCacheEntry
}
// UidCacheEntry is a single cached username-to-UID mapping together with
// the moment after which it must no longer be used.
type UidCacheEntry struct {
	// Uid is the user id.
	Uid uint32
	// Expires is the absolute time when this cache entry expires.
	Expires time.Time
}
// Compile-time check: ensure *hdfsAccessorImpl implements HdfsAccessor.
var _ HdfsAccessor = (*hdfsAccessorImpl)(nil)
// NewHdfsAccessor creates an instance of HdfsAccessor.
//
// nameNodeAddresses is a comma-separated list of name node addresses; it is
// split on "," as-is (no trimming). clock is stored and later used for
// expiring UID cache entries. No connection is made here, and the returned
// error is always nil.
func NewHdfsAccessor(nameNodeAddresses string, clock Clock) (HdfsAccessor, error) {
	accessor := new(hdfsAccessorImpl)
	accessor.NameNodeAddresses = strings.Split(nameNodeAddresses, ",")
	accessor.Clock = clock
	accessor.UserNameToUidCache = make(map[string]UidCacheEntry)
	return accessor, nil
}
// EnsureConnected ensures that the metadata client is connected, establishing
// a connection to the name node if there is none yet.
//
// The check and the (possible) connect are performed under MetadataClientMutex,
// like every other operation that touches MetadataClient, so this method can be
// called concurrently with the other accessor methods without racing on the
// MetadataClient field.
//
// Returns nil when a client is already present or the connection succeeded,
// otherwise the connection error.
func (this *hdfsAccessorImpl) EnsureConnected() error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()
	if this.MetadataClient != nil {
		return nil
	}
	return this.ConnectMetadataClient()
}
// ConnectMetadataClient establishes a connection to the name node and, on
// success, assigns it to the MetadataClient field. On failure the field is
// left untouched and the error is returned.
//
// It does not take MetadataClientMutex itself.
func (this *hdfsAccessorImpl) ConnectMetadataClient() error {
	connected, connErr := this.ConnectToNameNode()
	if connErr == nil {
		this.MetadataClient = connected
	}
	return connErr
}
// ConnectToNameNode establishes a connection to a name node in the context of
// some other operation. On success it logs the event and returns the client;
// on failure it returns a new error whose text embeds the underlying error
// message.
func (this *hdfsAccessorImpl) ConnectToNameNode() (*hdfs.Client, error) {
	// connecting to HDFS name node
	nameNode, connErr := this.connectToNameNodeImpl()
	if connErr == nil {
		Info.Println("Connected to name node")
		return nameNode, nil
	}
	// Connection failed
	message := fmt.Sprintf("Fail to connect to name node with error: %s", connErr.Error())
	return nil, errors.New(message)
}
// connectToNameNodeImpl performs one attempt to connect to the HDFS name node
// and verifies the connection with a probe Stat call. The client is closed and
// the probe error returned when the probe fails with anything other than
// "not exists".
func (this *hdfsAccessorImpl) connectToNameNodeImpl() (*hdfs.Client, error) {
	// All configured addresses are handed to the client library, which
	// supports connecting to multiple name nodes.
	options := hdfs.ClientOptions{
		Addresses: this.NameNodeAddresses,
	}
	candidate, dialErr := hdfs.NewClient(options)
	if dialErr != nil {
		return nil, dialErr
	}

	// The connection is OK, but we need to check whether the name node is
	// operating as expected (this also checks whether the name node is Active).
	// The check is done by calling Stat() for a path inside the root directory.
	// Note: the file '/$' doesn't have to be present - both nil and
	// ErrNotExist indicate success of the operation.
	_, probeErr := candidate.Stat("/$")
	if probeErr == nil {
		// Successfully connected
		return candidate, nil
	}
	if pathErr, isPathErr := probeErr.(*os.PathError); isPathErr && pathErr.Err == os.ErrNotExist {
		// Successfully connected
		return candidate, nil
	}

	candidate.Close()
	return nil, probeErr
}
// OpenRead opens an HDFS file for reading, connecting the metadata client
// first if necessary. The opened file is wrapped with NewHdfsReader.
func (this *hdfsAccessorImpl) OpenRead(path string) (ReadSeekCloser, error) {
	// Blocking read. This is to reduce the connection pressure on the hadoop name node
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return nil, connErr
	}

	file, openErr := this.MetadataClient.Open(path)
	if openErr != nil {
		return nil, openErr
	}
	return NewHdfsReader(file), nil
}
// CreateFile creates a new HDFS file at path with the given mode, connecting
// the metadata client first if necessary. The file is created with a
// replication of 3 and a 64 MiB block size, and wrapped with NewHdfsWriter.
func (this *hdfsAccessorImpl) CreateFile(path string, mode os.FileMode) (HdfsWriter, error) {
	const (
		replication = 3
		blockSize   = 64 * 1024 * 1024
	)

	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	if this.MetadataClient == nil {
		if connErr := this.ConnectMetadataClient(); connErr != nil {
			return nil, connErr
		}
	}

	file, createErr := this.MetadataClient.CreateFile(path, replication, blockSize, mode)
	if createErr != nil {
		return nil, createErr
	}
	return NewHdfsWriter(file), nil
}
// ReadDir enumerates an HDFS directory and returns the attributes of each
// entry. On a non-benign error the metadata client is dropped so that a new
// connection is attempted on the next call.
func (this *hdfsAccessorImpl) ReadDir(path string) ([]Attrs, error) {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return nil, connErr
	}

	entries, readErr := this.MetadataClient.ReadDir(path)
	if readErr != nil {
		// Benign errors (e.g. path not found) are passed through as-is.
		if !IsSuccessOrBenignError(readErr) {
			// We've got an error from this client, setting it to nil, so we try another one next time
			// TODO: attempt to gracefully close the connection
			this.MetadataClient = nil
		}
		return nil, readErr
	}

	result := make([]Attrs, 0, len(entries))
	for _, entry := range entries {
		result = append(result, this.AttrsFromFileInfo(entry))
	}
	return result, nil
}
// Stat retrieves file/directory attributes for path. On a non-benign error
// the metadata client is dropped so that a new connection is attempted on the
// next call. An empty Attrs value accompanies every error.
func (this *hdfsAccessorImpl) Stat(path string) (Attrs, error) {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return Attrs{}, connErr
	}

	info, statErr := this.MetadataClient.Stat(path)
	if statErr == nil {
		return this.AttrsFromFileInfo(info), nil
	}

	// Benign errors (e.g. path not found) are passed through as-is.
	if !IsSuccessOrBenignError(statErr) {
		// We've got an error from this client, setting it to nil, so we try another one next time
		// TODO: attempt to gracefully close the connection
		this.MetadataClient = nil
	}
	return Attrs{}, statErr
}
// StatFs retrieves HDFS usage. On a non-benign error the metadata client is
// dropped so that a new connection is attempted on the next call. An empty
// FsInfo value accompanies every error.
func (this *hdfsAccessorImpl) StatFs() (FsInfo, error) {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return FsInfo{}, connErr
	}

	usage, statErr := this.MetadataClient.StatFs()
	if statErr == nil {
		return this.AttrsFromFsInfo(usage), nil
	}

	if !IsSuccessOrBenignError(statErr) {
		// Drop the client so that the next operation reconnects.
		this.MetadataClient = nil
	}
	return FsInfo{}, statErr
}
// AttrsFromFileInfo converts an os.FileInfo plus the underlying protobuf file
// status it carries (fileInfo.Sys()) into an Attrs structure.
//
// fileInfo must come from the HDFS client: its Sys() value is asserted to be
// a *hadoop_hdfs.HdfsFileStatusProto and the call panics otherwise.
//
// All protobuf fields are read through the generated getters rather than by
// dereferencing the pointer fields directly, so a status message with an unset
// field (for example a missing file id) yields that field's default value
// instead of a nil-pointer panic.
//
// The modification time is truncated to whole seconds and is used for Mtime,
// Ctime and Crtime alike. The owner name is mapped to a UID via LookupUid,
// which expects to be called under MetadataClientMutex.
func (this *hdfsAccessorImpl) AttrsFromFileInfo(fileInfo os.FileInfo) Attrs {
	protoBufData := fileInfo.Sys().(*hadoop_hdfs.HdfsFileStatusProto)

	mode := os.FileMode(protoBufData.GetPermission().GetPerm())
	if fileInfo.IsDir() {
		mode |= os.ModeDir
	}

	// HDFS reports the modification time in milliseconds.
	modificationTime := time.Unix(int64(protoBufData.GetModificationTime())/1000, 0)

	return Attrs{
		Inode:  protoBufData.GetFileId(),
		Name:   fileInfo.Name(),
		Mode:   mode,
		Size:   protoBufData.GetLength(),
		Uid:    this.LookupUid(protoBufData.GetOwner()),
		Mtime:  modificationTime,
		Ctime:  modificationTime,
		Crtime: modificationTime,
		Gid:    0} // TODO: Group is now hardcoded to be "root", implement proper mapping
}
// AttrsFromFsInfo copies the capacity, used and remaining figures from the
// HDFS client's FsInfo into this package's FsInfo structure.
func (this *hdfsAccessorImpl) AttrsFromFsInfo(fsInfo hdfs.FsInfo) FsInfo {
	var usage FsInfo
	usage.capacity = fsInfo.Capacity
	usage.used = fsInfo.Used
	usage.remaining = fsInfo.Remaining
	return usage
}
// HadoopTimestampToTime converts a timestamp expressed in milliseconds since
// the Unix epoch into a time.Time. The result has whole-second resolution:
// the millisecond remainder is discarded.
func HadoopTimestampToTime(timestamp uint64) time.Time {
	const millisPerSecond = 1000
	seconds := int64(timestamp) / millisPerSecond
	return time.Unix(seconds, 0)
}
// LookupUid performs a cache-assisted lookup of a UID by username.
//
// An empty username maps to 0. A cached, unexpired entry is returned directly.
// Otherwise the user is looked up via os/user; if the lookup or the parsing of
// the UID string fails, the UID falls back to (1<<31)-1. The outcome (including
// the fallback) is cached for 5 minutes.
//
// Note: this method is called under MetadataClientMutex, so accessing the
// cache dictionary is safe.
func (this *hdfsAccessorImpl) LookupUid(userName string) uint32 {
	if userName == "" {
		return 0
	}

	if entry, found := this.UserNameToUidCache[userName]; found {
		if this.Clock.Now().Before(entry.Expires) {
			return entry.Uid
		}
	}

	// Start from the fallback value and overwrite it only when both the
	// lookup and the parse succeed.
	const fallbackUid = (1 << 31) - 1
	uid := uint32(fallbackUid)
	if account, lookupErr := user.Lookup(userName); lookupErr == nil {
		// UID is returned as string, need to parse it
		if parsed, parseErr := strconv.ParseUint(account.Uid, 10, 32); parseErr == nil {
			uid = uint32(parsed)
		}
	}

	// caching UID for 5 minutes
	this.UserNameToUidCache[userName] = UidCacheEntry{
		Uid:     uid,
		Expires: this.Clock.Now().Add(5 * time.Minute),
	}
	return uid
}
// IsSuccessOrBenignError returns true if err == nil or err is an expected
// (benign) error which should be propagated directly to the caller:
// io.EOF, fuse.EEXIST, or an *os.PathError wrapping os.ErrNotExist or
// os.ErrPermission. Any other error yields false.
func IsSuccessOrBenignError(err error) bool {
	switch err {
	case nil, io.EOF, fuse.EEXIST:
		return true
	}

	pathErr, isPathErr := err.(*os.PathError)
	if !isPathErr {
		return false
	}
	return pathErr.Err == os.ErrNotExist || pathErr.Err == os.ErrPermission
}
// Mkdir creates a directory at path with the given mode, connecting the
// metadata client first if necessary. An error whose message ends with
// "file already exists" is translated to fuse.EEXIST; any other error is
// returned unchanged.
func (this *hdfsAccessorImpl) Mkdir(path string, mode os.FileMode) error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return connErr
	}

	mkdirErr := this.MetadataClient.Mkdir(path, mode)
	if mkdirErr != nil && strings.HasSuffix(mkdirErr.Error(), "file already exists") {
		return fuse.EEXIST
	}
	return mkdirErr
}
// Remove removes the file or directory at path, connecting the metadata
// client first if necessary.
func (this *hdfsAccessorImpl) Remove(path string) error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return connErr
	}
	return this.MetadataClient.Remove(path)
}
// Rename renames the file or directory oldPath to newPath, connecting the
// metadata client first if necessary.
func (this *hdfsAccessorImpl) Rename(oldPath string, newPath string) error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return connErr
	}
	return this.MetadataClient.Rename(oldPath, newPath)
}
// Chmod changes the mode of the file at path, connecting the metadata client
// first if necessary.
func (this *hdfsAccessorImpl) Chmod(path string, mode os.FileMode) error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return connErr
	}
	return this.MetadataClient.Chmod(path, mode)
}
// Chown changes the owner and group of the file at path, connecting the
// metadata client first if necessary.
func (this *hdfsAccessorImpl) Chown(path string, user, group string) error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	var connErr error
	if this.MetadataClient == nil {
		connErr = this.ConnectMetadataClient()
	}
	if connErr != nil {
		return connErr
	}
	return this.MetadataClient.Chown(path, user, group)
}
// Close closes the current metadata connection, if there is one, and clears
// the MetadataClient field so that a later operation reconnects. It returns
// the error from closing the client, or nil when there was nothing to close.
func (this *hdfsAccessorImpl) Close() error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()

	if this.MetadataClient == nil {
		return nil
	}
	closeErr := this.MetadataClient.Close()
	this.MetadataClient = nil
	return closeErr
}