forked from microsoft/hdfs-mount
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathFileHandleReader.go
132 lines (119 loc) · 4.66 KB
/
FileHandleReader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package main
import (
"bazil.org/fuse"
"errors"
"golang.org/x/net/context"
"io"
)
// FileHandleReader holds the state and routines for reading data through a
// file handle.
//
// It keeps two buffered file fragments so that reads which arrive slightly out
// of order, but close to each other, can be served while the backend stream is
// still consumed sequentially, without seeking.
type FileHandleReader struct {
	Handle     *FileHandle    // File handle this reader was created for
	HdfsReader ReadSeekCloser // Backend reader; set to nil once Close has run
	Offset     int64          // Offset the backend reader is expected to be positioned at
	Buffer1    *FileFragment  // Most recent fragment read from the backend
	Buffer2    *FileFragment  // Previous (least recent) fragment read from the backend
	Holes      int64          // Number of forward gaps ("holes") that were read through instead of seeking
	CacheHits  int64          // Number of read requests served from Buffer1/Buffer2
	Seeks      int64          // Number of seeks performed on the backend stream
}
// NewFileHandleReader opens a backend reader for the file behind handle and
// returns a FileHandleReader with two empty fragment buffers.
//
// If the backend reader cannot be opened, the error is logged and returned
// together with a nil reader.
func NewFileHandleReader(handle *FileHandle) (*FileHandleReader, error) {
	backend, err := handle.File.FileSystem.HdfsAccessor.OpenRead(handle.File.AbsolutePath())
	if err != nil {
		Error.Println("[", handle.File.AbsolutePath(), "] Opening: ", err)
		return nil, err
	}

	return &FileHandleReader{
		Handle:     handle,
		HdfsReader: backend,
		Buffer1:    &FileFragment{},
		Buffer2:    &FileFragment{},
	}, nil
}
// Read responds to a FUSE read request by filling resp.Data with up to
// req.Size bytes starting at req.Offset.
//
// Note: when FUSE asks for N bytes it expects exactly N, unless EOF is hit, so
// ReadPartial is called repeatedly until the request is satisfied or an error
// occurs. resp.Data is always trimmed to the number of bytes actually read.
// io.EOF is not reported to FUSE as an error (nil is returned); any other
// error is returned unchanged. ctx is not used by this method.
func (this *FileHandleReader) Read(handle *FileHandle, ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
	dst := resp.Data[0:req.Size]
	filled := 0

	var readErr error
	for filled < len(dst) {
		n, err := this.ReadPartial(handle, req.Offset+int64(filled), dst[filled:])
		if err != nil {
			readErr = err
			break
		}
		filled += n
	}

	resp.Data = resp.Data[0:filled]
	if readErr == io.EOF {
		// EOF isn't an error, reporting successful (possibly short) read to FUSE
		return nil
	}
	return readErr
}
// BLOCKSIZE is the granularity, in bytes, to which ReadPartial rounds up the
// size of a backend read. Forward gaps of up to twice this size are read
// through instead of seeking.
var BLOCKSIZE = 65536
// ReadPartial reads one chunk of data into buf starting at fileOffset, which
// satisfies part (possibly all) of a FUSE read request. It returns the number
// of bytes copied into buf.
//
// The request is served from Buffer1 or Buffer2 when either of them reports
// that it holds the requested offset. Otherwise the buffers are swapped and new
// data is read from the backend into Buffer1. If the backend is not positioned
// at fileOffset, a short forward gap (at most 2*BLOCKSIZE bytes) is read
// through as a "hole"; anything else results in a Seek on the backend.
//
// A Seek error is only propagated when seeking backwards. For a forward seek
// the error is ignored and Offset is updated anyway (seeking to the end of the
// file is not treated as an error).
// NOTE(review): this also hides other forward-seek failures; confirm that the
// backend position still matches Offset in that case.
func (this *FileHandleReader) ReadPartial(handle *FileHandle, fileOffset int64, buf []byte) (int, error) {
	var copied int

	// First checking whether the request can be satisfied from buffered fragments
	// (Buffer2 is only consulted when Buffer1 can't serve the request)
	served := this.Buffer1.ReadFromBuffer(fileOffset, buf, &copied)
	if !served {
		served = this.Buffer2.ReadFromBuffer(fileOffset, buf, &copied)
	}
	if served {
		this.CacheHits++
		return copied, nil
	}

	// Neither buffer has the data: more data is going to be read from the backend
	// into Buffer1. Swapping buffers first, so the previous Buffer1 content is kept
	// in Buffer2 (MRU/LRU invariant)
	this.Buffer1, this.Buffer2 = this.Buffer2, this.Buffer1

	atLeast, atMost := 1, len(buf)
	switch {
	case fileOffset == this.Offset:
		// Backend stream is already positioned where we need it
	case fileOffset > this.Offset && fileOffset-this.Offset <= int64(BLOCKSIZE*2):
		// Small forward gap: reading the skipped data ("hole") instead of seeking
		gap := int(fileOffset - this.Offset)
		this.Holes++
		atMost += gap     // the "hole" is read along with the requested data
		atLeast = gap + 1 // at least one byte starting from the requested offset is needed
	default:
		// Backward or far-forward jump: seeking on the backend stream
		this.Seeks++
		seekErr := this.HdfsReader.Seek(fileOffset)
		// Seek errors are reported for backward seeks only
		if seekErr != nil && fileOffset < this.Offset {
			Error.Println("[seek", handle.File.AbsolutePath(), " @offset:", this.Offset, "] Seek error to", fileOffset, "(file offset):", seekErr.Error())
			return 0, seekErr
		}
		this.Offset = fileOffset
	}

	// Ceiling to the nearest multiple of BLOCKSIZE
	atMost = (atMost + BLOCKSIZE - 1) / BLOCKSIZE * BLOCKSIZE

	// Reading from backend into Buffer1
	if err := this.Buffer1.ReadFromBackend(this.HdfsReader, &this.Offset, atLeast, atMost); err != nil {
		if err == io.EOF {
			Warning.Println("[", handle.File.AbsolutePath(), "] EOF @", this.Offset)
		}
		return 0, err
	}

	// Now Buffer1 is expected to have the data to satisfy the request
	if this.Buffer1.ReadFromBuffer(fileOffset, buf, &copied) {
		return copied, nil
	}
	return 0, errors.New("INTERNAL ERROR: FileFragment invariant")
}
// Close logs the accumulated read statistics (holes, cache hits, seeks) and
// closes the backend reader.
//
// It returns the error reported by the backend reader's Close, if any. The
// backend reader is dropped (set to nil) even when closing fails, so calling
// Close again is a no-op that returns nil.
func (this *FileHandleReader) Close() error {
	if this.HdfsReader == nil {
		// Already closed, nothing to do
		return nil
	}

	Info.Println("[", this.Handle.File.AbsolutePath(), "] ReadStats: holes:", this.Holes, ", cache hits:", this.CacheHits, ", hard seeks:", this.Seeks)

	// Propagating the close error to the caller instead of silently discarding it
	err := this.HdfsReader.Close()
	this.HdfsReader = nil
	return err
}