[FLINK-19232][python] Support iterating MapState and MapView
This closes apache#13739.
WeiZhong94 authored and dianfu committed Oct 27, 2020
1 parent 2e60d04 commit d30e523
Showing 8 changed files with 667 additions and 49 deletions.
12 changes: 9 additions & 3 deletions docs/_includes/generated/python_configuration.html
@@ -57,13 +57,19 @@
 <td>If set, the Python worker will configure itself to use the managed memory budget of the task slot. Otherwise, it will use the Off-Heap Memory of the task slot. In this case, users should set the Task Off-Heap Memory using the configuration key taskmanager.memory.task.off-heap.size.</td>
 </tr>
 <tr>
-<td><h5>python.map-state.read.cache.size</h5></td>
+<td><h5>python.map-state.iterate-response-batch-size</h5></td>
+<td style="word-wrap: break-word;">1000</td>
+<td>Integer</td>
+<td>The maximum number of the MapState keys/entries sent to Python UDF worker in each batch when iterating a Python MapState. Note that this is an experimental flag and might not be available in future releases.</td>
+</tr>
+<tr>
+<td><h5>python.map-state.read-cache-size</h5></td>
 <td style="word-wrap: break-word;">1000</td>
 <td>Integer</td>
 <td>The maximum number of cached entries for a single Python MapState. Note that this is an experimental flag and might not be available in future releases.</td>
 </tr>
 <tr>
-<td><h5>python.map-state.write.cache.size</h5></td>
+<td><h5>python.map-state.write-cache-size</h5></td>
 <td style="word-wrap: break-word;">1000</td>
 <td>Integer</td>
 <td>The maximum number of cached write requests for a single Python MapState. The write requests will be flushed to the state backend (managed in the Java operator) when the number of cached write requests exceed this limit. Note that this is an experimental flag and might not be available in future releases.</td>
@@ -81,7 +87,7 @@
 <td>Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory which contains the installation packages of these dependencies could be specified optionally. Use '#' as the separator if the optional parameter exists. The option is equivalent to the command line option "-pyreq".</td>
 </tr>
 <tr>
-<td><h5>python.state.cache.size</h5></td>
+<td><h5>python.state.cache-size</h5></td>
 <td style="word-wrap: break-word;">1000</td>
 <td>Integer</td>
 <td>The maximum number of states cached in a Python UDF worker. Note that this is an experimental flag and might not be available in future releases.</td>
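For reference, these options are set on the table configuration of a PyFlink job before it is submitted. The snippet below is a minimal sketch assuming a Table API job on Flink 1.12; the values shown simply restate the defaults and are not tuning recommendations:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

config = t_env.get_config().get_configuration()
# Number of MapState keys/entries fetched per batch while iterating a Python MapState.
config.set_string("python.map-state.iterate-response-batch-size", "1000")
# Per-MapState read and write caches on the Python worker side.
config.set_string("python.map-state.read-cache-size", "1000")
config.set_string("python.map-state.write-cache-size", "1000")
# Overall number of states cached in the Python UDF worker.
config.set_string("python.state.cache-size", "1000")
```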
7 changes: 4 additions & 3 deletions flink-python/pyflink/fn_execution/aggregate.py
@@ -106,13 +106,13 @@ def contains(self, key) -> bool:
         return self._map_state.contains(key)

     def items(self):
-        raise NotImplementedError
+        return self._map_state.items()

     def keys(self):
-        raise NotImplementedError
+        return self._map_state.keys()

     def values(self):
-        raise NotImplementedError
+        return self._map_state.values()

     def is_empty(self) -> bool:
         return self._map_state.is_empty()
@@ -422,6 +422,7 @@ def close(self):
     def process_element(self, input_data: Row):
         key = self.key_selector.get_key(input_data)
         self.state_backend.set_current_key(key)
+        self.state_backend.clear_cached_iterators()
         accumulator_state = self.state_backend.get_value_state(
             "accumulators", self.state_value_coder)
         accumulators = accumulator_state.value()
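To show what the `items()`/`keys()`/`values()` delegation above enables at the user level: a Python `AggregateFunction` whose accumulator contains a `MapView` can now iterate that view instead of hitting `NotImplementedError`. The following is an illustrative sketch only; the `CountDistinct` class, its field names, and the use of `MapView`, `DataTypes.MAP_VIEW` and the `udaf` wrapper assume the Python UDAF support shipped with Flink 1.12 and are not taken from this commit:

```python
from pyflink.common import Row
from pyflink.table import DataTypes, MapView
from pyflink.table.udf import AggregateFunction, udaf


class CountDistinct(AggregateFunction):
    """Counts distinct values by tracking them in a MapView-backed accumulator."""

    def create_accumulator(self):
        # Row(distinct_values: MapView, count: int)
        return Row(MapView(), 0)

    def accumulate(self, accumulator, value):
        if value is not None and not accumulator[0].contains(value):
            accumulator[0].put(value, 1)
            accumulator[1] += 1

    def get_value(self, accumulator):
        # Iterating the MapView is what this change enables; previously
        # keys()/items()/values() raised NotImplementedError.
        return sum(1 for _ in accumulator[0].keys())

    def get_result_type(self):
        return DataTypes.BIGINT()

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD(
                "distinct_values",
                DataTypes.MAP_VIEW(DataTypes.STRING(), DataTypes.BIGINT())),
            DataTypes.FIELD("count", DataTypes.BIGINT())])


count_distinct = udaf(CountDistinct())
```

During such an iteration, entries are shipped from the Java operator to the Python worker in batches of `python.map-state.iterate-response-batch-size`. The `clear_cached_iterators()` call added to `process_element` above presumably discards iterators cached for the previous element so they are not reused once the current key changes.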
(Diffs for the remaining 6 changed files are not shown.)
