Skip to content

Commit

Permalink
Changed MultiplexProducer to use any Closeable class as a generic
Browse files Browse the repository at this point in the history
  • Loading branch information
lukkm authored and tyronen committed Jun 25, 2015
1 parent eeaf0aa commit 70979d9
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import android.util.Pair;

import com.facebook.cache.common.CacheKey;
import com.facebook.common.references.CloseableReference;
import com.facebook.imagepipeline.cache.CacheKeyFactory;
import com.facebook.imagepipeline.image.CloseableImage;
import com.facebook.imagepipeline.request.ImageRequest;
Expand All @@ -20,7 +21,8 @@
* Multiplex producer that uses the bitmap memory cache key to combine requests.
*/
public class BitmapMemoryCacheKeyMultiplexProducer extends
MultiplexProducer<Pair<CacheKey, ImageRequest.RequestLevel>, CloseableImage> {
MultiplexProducer<Pair<CacheKey, ImageRequest.RequestLevel>,
CloseableReference<CloseableImage>> {

private final CacheKeyFactory mCacheKeyFactory;

Expand All @@ -38,4 +40,9 @@ protected Pair<CacheKey, ImageRequest.RequestLevel> getKey(
producerContext.getLowestPermittedRequestLevel());
}

public CloseableReference<CloseableImage> cloneOrNull(
CloseableReference<CloseableImage> closeableImage) {
return CloseableReference.cloneOrNull(closeableImage);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import android.util.Pair;

import com.facebook.common.references.CloseableReference;
import com.facebook.imagepipeline.cache.CacheKeyFactory;
import com.facebook.imagepipeline.memory.PooledByteBuffer;
import com.facebook.cache.common.CacheKey;
Expand All @@ -20,7 +21,8 @@
* Multiplex producer that uses the encoded cache key to combine requests.
*/
public class EncodedCacheKeyMultiplexProducer extends
MultiplexProducer<Pair<CacheKey, ImageRequest.RequestLevel>, PooledByteBuffer> {
MultiplexProducer<Pair<CacheKey, ImageRequest.RequestLevel>,
CloseableReference<PooledByteBuffer>> {

private final CacheKeyFactory mCacheKeyFactory;

Expand All @@ -34,4 +36,9 @@ protected Pair<CacheKey, ImageRequest.RequestLevel> getKey(ProducerContext produ
mCacheKeyFactory.getEncodedCacheKey(producerContext.getImageRequest()),
producerContext.getLowestPermittedRequestLevel());
}

public CloseableReference<PooledByteBuffer> cloneOrNull(
CloseableReference<PooledByteBuffer> ref) {
return CloseableReference.cloneOrNull(ref);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -24,7 +26,6 @@
import com.facebook.common.internal.Preconditions;
import com.facebook.common.internal.Sets;
import com.facebook.common.internal.VisibleForTesting;
import com.facebook.common.references.CloseableReference;
import com.facebook.imagepipeline.common.Priority;

/**
Expand All @@ -39,7 +40,7 @@
* @param <T> type of the closeable reference result that is returned to this producer
*/
@ThreadSafe
public abstract class MultiplexProducer<K, T> implements Producer<CloseableReference<T>> {
public abstract class MultiplexProducer<K, T extends Closeable> implements Producer<T> {

/**
* Map of multiplexers guarded by "this" lock. The lock should be used only to synchronize
Expand All @@ -52,15 +53,15 @@ public abstract class MultiplexProducer<K, T> implements Producer<CloseableRefer
*/
@GuardedBy("this")
@VisibleForTesting final Map<K, Multiplexer> mMultiplexers;
private final Producer<CloseableReference<T>> mNextProducer;
private final Producer<T> mNextProducer;

protected MultiplexProducer(Producer<CloseableReference<T>> nextProducer) {
protected MultiplexProducer(Producer<T> nextProducer) {
mNextProducer = nextProducer;
mMultiplexers = new HashMap<>();
}

@Override
public void produceResults(Consumer<CloseableReference<T>> consumer, ProducerContext context) {
public void produceResults(Consumer<T> consumer, ProducerContext context) {
K key = getKey(context);
Multiplexer multiplexer;
boolean createdNewMultiplexer;
Expand Down Expand Up @@ -105,6 +106,8 @@ private synchronized void removeMultiplexer(K key, Multiplexer multiplexer) {

protected abstract K getKey(ProducerContext producerContext);

protected abstract T cloneOrNull(T object);

/**
* Multiplexes same requests - passes the same result to multiple consumers, manages cancellation
* and maintains last intermediate result.
Expand Down Expand Up @@ -137,12 +140,11 @@ private synchronized void removeMultiplexer(K key, Multiplexer multiplexer) {
* <li> cancellation notification is received and mConsumerContextPairs is empty </li>
* </ul>
*/
private final CopyOnWriteArraySet<Pair<Consumer<CloseableReference<T>>, ProducerContext>>
mConsumerContextPairs;
private final CopyOnWriteArraySet<Pair<Consumer<T>, ProducerContext>> mConsumerContextPairs;

@GuardedBy("Multiplexer.this")
@Nullable
private CloseableReference<T> mLastIntermediateResult;
private T mLastIntermediateResult;
@GuardedBy("Multiplexer.this")
private float mLastProgress;

Expand Down Expand Up @@ -185,11 +187,11 @@ public Multiplexer(K key) {
* @return true if consumer was added successfully
*/
public boolean addNewConsumer(
final Consumer<CloseableReference<T>> consumer,
final Consumer<T> consumer,
final ProducerContext producerContext) {
final Pair<Consumer<CloseableReference<T>>, ProducerContext> consumerContextPair =
final Pair<Consumer<T>, ProducerContext> consumerContextPair =
Pair.create(consumer, producerContext);
CloseableReference<T> lastIntermediateResult;
T lastIntermediateResult;
final List<ProducerContextCallbacks> prefetchCallbacks;
final List<ProducerContextCallbacks> priorityCallbacks;
final List<ProducerContextCallbacks> intermediateResultsCallbacks;
Expand Down Expand Up @@ -220,7 +222,7 @@ public boolean addNewConsumer(
if (lastIntermediateResult != mLastIntermediateResult) {
lastIntermediateResult = null;
} else if (lastIntermediateResult != null) {
lastIntermediateResult = lastIntermediateResult.clone();
lastIntermediateResult = cloneOrNull(lastIntermediateResult);
}
}

Expand All @@ -229,7 +231,7 @@ public boolean addNewConsumer(
consumer.onProgressUpdate(lastProgress);
}
consumer.onNewResult(lastIntermediateResult, false);
lastIntermediateResult.close();
closeSafely(lastIntermediateResult);
}
}

Expand All @@ -242,7 +244,7 @@ public boolean addNewConsumer(
* prefetch status of the consumer changes.
*/
private void addCallbacks(
final Pair<Consumer<CloseableReference<T>>, ProducerContext> consumerContextPair,
final Pair<Consumer<T>, ProducerContext> consumerContextPair,
final ProducerContext producerContext) {
producerContext.addCallbacks(
new BaseProducerContextCallbacks() {
Expand Down Expand Up @@ -345,7 +347,7 @@ private synchronized List<ProducerContextCallbacks> updateIsPrefetch() {
}

private synchronized boolean computeIsPrefetch() {
for (Pair<Consumer<CloseableReference<T>>, ProducerContext> pair : mConsumerContextPairs) {
for (Pair<Consumer<T>, ProducerContext> pair : mConsumerContextPairs) {
if (!pair.second.isPrefetch()) {
return false;
}
Expand All @@ -363,7 +365,7 @@ private synchronized List<ProducerContextCallbacks> updateIsIntermediateResultEx
}

private synchronized boolean computeIsIntermediateResultExpected() {
for (Pair<Consumer<CloseableReference<T>>, ProducerContext> pair : mConsumerContextPairs) {
for (Pair<Consumer<T>, ProducerContext> pair : mConsumerContextPairs) {
if (pair.second.isIntermediateResultExpected()) {
return true;
}
Expand All @@ -381,14 +383,14 @@ private synchronized List<ProducerContextCallbacks> updatePriority() {

private synchronized Priority computePriority() {
Priority priority = Priority.LOW;
for (Pair<Consumer<CloseableReference<T>>, ProducerContext> pair : mConsumerContextPairs) {
for (Pair<Consumer<T>, ProducerContext> pair : mConsumerContextPairs) {
priority = Priority.getHigherPriority(priority, pair.second.getPriority());
}
return priority;
}

public void onFailure(final ForwardingConsumer consumer, final Throwable t) {
Iterator<Pair<Consumer<CloseableReference<T>>, ProducerContext>> iterator;
Iterator<Pair<Consumer<T>, ProducerContext>> iterator;
synchronized (Multiplexer.this) {
// check for late callbacks
if (mForwardingConsumer != consumer) {
Expand All @@ -399,12 +401,12 @@ public void onFailure(final ForwardingConsumer consumer, final Throwable t) {

mConsumerContextPairs.clear();
removeMultiplexer(mKey, this);
CloseableReference.closeSafely(mLastIntermediateResult);
closeSafely(mLastIntermediateResult);
mLastIntermediateResult = null;
}

while (iterator.hasNext()) {
Pair<Consumer<CloseableReference<T>>, ProducerContext> pair = iterator.next();
Pair<Consumer<T>, ProducerContext> pair = iterator.next();
synchronized (pair) {
pair.first.onFailure(t);
}
Expand All @@ -413,31 +415,31 @@ public void onFailure(final ForwardingConsumer consumer, final Throwable t) {

public void onNextResult(
final ForwardingConsumer consumer,
final CloseableReference<T> closeableReference,
final T closeableObject,
final boolean isFinal) {
Iterator<Pair<Consumer<CloseableReference<T>>, ProducerContext>> iterator;
Iterator<Pair<Consumer<T>, ProducerContext>> iterator;
synchronized (Multiplexer.this) {
// check for late callbacks
if (mForwardingConsumer != consumer) {
return;
}

CloseableReference.closeSafely(mLastIntermediateResult);
closeSafely(mLastIntermediateResult);
mLastIntermediateResult = null;

iterator = mConsumerContextPairs.iterator();
if (!isFinal) {
mLastIntermediateResult = closeableReference.clone();
mLastIntermediateResult = cloneOrNull(closeableObject);
} else {
mConsumerContextPairs.clear();
removeMultiplexer(mKey, this);
}
}

while (iterator.hasNext()) {
Pair<Consumer<CloseableReference<T>>, ProducerContext> pair = iterator.next();
Pair<Consumer<T>, ProducerContext> pair = iterator.next();
synchronized (pair) {
pair.first.onNewResult(closeableReference, isFinal);
pair.first.onNewResult(closeableObject, isFinal);
}
}
}
Expand All @@ -451,15 +453,15 @@ public void onCancelled(final ForwardingConsumer forwardingConsumer) {

mForwardingConsumer = null;
mMultiplexProducerContext = null;
CloseableReference.closeSafely(mLastIntermediateResult);
closeSafely(mLastIntermediateResult);
mLastIntermediateResult = null;
}

startNextProducerIfHasAttachedConsumers();
}

public void onProgressUpdate(ForwardingConsumer forwardingConsumer, float progress) {
Iterator<Pair<Consumer<CloseableReference<T>>, ProducerContext>> iterator;
Iterator<Pair<Consumer<T>, ProducerContext>> iterator;
synchronized (Multiplexer.this) {
// check for late callbacks
if (mForwardingConsumer != forwardingConsumer) {
Expand All @@ -471,19 +473,29 @@ public void onProgressUpdate(ForwardingConsumer forwardingConsumer, float progre
}

while (iterator.hasNext()) {
Pair<Consumer<CloseableReference<T>>, ProducerContext> pair = iterator.next();
Pair<Consumer<T>, ProducerContext> pair = iterator.next();
synchronized (pair) {
pair.first.onProgressUpdate(progress);
}
}
}

private void closeSafely(Closeable obj) {
try {
if (obj != null) {
obj.close();
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

/**
* Forwards {@link Consumer} methods to Multiplexer.
*/
private class ForwardingConsumer extends BaseConsumer<CloseableReference<T>> {
private class ForwardingConsumer extends BaseConsumer<T> {
@Override
protected void onNewResultImpl(CloseableReference<T> newResult, boolean isLast) {
protected void onNewResultImpl(T newResult, boolean isLast) {
Multiplexer.this.onNextResult(this, newResult, isLast);
}

Expand Down

0 comments on commit 70979d9

Please sign in to comment.