forked from microsoft/hdfs-mount
-
Notifications
You must be signed in to change notification settings - Fork 0
/
HdfsAccessor.go
301 lines (275 loc) · 9.6 KB
/
HdfsAccessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package main
import (
"bazil.org/fuse"
"errors"
"fmt"
"github.com/colinmarc/hdfs"
"github.com/colinmarc/hdfs/protocol/hadoop_hdfs"
"io"
"log"
"os"
"os/user"
"strconv"
"strings"
"sync"
"time"
)
// HdfsAccessor is the interface for accessing HDFS: it groups the file,
// directory and connection operations declared below.
// Concurrency: thread safe: handles unlimited number of concurrent requests
type HdfsAccessor interface {
OpenRead(path string) (ReadSeekCloser, error) // Opens the HDFS file at path for reading
CreateFile(path string, mode os.FileMode) (HdfsWriter, error) // Creates the HDFS file at path with the given mode and opens it for writing
ReadDir(path string) ([]Attrs, error) // Enumerates the HDFS directory at path, returning attributes of each entry
Stat(path string) (Attrs, error) // Retrieves attributes of the file/directory at path
Mkdir(path string, mode os.FileMode) error // Creates a directory at path with the given mode
Remove(path string) error // Removes the file or directory at path
Rename(oldPath string, newPath string) error // Renames a file or directory from oldPath to newPath
EnsureConnected() error // Ensures HDFS accessor is connected to the HDFS name node
}
// hdfsAccessorImpl is the HdfsAccessor implementation in this file. It keeps
// one shared client for metadata operations and the list of name node
// addresses that connection attempts rotate through.
type hdfsAccessorImpl struct {
Clock Clock // interface to get wall clock time (used to expire UID cache entries)
NameNodeAddresses []string // array of Address:port string for the name nodes
CurrentNameNodeIdx int // Index of the current name node in NameNodeAddresses array; advanced after a failed connection attempt
MetadataClient *hdfs.Client // HDFS client used for metadata operations; nil while not connected
MetadataClientMutex sync.Mutex // Serializing all metadata operations for simplicity (for now), TODO: allow N concurrent operations
UserNameToUidCache map[string]UidCacheEntry // cache for converting usernames to UIDs, keyed by user name
}
// UidCacheEntry is a single cached user-name-to-UID mapping together with
// the time at which it stops being valid.
type UidCacheEntry struct {
Uid uint32 // User Id
Expires time.Time // Absolute time when this cache entry expires
}
// Compile-time check: the build fails if *hdfsAccessorImpl stops implementing HdfsAccessor.
var _ HdfsAccessor = (*hdfsAccessorImpl)(nil)
// NewHdfsAccessor creates an instance of HdfsAccessor.
//
// nameNodeAddresses is a comma-separated list of "address:port" strings, one
// per name node. Whitespace around each entry is ignored and empty entries
// (e.g. produced by a trailing comma) are skipped, so "nn1:8020, nn2:8020"
// is accepted. clock is the time source used to expire cached UID lookups.
//
// No connection is attempted here. An error is returned when the list
// contains no usable address, because connecting indexes into the address
// list and would otherwise panic on an empty one.
func NewHdfsAccessor(nameNodeAddresses string, clock Clock) (HdfsAccessor, error) {
	var nns []string
	for _, addr := range strings.Split(nameNodeAddresses, ",") {
		if addr = strings.TrimSpace(addr); addr != "" {
			nns = append(nns, addr)
		}
	}
	if len(nns) == 0 {
		return nil, errors.New("no HDFS name node address specified")
	}
	this := &hdfsAccessorImpl{
		NameNodeAddresses:  nns,
		CurrentNameNodeIdx: 0,
		Clock:              clock,
		UserNameToUidCache: make(map[string]UidCacheEntry)}
	return this, nil
}
// EnsureConnected ensures that the metadata client is connected.
// It returns nil when a metadata client already exists; otherwise it attempts
// to connect and returns the connection error, if any.
//
// MetadataClient is read and replaced by the other metadata operations while
// they hold MetadataClientMutex, so the same mutex is taken here to avoid a
// data race on that field. This method must therefore not be called by code
// that already holds MetadataClientMutex.
func (this *hdfsAccessorImpl) EnsureConnected() error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()
	if this.MetadataClient != nil {
		return nil
	}
	return this.ConnectMetadataClient()
}
// ConnectMetadataClient connects to a name node and, on success, stores the
// resulting client in the MetadataClient field. On failure the field is left
// untouched and the connection error is returned.
// This method does not take MetadataClientMutex itself.
func (this *hdfsAccessorImpl) ConnectMetadataClient() error {
	newClient, connErr := this.ConnectToNameNode()
	if connErr == nil {
		this.MetadataClient = newClient
	}
	return connErr
}
// ConnectToNameNode establishes a connection to the name node currently
// selected by CurrentNameNodeIdx and returns the connected client.
// When the attempt fails, CurrentNameNodeIdx is advanced (wrapping around) so
// that the next call tries a different name node, and the returned error is
// prefixed with the address that was tried.
func (this *hdfsAccessorImpl) ConnectToNameNode() (*hdfs.Client, error) {
	address := this.NameNodeAddresses[this.CurrentNameNodeIdx]
	client, connErr := this.connectToNameNodeImpl(address)
	if connErr == nil {
		log.Printf("Connected to name node %s", address)
		return client, nil
	}
	// This name node is unusable right now: rotate to the next address for the following attempt
	nextIdx := this.CurrentNameNodeIdx + 1
	this.CurrentNameNodeIdx = nextIdx % len(this.NameNodeAddresses)
	return nil, errors.New(fmt.Sprintf("%s: %s", address, connErr.Error()))
}
// connectToNameNodeImpl performs a single attempt to connect to the HDFS name
// node at nnAddr and verifies that the name node answers requests.
// It returns the connected client, or the error from connecting or probing.
func (this *hdfsAccessorImpl) connectToNameNodeImpl(nnAddr string) (*hdfs.Client, error) {
	candidate, dialErr := hdfs.New(nnAddr)
	if dialErr != nil {
		return nil, dialErr
	}
	// The connection is established, but the name node must also be operating as expected
	// (this also checks whether the name node is Active).
	// The probe is a Stat() of a path inside the root directory; the file '/$' does not
	// have to exist: both a nil error and a "not exists" error mean the probe succeeded.
	_, probeErr := candidate.Stat("/$")
	if probeErr == nil {
		// Successfully connected
		return candidate, nil
	}
	if pathErr, isPathErr := probeErr.(*os.PathError); isPathErr && pathErr.Err == os.ErrNotExist {
		// Successfully connected
		return candidate, nil
	}
	//TODO: how to close connection ?
	return nil, probeErr
}
// OpenRead opens the HDFS file at path for reading.
// Each call establishes its own name node connection via ConnectToNameNode
// (the shared metadata client is not used) and wraps the opened file with
// NewHdfsReader. Connection and open errors are returned unchanged.
func (this *hdfsAccessorImpl) OpenRead(path string) (ReadSeekCloser, error) {
	client, connErr := this.ConnectToNameNode()
	if connErr != nil {
		return nil, connErr
	}
	file, openErr := client.Open(path)
	if openErr == nil {
		return NewHdfsReader(file), nil
	}
	//TODO: close connection
	return nil, openErr
}
// CreateFile creates a new HDFS file at path with the given mode and returns
// a writer for it (wrapped with NewHdfsWriter).
// The operation runs under MetadataClientMutex and connects the metadata
// client first when it is not connected yet.
func (this *hdfsAccessorImpl) CreateFile(path string, mode os.FileMode) (HdfsWriter, error) {
	// Values passed to the HDFS client for every file created here
	const (
		replication = 3
		blockSize   = 64 * 1024 * 1024
	)
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()
	if this.MetadataClient == nil {
		connErr := this.ConnectMetadataClient()
		if connErr != nil {
			return nil, connErr
		}
	}
	fileWriter, createErr := this.MetadataClient.CreateFile(path, replication, blockSize, mode)
	if createErr != nil {
		return nil, createErr
	}
	return NewHdfsWriter(fileWriter), nil
}
// ReadDir enumerates the HDFS directory at path and returns the attributes of
// every entry, in the order reported by the HDFS client.
// The operation runs under MetadataClientMutex and connects the metadata
// client first when it is not connected yet. When the listing fails with an
// error that is not benign (see IsSuccessOrBenignError), the metadata client
// is dropped so that the next operation reconnects.
func (this *hdfsAccessorImpl) ReadDir(path string) ([]Attrs, error) {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()
	if this.MetadataClient == nil {
		connErr := this.ConnectMetadataClient()
		if connErr != nil {
			return nil, connErr
		}
	}
	entries, listErr := this.MetadataClient.ReadDir(path)
	if listErr != nil {
		if !IsSuccessOrBenignError(listErr) {
			// The client itself looks broken: forget it, so another one is tried next time
			// TODO: attempt to gracefully close the connection
			this.MetadataClient = nil
		}
		// Benign errors (e.g. path not found) are reported without touching the client
		return nil, listErr
	}
	result := make([]Attrs, 0, len(entries))
	for _, entry := range entries {
		result = append(result, this.AttrsFromFileInfo(entry))
	}
	return result, nil
}
// Stat retrieves the attributes of the file or directory at path.
// The operation runs under MetadataClientMutex and connects the metadata
// client first when it is not connected yet. When the lookup fails with an
// error that is not benign (see IsSuccessOrBenignError), the metadata client
// is dropped so that the next operation reconnects. On any error a zero
// Attrs value is returned along with the error.
func (this *hdfsAccessorImpl) Stat(path string) (Attrs, error) {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()
	if this.MetadataClient == nil {
		connErr := this.ConnectMetadataClient()
		if connErr != nil {
			return Attrs{}, connErr
		}
	}
	info, statErr := this.MetadataClient.Stat(path)
	if statErr == nil {
		return this.AttrsFromFileInfo(info), nil
	}
	if !IsSuccessOrBenignError(statErr) {
		// The client itself looks broken: forget it, so another one is tried next time
		// TODO: attempt to gracefully close the connection
		this.MetadataClient = nil
	}
	// Benign errors (e.g. path not found) are reported without touching the client
	return Attrs{}, statErr
}
// AttrsFromFileInfo converts os.FileInfo + the underlying proto-buf data into
// an Attrs structure.
//
// fileInfo.Sys() must hold a *hadoop_hdfs.HdfsFileStatusProto; any other
// value makes the type assertion panic.
//
// The proto-buf fields are pointers, so they are read through the generated
// Get* accessors (as was already done for the modification time) instead of
// being dereferenced directly: the accessors return the zero value for an
// unset field, where a direct dereference would panic with a nil pointer.
//
// Mtime, Ctime and Crtime are all set to the modification time, truncated to
// whole seconds.
func (this *hdfsAccessorImpl) AttrsFromFileInfo(fileInfo os.FileInfo) Attrs {
	protoBufData := fileInfo.Sys().(*hadoop_hdfs.HdfsFileStatusProto)
	mode := os.FileMode(protoBufData.GetPermission().GetPerm())
	if fileInfo.IsDir() {
		mode |= os.ModeDir
	}
	// The proto-buf modification time is divided by 1000 to obtain seconds
	modificationTime := time.Unix(int64(protoBufData.GetModificationTime())/1000, 0)
	return Attrs{
		Inode:  protoBufData.GetFileId(),
		Name:   fileInfo.Name(),
		Mode:   mode,
		Size:   protoBufData.GetLength(),
		Uid:    this.LookupUid(protoBufData.GetOwner()),
		Mtime:  modificationTime,
		Ctime:  modificationTime,
		Crtime: modificationTime,
		Gid:    0} // TODO: Group is now hardcoded to be "root", implement proper mapping
}
// HadoopTimestampToTime converts a Hadoop timestamp, expressed in
// milliseconds since the Unix epoch, into a time.Time.
//
// The millisecond part is preserved (it was previously discarded, rounding
// every timestamp down to a whole second). The division is performed on the
// unsigned value before converting to int64, so large timestamps are not
// misinterpreted as negative numbers.
func HadoopTimestampToTime(timestamp uint64) time.Time {
	seconds := int64(timestamp / 1000)
	nanoseconds := int64(timestamp%1000) * int64(time.Millisecond)
	return time.Unix(seconds, nanoseconds)
}
// LookupUid performs a cache-assisted lookup of the UID for userName.
// An empty user name maps to UID 0. A cache entry that has not expired yet is
// returned as is; otherwise the user is looked up via os/user and the result
// is cached for 5 minutes. When the lookup or the parsing of the UID string
// fails, the fallback value (1<<31)-1 is returned (and cached as well).
func (this *hdfsAccessorImpl) LookupUid(userName string) uint32 {
	if userName == "" {
		return 0
	}
	// Note: this method is called under MetadataClientMutex, so accessing the cache dictionary is safe
	if entry, found := this.UserNameToUidCache[userName]; found && this.Clock.Now().Before(entry.Expires) {
		return entry.Uid
	}
	// Start from the fallback and overwrite it only when both lookup and parsing succeed
	uid := uint32((1 << 31) - 1)
	if account, lookupErr := user.Lookup(userName); lookupErr == nil {
		// UID is returned as string, need to parse it
		if parsed, parseErr := strconv.ParseUint(account.Uid, 10, 32); parseErr == nil {
			uid = uint32(parsed)
		}
	}
	// caching UID for 5 minutes
	expiration := this.Clock.Now().Add(5 * time.Minute)
	this.UserNameToUidCache[userName] = UidCacheEntry{Uid: uid, Expires: expiration}
	return uid
}
// IsSuccessOrBenignError returns true if err==nil or err is an expected
// (benign) error which should be propagated directly to the caller:
// io.EOF, fuse.EEXIST, or an *os.PathError wrapping os.ErrNotExist.
// Every other error yields false.
func IsSuccessOrBenignError(err error) bool {
	switch {
	case err == nil, err == io.EOF, err == fuse.EEXIST:
		return true
	}
	pathErr, isPathErr := err.(*os.PathError)
	return isPathErr && pathErr.Err == os.ErrNotExist
}
// Mkdir creates a directory at path with the given mode.
// The operation runs under MetadataClientMutex and connects the metadata
// client first when it is not connected yet. An error whose message ends
// with "file already exists" is reported as fuse.EEXIST; any other error is
// returned unchanged.
func (this *hdfsAccessorImpl) Mkdir(path string, mode os.FileMode) error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()
	if this.MetadataClient == nil {
		connErr := this.ConnectMetadataClient()
		if connErr != nil {
			return connErr
		}
	}
	mkdirErr := this.MetadataClient.Mkdir(path, mode)
	if mkdirErr != nil && strings.HasSuffix(mkdirErr.Error(), "file already exists") {
		return fuse.EEXIST
	}
	return mkdirErr
}
// Remove removes the file or directory at path.
// The operation runs under MetadataClientMutex and connects the metadata
// client first when it is not connected yet; the error reported by the HDFS
// client is returned unchanged.
func (this *hdfsAccessorImpl) Remove(path string) error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()
	if this.MetadataClient == nil {
		connErr := this.ConnectMetadataClient()
		if connErr != nil {
			return connErr
		}
	}
	removeErr := this.MetadataClient.Remove(path)
	return removeErr
}
// Rename renames the file or directory at oldPath to newPath.
// The operation runs under MetadataClientMutex and connects the metadata
// client first when it is not connected yet; the error reported by the HDFS
// client is returned unchanged.
func (this *hdfsAccessorImpl) Rename(oldPath string, newPath string) error {
	this.MetadataClientMutex.Lock()
	defer this.MetadataClientMutex.Unlock()
	if this.MetadataClient == nil {
		connErr := this.ConnectMetadataClient()
		if connErr != nil {
			return connErr
		}
	}
	renameErr := this.MetadataClient.Rename(oldPath, newPath)
	return renameErr
}