Skip to content

Commit

Permalink
[python] - Expose additional scanner methods
Browse files Browse the repository at this point in the history
This patch exposes a few remaining scanner methods from the C++
client.
	- set_cache_blocks
	- keep_alive
	- close
	- get_current_server

Additionally, this patch fixes an issue with inappropriate deallocation
of replicas (this is handled by deallocating the ScanToken). This patch
includes tests.

Change-Id: Ifa6070a96a5daca796d463ffc3ffcbe5f0a5e08a
Reviewed-on: http://gerrit.cloudera.org:8080/4888
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <[email protected]>
  • Loading branch information
jtbirdsell authored and jdcryans committed Nov 7, 2016
1 parent c25aea4 commit 13ffec6
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 19 deletions.
90 changes: 85 additions & 5 deletions python/kudu/client.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ cdef class Client:
result = []
for i in range(tservers.size()):
ts = TabletServer()
ts._own = 1
result.append(ts._init(tservers[i]))
return result

Expand Down Expand Up @@ -592,15 +593,27 @@ cdef class TabletServer:

cdef:
const KuduTabletServer* _tserver
public bint _own

cdef _init(self, const KuduTabletServer* tserver):
self._tserver = tserver
self._own = 0
return self

def __dealloc__(self):
if self._tserver != NULL:
if self._tserver != NULL and self._own:
del self._tserver

def __richcmp__(TabletServer self, TabletServer other, int op):
if op == 2: # ==
return ((self.uuid(), self.hostname(), self.port()) ==
(other.uuid(), other.hostname(), other.port()))
elif op == 3: # !=
return ((self.uuid(), self.hostname(), self.port()) !=
(other.uuid(), other.hostname(), other.port()))
else:
raise NotImplementedError

def uuid(self):
return frombytes(self._tserver.uuid())

Expand Down Expand Up @@ -1649,6 +1662,77 @@ cdef class Scanner:
check_status(self.scanner.NextBatch(&batch.batch))
return batch

def set_cache_blocks(self, cache_blocks):
"""
Sets the block caching policy.
Returns a reference to itself to facilitate chaining.
Parameters
----------
cache_blocks : bool
Returns
-------
self : Scanner
"""
check_status(self.scanner.SetCacheBlocks(cache_blocks))
return self

def keep_alive(self):
"""
Keep the current remote scanner alive.
Keep the current remote scanner alive on the Tablet server for an
additional time-to-live (set by a configuration flag on the tablet
server). This is useful if the interval in between NextBatch() calls is
big enough that the remote scanner might be garbage collected (default
ttl is set to 60 secs.). This does not invalidate any previously
fetched results.
Returns
-------
self : Scanner
"""
check_status(self.scanner.KeepAlive())
return self

def get_current_server(self):
"""
Get the TabletServer that is currently handling the scan.
More concretely, this is the server that handled the most recent open()
or next_batch() RPC made by the server.
Returns
-------
tserver : TabletServer
"""
cdef:
TabletServer tserver = TabletServer()
KuduTabletServer* tserver_p = NULL

check_status(self.scanner.GetCurrentServer(&tserver_p))
tserver._own = 1
tserver._init(tserver_p)
return tserver

def close(self):
"""
Close the scanner.
Closing the scanner releases resources on the server. This call does
not block, and will not ever fail, even if the server cannot be
contacted.
Note: The scanner is reset to its initial state by this function.
You'll have to re-add any projection, predicates, etc if you want to
reuse this object.
Note: When the Scanner object is garbage collected, this method is run.
This method call is only needed if you want to explicitly release the
resources on the server.
"""
self.scanner.Close()


