forked from spotify/luigi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
webhdfs_minicluster.py
77 lines (67 loc) · 2.56 KB
/
webhdfs_minicluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# -*- coding: utf-8 -*-
#
# Copyright 2015 VNG Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from minicluster import MiniClusterTestCase
import unittest
import subprocess
import select
import re
try:
from snakebite.minicluster import MiniCluster
except ImportError:
raise unittest.SkipTest('To use minicluster, snakebite must be installed.')
class WebHdfsMiniCluster(MiniCluster):
    """
    Snakebite ``MiniCluster`` subclass that launches HDFS with WebHDFS enabled.

    This is an unclean way of customising the snakebite minicluster: that
    class seemed too inflexible to configure from the outside, so two of its
    private methods are overridden here.
    """

    @property
    def webhdfs_port(self):
        """Return ``self.port`` under a WebHDFS-specific name."""
        # NOTE(review): ``self.port`` is not assigned anywhere in this class;
        # presumably the snakebite base class fills it in from
        # ``_get_namenode_port`` -- confirm against snakebite.
        return self.port

    def _start_mini_cluster(self, nnport=None):
        """
        Launch the Hadoop minicluster process with WebHDFS switched on.

        Adapted from the snakebite source code; the only addition is the
        ``-Ddfs.webhdfs.enabled=true`` flag.

        :param nnport: value passed to the minicluster's ``-nnport`` option;
            the option is omitted when this is falsy.
        :raises Exception: if no hadoop jobclient test jar can be found.

        The ``subprocess.Popen`` handle is stored on ``self.hdfs``.
        """
        # Prefer an explicitly configured jar, otherwise search the Hadoop home.
        hadoop_jar = (self._jobclient_jar or
                      self._find_mini_cluster_jar(self._hadoop_home))
        if not hadoop_jar:
            raise Exception("No hadoop jobclient test jar found")

        cmd = [self._hadoop_cmd, 'jar', hadoop_jar,
               'minicluster', '-nomr', '-format']
        if nnport:
            cmd.extend(['-nnport', str(nnport)])
        # luigi webhdfs version
        cmd.append('-Ddfs.webhdfs.enabled=true')

        # Unbuffered text-mode pipes so _get_namenode_port can read the
        # process output line by line as it is produced.
        self.hdfs = subprocess.Popen(cmd, bufsize=0, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE,
                                     universal_newlines=True)

    def _get_namenode_port(self):
        """
        Read the minicluster's output until it announces its web server port.

        Every line read from the process's stdout/stderr is echoed with
        ``print``.

        :return: the port captured from the "Web-server up at" log line as an
            ``int``, or ``None`` if the process terminates before such a line
            is seen.
        """
        # luigi webhdfs version (different regex).  Raw string so that ``\d``
        # reaches the regex engine instead of being an invalid string escape.
        web_server_up = re.compile(
            r".*namenode.NameNode: Web-server up at: localhost:(\d+).*")

        while self.hdfs.poll() is None:
            # Block until at least one of the two pipes has data to read.
            rlist, wlist, xlist = select.select(
                [self.hdfs.stderr, self.hdfs.stdout], [], [])
            for f in rlist:
                line = f.readline()
                print(line,)
                m = web_server_up.match(line)
                if m:
                    return int(m.group(1))
        # The process exited without ever reporting the web server port.
        return None
class WebHdfsMiniClusterTestCase(MiniClusterTestCase):
    """``MiniClusterTestCase`` whose cluster is a ``WebHdfsMiniCluster``."""

    @classmethod
    def instantiate_cluster(cls):
        """
        Create the cluster object for this test case.

        :return: a ``WebHdfsMiniCluster`` constructed with ``None`` as its
            first argument and ``nnport=50030``.
        """
        cluster = WebHdfsMiniCluster(None, nnport=50030)
        return cluster