Skip to content

Commit

Permalink
[FLINK-17019][runtime] Enable DualKeyLinkedMap for querying primary/s…
Browse files Browse the repository at this point in the history
…econdary key with a given secondary/primary key
  • Loading branch information
zhuzhurk committed Jun 18, 2020
1 parent b8698c0 commit b5e3c92
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ int size() {
return aMap.size();
}

V getKeyA(A aKey) {
V getValueByKeyA(A aKey) {
final Tuple2<B, V> value = aMap.get(aKey);

if (value != null) {
Expand All @@ -65,7 +65,7 @@ V getKeyA(A aKey) {
}
}

V getKeyB(B bKey) {
V getValueByKeyB(B bKey) {
final A aKey = bMap.get(bKey);

if (aKey != null) {
Expand All @@ -75,6 +75,20 @@ V getKeyB(B bKey) {
}
}

A getKeyAByKeyB(B bKey) {
return bMap.get(bKey);
}

B getKeyBByKeyA(A aKey) {
final Tuple2<B, V> value = aMap.get(aKey);

if (value != null) {
return value.f0;
} else {
return null;
}
}

V put(A aKey, B bKey, V value) {
final V removedValue = removeKeyA(aKey);
removeKeyB(bKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ private void requestSlotFromResourceManager(
}

private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) {
final PendingRequest request = pendingRequests.getKeyA(slotRequestID);
final PendingRequest request = pendingRequests.getValueByKeyA(slotRequestID);
if (request != null) {
if (isBatchRequestAndFailureCanBeIgnored(request, failure)) {
log.debug("Ignoring failed request to the resource manager for a batch slot request.");
Expand Down Expand Up @@ -694,7 +694,7 @@ public Optional<ResourceID> failAllocation(final AllocationID allocationID, fina

componentMainThreadExecutor.assertRunningInMainThread();

final PendingRequest pendingRequest = pendingRequests.getKeyB(allocationID);
final PendingRequest pendingRequest = pendingRequests.getValueByKeyB(allocationID);
if (pendingRequest != null) {
if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {
log.debug("Ignoring allocation failure for batch slot request {}.", pendingRequest.getSlotRequestId());
Expand Down Expand Up @@ -1050,11 +1050,11 @@ void add(SlotRequestId slotRequestId, AllocatedSlot allocatedSlot) {
* @return The allocated slot, null if we can't find a match
*/
AllocatedSlot get(final AllocationID allocationID) {
return allocatedSlotsById.getKeyA(allocationID);
return allocatedSlotsById.getValueByKeyA(allocationID);
}

AllocatedSlot get(final SlotRequestId slotRequestId) {
return allocatedSlotsById.getKeyB(slotRequestId);
return allocatedSlotsById.getValueByKeyB(slotRequestId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public void ensuresOneToOneMappingBetweenKeysSamePrimaryKey() {
map.put(1, 1, "foobar");
map.put(1, 2, secondValue);

assertThat(map.getKeyB(1), nullValue());
assertThat(map.getKeyA(1), is(secondValue));
assertThat(map.getKeyB(2), is(secondValue));
assertThat(map.getValueByKeyB(1), nullValue());
assertThat(map.getValueByKeyA(1), is(secondValue));
assertThat(map.getValueByKeyB(2), is(secondValue));
}

@Test
Expand All @@ -81,8 +81,8 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
map.put(1, 1, "foobar");
map.put(2, 1, secondValue);

assertThat(map.getKeyA(1), nullValue());
assertThat(map.getKeyB(1), is(secondValue));
assertThat(map.getKeyA(2), is(secondValue));
assertThat(map.getValueByKeyA(1), nullValue());
assertThat(map.getValueByKeyB(1), is(secondValue));
assertThat(map.getValueByKeyA(2), is(secondValue));
}
}

0 comments on commit b5e3c92

Please sign in to comment.