Skip to content

Commit

Permalink
[FLINK-13236][table-runtime-blink] Fix bug and improve performance in…
Browse files Browse the repository at this point in the history
… TopNBuffer (apache#9098)
  • Loading branch information
tsreaper authored and wuchong committed Jul 15, 2019
1 parent a581268 commit 6d7c1bd
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public int put(BaseRow sortKey, BaseRow value) {
* @param values record lists to be associated with the specified key
*/
void putAll(BaseRow sortKey, Collection<BaseRow> values) {
Collection<BaseRow> oldValues = treeMap.get(sortKey);
if (oldValues != null) {
currentTopNum -= oldValues.size();
}
treeMap.put(sortKey, values);
currentTopNum += values.size();
}
Expand Down Expand Up @@ -145,18 +149,18 @@ BaseRow removeLast() {
*/
BaseRow getElement(int rank) {
int curRank = 0;
Iterator<Map.Entry<BaseRow, Collection<BaseRow>>> iter = treeMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<BaseRow, Collection<BaseRow>> entry = iter.next();
for (Map.Entry<BaseRow, Collection<BaseRow>> entry : treeMap.entrySet()) {
Collection<BaseRow> list = entry.getValue();

Iterator<BaseRow> listIter = list.iterator();
while (listIter.hasNext()) {
BaseRow elem = listIter.next();
curRank += 1;
if (curRank == rank) {
return elem;
if (curRank + list.size() >= rank) {
for (BaseRow elem : list) {
curRank += 1;
if (curRank == rank) {
return elem;
}
}
} else {
curRank += list.size();
}
}
return null;
Expand Down

0 comments on commit 6d7c1bd

Please sign in to comment.