forked from traviscrawford/python-hdfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path hfile.py
executable file
·146 lines (122 loc) · 4.15 KB
/
hfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from hdfs._common import *
class Hfile(object):
    """A file on HDFS accessed through the libhdfs C bindings.

    The constructor connects to the filesystem at ``hostname:port`` and opens
    ``filename`` in mode ``'r'`` (read-only) or ``'w'`` (write-only).  Call
    ``close()`` when finished, or use the object in a ``with`` statement.

    Iterating the object yields lines produced by ``readline()``.
    ``readline()`` reads through ``pread()`` at its own offset
    (``readline_pos``); ``seek()`` does not change that offset.
    """

    def __init__(self, hostname, port, filename, mode='r', buffer_size=0,
                 replication=0, block_size=0):
        """Connect to ``hostname:port`` and open ``filename``.

        @param hostname Host passed to hdfsConnect.
        @param port Port passed to hdfsConnect.
        @param filename Full path of the file to open.
        @param mode 'r' for read-only, 'w' for write-only (create or
               overwrite).
        @param buffer_size Passed to hdfsOpenFile; 0 uses the configured
               default.
        @param replication Passed to hdfsOpenFile; 0 uses the configured
               default.
        @param block_size Passed to hdfsOpenFile; 0 uses the configured
               default.
        @raise HdfsError on an invalid mode, or if connecting or opening fails.
        """
        flags = self._flags_for_mode(mode)
        self.hostname = hostname
        self.port = port
        self.filename = filename
        # Offset of the next byte readline() will return.
        self.readline_pos = 0
        self.fh = None
        self.fs = libhdfs.hdfsConnect(hostname, port)
        if not self.fs:
            raise HdfsError('Failed connecting to %s:%s' % (hostname, port))
        self.fh = libhdfs.hdfsOpenFile(self.fs, filename, flags, buffer_size,
                                       replication, block_size)
        if not self.fh:
            # Don't leak the connection when the open itself fails.
            libhdfs.hdfsDisconnect(self.fs)
            self.fs = None
            raise HdfsError('Failed opening %s' % filename)

    @staticmethod
    def _flags_for_mode(mode):
        """Map a mode string ('r' or 'w') to os.O_* flags for hdfsOpenFile."""
        if mode == 'r':
            return os.O_RDONLY
        if mode == 'w':
            return os.O_WRONLY
        raise HdfsError('Invalid open flags.')

    def __iter__(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never suppress exceptions from the with-body

    def close(self):
        """Close the file and disconnect from the filesystem.

        Safe to call more than once: handles are cleared after release.
        """
        if self.fh:
            libhdfs.hdfsCloseFile(self.fs, self.fh)
            self.fh = None
        if self.fs:
            libhdfs.hdfsDisconnect(self.fs)
            self.fs = None

    def next(self):
        """Return the next line, raising StopIteration at end of file."""
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    # Python 3 spelling of the iterator protocol; Python 2 uses next().
    __next__ = next

    def open(self, filename, mode='r', buffer_size=0,
             replication=0, block_size=0):
        """Open another HDFS file on this object's connection.

        Any file currently open on this object is closed first.  If the
        connection was closed by close(), it is re-established using the
        hostname and port given to the constructor.

        @param filename Full path of the file to open.
        @param mode 'r' for read-only, 'w' for write-only (create or
               overwrite, i.e. implies truncation).
        @param buffer_size Size of buffer for read/write; 0 uses the
               configured default.
        @param replication Block replication; 0 uses the configured default.
        @param block_size Size of block; 0 uses the configured default.
        @raise HdfsError on an invalid mode, or if connecting or opening fails.
        """
        flags = self._flags_for_mode(mode)
        if not self.fs:
            self.fs = libhdfs.hdfsConnect(self.hostname, self.port)
            if not self.fs:
                raise HdfsError('Failed connecting to %s:%s'
                                % (self.hostname, self.port))
        if self.fh:
            # The old handle would otherwise be overwritten and leaked.
            libhdfs.hdfsCloseFile(self.fs, self.fh)
            self.fh = None
        self.fh = libhdfs.hdfsOpenFile(self.fs, filename, flags, buffer_size,
                                       replication, block_size)
        if not self.fh:
            raise HdfsError('Failed opening %s' % filename)
        # stat()/pread()/read() look the file up by self.filename, and
        # readline() must start from the beginning of the new file.
        self.filename = filename
        self.readline_pos = 0

    def pread(self, position, length):
        """Positional read of up to ``length`` bytes starting at ``position``.

        @param position Byte offset to read from.
        @param length Maximum number of bytes to read.
        @return The bytes actually read (possibly fewer than ``length``), or
                None if ``position`` is at or past the file size from stat().
        @raise HdfsError if hdfsPread reports an error (-1).
        """
        if position >= self.stat().mSize:
            return None
        buf = create_string_buffer(length)
        ret = libhdfs.hdfsPread(self.fs, self.fh, position,
                                cast(buf, c_void_p), length)
        if ret == -1:
            raise HdfsError('read failure')
        # Only the first ``ret`` bytes were filled in; the remainder of the
        # zero-initialised buffer is padding, not file data.
        return buf.raw[:ret]

    def read(self):
        """Read from the current stream position to the end of the file.

        A single hdfsRead call may return fewer bytes than requested, so it
        is called repeatedly until the file size (from stat()) is reached or
        it returns 0.

        @return The bytes read.
        @raise HdfsError if hdfsRead reports an error (-1).
        """
        import ctypes  # local import: only addressof() is needed here

        size = self.stat().mSize
        buf = create_string_buffer(size)
        base = ctypes.addressof(buf)
        total = 0
        while total < size:
            # Read directly into the unfilled tail of the buffer.
            ret = libhdfs.hdfsRead(self.fs, self.fh, c_void_p(base + total),
                                   size - total)
            if ret == -1:
                raise HdfsError('read failure')
            if ret == 0:
                break
            total += ret
        return buf.raw[:total]

    def readline(self, length=100):
        """Return the next line starting at ``readline_pos``.

        The returned line includes its trailing newline when one is found.
        An empty string is returned at end of file.

        @param length Number of bytes requested per pread() call.
        """
        parts = []
        while True:
            data = self.pread(self.readline_pos, length)
            # None means EOF; an empty chunk would otherwise loop forever.
            if not data:
                break
            newline_pos = data.find(b'\n')
            if newline_pos == -1:
                self.readline_pos += len(data)
                parts.append(data)
            else:
                end = newline_pos + 1
                self.readline_pos += end
                parts.append(data[:end])
                break
        return b''.join(parts)

    def readlines(self):
        """Return all remaining lines as a list."""
        return list(self)

    def seek(self, position):
        """Seek to given offset in file. This works only for
        files opened in read-only mode.

        Returns True if seek was successful, False on error.
        Does not affect the offset used by readline().
        """
        return libhdfs.hdfsSeek(self.fs, self.fh, position) == 0

    def stat(self):
        """Return the file-info struct libhdfs reports for self.filename.

        The ``mSize`` field of the result is used as the file length by
        pread() and read().
        @raise HdfsError if hdfsGetPathInfo returns NULL.
        """
        info = libhdfs.hdfsGetPathInfo(self.fs, self.filename)
        if not info:
            raise HdfsError('Failed to stat %s' % self.filename)
        # NOTE(review): the struct from hdfsGetPathInfo is not freed here
        # (the returned .contents still references it) — confirm whether a
        # hdfsFreeFileInfo call is needed by callers.
        return info.contents

    def tell(self):
        """Returns current offset in bytes, None on error."""
        ret = libhdfs.hdfsTell(self.fs, self.fh)
        return None if ret == -1 else ret

    def write(self, buffer):
        """Write the byte string ``buffer`` at the current position.

        @return True on success, False if hdfsWrite reports an error (-1).
        """
        sb = create_string_buffer(buffer)
        ret = libhdfs.hdfsWrite(self.fs, self.fh, cast(sb, c_void_p),
                                len(buffer))
        return ret != -1