diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java index 4bdd38248673c..ef394f870c3f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java @@ -55,7 +55,7 @@ int size() { return aMap.size(); } - V getKeyA(A aKey) { + V getValueByKeyA(A aKey) { final Tuple2 value = aMap.get(aKey); if (value != null) { @@ -65,7 +65,7 @@ V getKeyA(A aKey) { } } - V getKeyB(B bKey) { + V getValueByKeyB(B bKey) { final A aKey = bMap.get(bKey); if (aKey != null) { @@ -75,6 +75,20 @@ V getKeyB(B bKey) { } } + A getKeyAByKeyB(B bKey) { + return bMap.get(bKey); + } + + B getKeyBByKeyA(A aKey) { + final Tuple2 value = aMap.get(aKey); + + if (value != null) { + return value.f0; + } else { + return null; + } + } + V put(A aKey, B bKey, V value) { final V removedValue = removeKeyA(aKey); removeKeyB(bKey); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java index 0f0ba0d8c53eb..689391fc90724 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java @@ -356,7 +356,7 @@ private void requestSlotFromResourceManager( } private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) { - final PendingRequest request = pendingRequests.getKeyA(slotRequestID); + final PendingRequest request = pendingRequests.getValueByKeyA(slotRequestID); if (request != null) { if (isBatchRequestAndFailureCanBeIgnored(request, failure)) { log.debug("Ignoring failed request to the resource manager for a batch slot request."); @@ -694,7 +694,7 @@ public Optional failAllocation(final AllocationID allocationID, fina componentMainThreadExecutor.assertRunningInMainThread(); - final PendingRequest pendingRequest = pendingRequests.getKeyB(allocationID); + final PendingRequest pendingRequest = pendingRequests.getValueByKeyB(allocationID); if (pendingRequest != null) { if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) { log.debug("Ignoring allocation failure for batch slot request {}.", pendingRequest.getSlotRequestId()); @@ -1050,11 +1050,11 @@ void add(SlotRequestId slotRequestId, AllocatedSlot allocatedSlot) { * @return The allocated slot, null if we can't find a match */ AllocatedSlot get(final AllocationID allocationID) { - return allocatedSlotsById.getKeyA(allocationID); + return allocatedSlotsById.getValueByKeyA(allocationID); } AllocatedSlot get(final SlotRequestId slotRequestId) { - return allocatedSlotsById.getKeyB(slotRequestId); + return allocatedSlotsById.getValueByKeyB(slotRequestId); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java index a7e3cf2208abf..c84fdea95adb7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java @@ -68,9 +68,9 @@ public void ensuresOneToOneMappingBetweenKeysSamePrimaryKey() { map.put(1, 1, "foobar"); map.put(1, 2, secondValue); - assertThat(map.getKeyB(1), nullValue()); - assertThat(map.getKeyA(1), is(secondValue)); - assertThat(map.getKeyB(2), is(secondValue)); + assertThat(map.getValueByKeyB(1), nullValue()); + assertThat(map.getValueByKeyA(1), is(secondValue)); + assertThat(map.getValueByKeyB(2), is(secondValue)); } @Test @@ -81,8 +81,8 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() { map.put(1, 1, "foobar"); map.put(2, 1, secondValue); - assertThat(map.getKeyA(1), nullValue()); - assertThat(map.getKeyB(1), is(secondValue)); - assertThat(map.getKeyA(2), is(secondValue)); + assertThat(map.getValueByKeyA(1), nullValue()); + assertThat(map.getValueByKeyB(1), is(secondValue)); + assertThat(map.getValueByKeyA(2), is(secondValue)); } }