Skip to content

Commit

Permalink
python: replace bespoke minicluster implementation with control shell
Browse files Browse the repository at this point in the history
In this case we're using JSON serialization to talk to the shell because the
Python bindings don't already make use of protobuf.

I had to update test_scantoken.py to shut down its multiprocessing pool
after using it, otherwise its subprocesses inherited a copy of the control
shell stdin pipe writer. This led to hangs on stop_cluster() because closing
the control shell's stdin didn't actually close the last stdin writer, thus
the control shell didn't exit.

Change-Id: I821e864cfe738a4d39ae039b95ca38f16cdcfb82
Reviewed-on: http://gerrit.cloudera.org:8080/8236
Tested-by: Kudu Jenkins
Reviewed-by: Wes McKinney <[email protected]>
Reviewed-by: Dan Burkert <[email protected]>
  • Loading branch information
adembo committed Oct 9, 2017
1 parent 0c82398 commit 1d92895
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 99 deletions.
140 changes: 42 additions & 98 deletions python/kudu/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@
from __future__ import division

import json
import fnmatch
import os
import shutil
import subprocess
import tempfile
import time
import socket

import kudu
from kudu.client import Partitioning
Expand All @@ -42,14 +37,22 @@ class KuduTestBase(object):
tablet servers.
"""

BASE_PORT = 37000
NUM_TABLET_SERVERS = 3
TSERVER_START_TIMEOUT_SECS = 10
NUM_MASTER_SERVERS = 3
NUM_TABLET_SERVERS = 3

@classmethod
def send_and_receive(cls, proc, request):
    """Send one JSON request to the control process and return its reply.

    The request is serialized as a single newline-terminated line of UTF-8
    encoded JSON and written to ``proc.stdin``. One line is then read from
    ``proc.stdout`` and parsed as the JSON response.

    Args:
        proc: Object exposing binary ``stdin`` and ``stdout`` streams, such
            as a ``subprocess.Popen`` created with both streams piped.
        request: JSON-serializable request object.

    Returns:
        The decoded JSON response.

    Raises:
        Exception: If ``proc.stdout`` reaches end-of-file before any
            response is read, or if the response contains an "error" key.
    """
    binary_req = (json.dumps(request) + "\n").encode("utf-8")
    proc.stdin.write(binary_req)
    proc.stdin.flush()

    binary_resp = proc.stdout.readline()
    if not binary_resp:
        # readline() yields an empty result only at end-of-file, i.e. the
        # other side closed its stdout. Report that directly rather than
        # letting the JSON parser fail on an empty document.
        raise Exception(
            "No response received: control process closed its stdout")

    # The JSON parser ignores trailing whitespace, so the line terminator
    # does not need to be sliced off; a final line lacking a newline is
    # therefore parsed intact instead of losing its last character.
    response = json.loads(binary_resp.decode("utf-8"))
    if "error" in response:
        raise Exception("Error in response: {0}".format(response["error"]))
    return response

@classmethod
def start_cluster(cls):
local_path = tempfile.mkdtemp(dir=os.getenv("TEST_TMPDIR"))
kudu_build = os.getenv("KUDU_BUILD")
if not kudu_build:
kudu_build = os.path.join(os.getenv("KUDU_HOME"), "build", "latest")
Expand All @@ -58,101 +61,42 @@ def start_cluster(cls):
master_hosts = []
master_ports = []

# We need to get the port numbers for the masters before starting them
# so that we can appropriately configure a multi-master.
for m in range(cls.NUM_MASTER_SERVERS):
master_hosts.append('127.0.0.1')
# This introduces a race
s = socket.socket()
s.bind(('', 0))
master_ports.append(s.getsockname()[1])
s.close()

multi_master_string = ','.join('{0}:{1}'.format(host, port)
for host, port
in zip(master_hosts, master_ports))

for m in range(cls.NUM_MASTER_SERVERS):
os.makedirs("{0}/master/{1}".format(local_path, m))
os.makedirs("{0}/master/{1}/data".format(local_path, m))
os.makedirs("{0}/master/{1}/logs".format(local_path, m))


path = [
"{0}/kudu-master".format(bin_path),
"-unlock_unsafe_flags",
"-unlock_experimental_flags",
"-rpc_server_allow_ephemeral_ports",
"-rpc_bind_addresses=0.0.0.0:{0}".format(master_ports[m]),
"-fs_wal_dir={0}/master/{1}/data".format(local_path, m),
"-fs_data_dirs={0}/master/{1}/data".format(local_path, m),
"-log_dir={0}/master/{1}/logs".format(local_path, m),
"-logtostderr",
"-webserver_port=0",
"-master_addresses={0}".format(multi_master_string),
# Only make one replica so that our tests don't need to worry about
# setting consistency modes.
"-default_num_replicas=1"
]

p = subprocess.Popen(path, shell=False)
fid = open("{0}/master/{1}/kudu-master.pid".format(local_path, m), "w+")
fid.write("{0}".format(p.pid))
fid.close()

for m in range(cls.NUM_TABLET_SERVERS):
os.makedirs("{0}/ts/{1}".format(local_path, m))
os.makedirs("{0}/ts/{1}/logs".format(local_path, m))

path = [
"{0}/kudu-tserver".format(bin_path),
"-unlock_unsafe_flags",
"-unlock_experimental_flags",
"-rpc_server_allow_ephemeral_ports",
"-rpc_bind_addresses=0.0.0.0:0",
"-tserver_master_addrs={0}".format(multi_master_string),
"-webserver_port=0",
"-log_dir={0}/ts/{1}/logs".format(local_path, m),
"-logtostderr",
"-fs_data_dirs={0}/ts/{1}/data".format(local_path, m),
"-fs_wal_dir={0}/ts/{1}/data".format(local_path, m),
]
p = subprocess.Popen(path, shell=False)
tserver_pid = "{0}/ts/{1}/kudu-tserver.pid".format(local_path, m)
fid = open(tserver_pid, "w+")
fid.write("{0}".format(p.pid))
fid.close()

return local_path, master_hosts, master_ports
# Start the mini-cluster control process.
args = ["{0}/kudu".format(bin_path), "test", "mini_cluster"]
p = subprocess.Popen(args, shell=False,
stdin=subprocess.PIPE, stdout=subprocess.PIPE)

# Create and start a cluster.
#
# Only make one replica so that our tests don't need to worry about
# setting consistency modes.
cls.send_and_receive(
p, { "create_cluster" :
{ "numMasters" : cls.NUM_MASTER_SERVERS,
"numTservers" : cls.NUM_TABLET_SERVERS,
"extraMasterFlags" : [ "--default_num_replicas=1" ]}})
cls.send_and_receive(p, { "start_cluster" : {}})

# Get information about the cluster's masters.
masters = cls.send_and_receive(p, { "get_masters" : {}})
for m in masters["getMasters"]["masters"]:
master_hosts.append(m["boundRpcAddress"]["host"])
master_ports.append(m["boundRpcAddress"]["port"])

return p, master_hosts, master_ports

@classmethod
def stop_cluster(cls, path):
for root, dirnames, filenames in os.walk('{0}/..'.format(path)):
for filename in fnmatch.filter(filenames, '*.pid'):
with open(os.path.join(root, filename)) as fid:
a = fid.read()
r = subprocess.Popen(["kill", "{0}".format(a)])
r.wait()
os.remove(os.path.join(root, filename))
shutil.rmtree(path, True)
def stop_cluster(cls):
cls.cluster_proc.stdin.close()
ret = cls.cluster_proc.wait()
if ret != 0:
raise Exception("Minicluster process exited with code {0}".format(ret))

@classmethod
def setUpClass(cls):
cls.cluster_path, cls.master_hosts, cls.master_ports = cls.start_cluster()
time.sleep(1)

cls.cluster_proc, cls.master_hosts, cls.master_ports = cls.start_cluster()
cls.client = kudu.connect(cls.master_hosts, cls.master_ports)

# Wait for all tablet servers to start with the configured timeout
timeout = time.time() + cls.TSERVER_START_TIMEOUT_SECS
while len(cls.client.list_tablet_servers()) < cls.NUM_TABLET_SERVERS:
if time.time() > timeout:
raise TimeoutError(
"Tablet servers took too long to start. Timeout set to {}"
.format(cls.TSERVER_START_TIMEOUT_SECS))
# Sleep 50 milliseconds to avoid tight-looping rpc
time.sleep(0.05)

cls.schema = cls.example_schema()
cls.partitioning = cls.example_partitioning()

Expand All @@ -163,7 +107,7 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
cls.stop_cluster(cls.cluster_path)
cls.stop_cluster()

@classmethod
def example_schema(cls):
Expand Down
4 changes: 3 additions & 1 deletion python/kudu/tests/test_scantoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples, count_on
# Begin process pool
pool = Pool(len(input))
results = pool.map(_get_scan_token_results, input)
pool.close()
pool.join()

#Validate results
# Validate results
actual_tuples = []
for result in results:
actual_tuples += result
Expand Down

0 comments on commit 1d92895

Please sign in to comment.