cdef class ScanToken:
"""
Expand Down Expand Up @@ -2113,10 +2197,6 @@ cdef class Replica:
self._replica = replica
return self

def __dealloc__(self):
if self._replica != NULL:
del self._replica

def is_leader(self):
return self._replica.is_leader()

Expand Down
5 changes: 3 additions & 2 deletions python/kudu/libkudu_client.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,8 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
c_bool HasMoreRows()
Status NextBatch(KuduScanBatch* batch)
Status SetBatchSizeBytes(uint32_t batch_size)

Status SetSelection(ReplicaSelection selection)

Status SetCacheBlocks(c_bool cache_blocks)
Status SetReadMode(ReadMode read_mode)
Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros)
Status SetTimeoutMillis(int millis)
Expand All @@ -638,6 +637,8 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
Status SetFaultTolerant()
Status AddLowerBound(const KuduPartialRow& key)
Status AddExclusiveUpperBound(const KuduPartialRow& key)
Status KeepAlive()
Status GetCurrentServer(KuduTabletServer** server)

KuduSchema GetProjectionSchema()
const ResourceMetrics& GetResourceMetrics()
Expand Down
7 changes: 4 additions & 3 deletions python/kudu/tests/test_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,15 @@ def test_read_mode(self):
# Avoid tight looping
time.sleep(0.05)

def test_resource_metrics(self):
def test_resource_metrics_and_cache_blocks(self):
"""
Test getting the resource metrics after scanning.
Test getting the resource metrics after scanning and
setting the scanner to not cache blocks.
"""

# Build scanner and read through all batches and retrieve metrics.
scanner = self.table.scanner()
scanner.set_fault_tolerant().open()
scanner.set_fault_tolerant().set_cache_blocks(False).open()
scanner.read_all_tuples()
metrics = scanner.get_resource_metrics()

Expand Down
30 changes: 21 additions & 9 deletions python/kudu/tests/test_scantoken.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import kudu
from multiprocessing import Pool
import datetime
import time

def _get_scan_token_results(input):
client = kudu.connect(input[1], input[2])
scanner = client.deserialize_token_into_scanner(input[0])
scanner.open()
return scanner.read_all_tuples()
tuples = scanner.read_all_tuples()
# Test explicit closing of scanner
scanner.close()
return tuples

class TestScanToken(TestScanBase):

Expand Down Expand Up @@ -115,6 +119,16 @@ def test_scan_token_invalid_predicates(self):
with self.assertRaises(TypeError):
builder.add_predicates([sv >= 1])

def _subtest_open_and_confirm_leader_tserver(self, token):
for replica in token.tablet().replicas():
if replica.is_leader():
leader_tserver = replica.ts()

scanner = token.into_kudu_scanner()
scanner.open()
self.assertEqual(scanner.get_current_server(), leader_tserver)
return scanner

def test_scan_token_batch_by_batch_with_local_scanner(self):
builder = self.table.scan_token_builder()
lower_bound = builder.new_bound()
Expand All @@ -128,8 +142,7 @@ def test_scan_token_batch_by_batch_with_local_scanner(self):

tuples = []
for token in tokens:
scanner = token.into_kudu_scanner()
scanner.open()
scanner = self._subtest_open_and_confirm_leader_tserver(token)

while scanner.has_more_rows():
batch = scanner.next_batch()
Expand All @@ -150,10 +163,10 @@ def test_unixtime_micros(self):

tuples = []
for token in tokens:
scanner = token.into_kudu_scanner()
scanner.open()
scanner = self._subtest_open_and_confirm_leader_tserver(token)

while scanner.has_more_rows():
scanner.keep_alive()
batch = scanner.next_batch()
tuples.extend(batch.as_tuples())

Expand Down Expand Up @@ -192,7 +205,7 @@ def test_read_mode(self):

tuples = []
for token in tokens:
scanner = token.into_kudu_scanner().open()
scanner = self._subtest_open_and_confirm_leader_tserver(token)
tuples.extend(scanner.read_all_tuples())

self.assertEqual(sorted(self.tuples[1:]), sorted(tuples))
Expand All @@ -204,7 +217,7 @@ def test_read_mode(self):

tuples = []
for token in tokens:
scanner = token.into_kudu_scanner().open()
scanner = self._subtest_open_and_confirm_leader_tserver(token)
tuples.extend(scanner.read_all_tuples())

self.assertEqual(sorted(self.tuples), sorted(tuples))
Expand Down Expand Up @@ -260,8 +273,7 @@ def test_scan_selection(self):

tuples = []
for token in tokens:
scanner = token.into_kudu_scanner()
scanner.open()
scanner = self._subtest_open_and_confirm_leader_tserver(token)
tuples.extend(scanner.read_all_tuples())

self.assertEqual(sorted(tuples),
Expand Down

0 comments on commit 13ffec6

Please sign in to comment.