Skip to content

Commit

Permalink
Factor out static method-based CoderProvider logic
Browse files Browse the repository at this point in the history
Previously, a couple lines of CoderRegistry implemented
the logic behind @defaultcoder processing, essentially
inlining the invocation of a CoderProvider expressed as
static methods on a class.

Now:

 - The building of a CoderProvider from a suitably defined
   class is factored out in CoderProviders.fromStaticMethods().
 - AvroCoder is suitably defined.
 - SerializableCoder is suitably defined.
 - The CoderRegistry invokes CoderProviders, along with slight
   refactoring.

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=103308789
  • Loading branch information
kennknowles authored and davorbonaci committed Sep 18, 2015
1 parent 6e96186 commit 1f35976
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@
*/
public class AvroCoder<T> extends StandardCoder<T> {

/**
* Returns an {@code AvroCoder} instance for the provided element type.
* @param <T> the element type
*/
public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) type.getRawType();
return of(clazz);
}

/**
* Returns an {@code AvroCoder} instance for the provided element class.
* @param <T> the element type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package com.google.cloud.dataflow.sdk.coders;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.lang.reflect.InvocationTargetException;
import java.util.List;

/**
Expand All @@ -32,18 +36,38 @@ public final class CoderProviders {
// Static utility class
private CoderProviders() { }

/**
* Creates a {@link CoderProvider} built from particular static methods of a class that
* implements {@link Coder}, particularly for use in {@link DefaultCoder} annotations.
*
* <p>The class must have the following static method:
*
* <pre>{@code
* public static Coder<T> of(TypeDescriptor<T> type)
* }
* </pre>
*/
public static <T> CoderProvider fromStaticMethods(Class<T> clazz) {
return new CoderProviderFromStaticMethods(clazz);
}


/**
* Returns a {@link CoderProvider} that consults each of the provider {@code coderProviders}
* and returns the first {@link Coder} provided.
*
* <p>Note that while the number of types handled will be the union of those handled by all of
* the provided {@code coderProviders}, the actual {@link Coder} provided by an earlier provider
* may have inferior determinism properties.
* <p>Note that the order in which the providers are listed matters: While the set of types
* handled will be the union of those handled by all of the providers in the list, the actual
* {@link Coder} provided by the first successful provider may differ, and may have inferior
* properties. For example, not all {@link Coder Coders} are deterministic, handle {@code null}
* values, or have comparable performance.
*/
public static CoderProvider firstOf(CoderProvider... coderProviders) {
return new FirstOf(ImmutableList.copyOf(coderProviders));
}

///////////////////////////////////////////////////////////////////////////////////////////////

/**
* @see #firstOf
*/
Expand Down Expand Up @@ -71,4 +95,70 @@ public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderEx
type, Joiner.on("; ").join(messages)));
}
}

private static class CoderProviderFromStaticMethods implements CoderProvider {

/** If true, then clazz has {@code of(TypeDescriptor)}. If false, {@code of(Class)}. */
private final boolean takesTypeDescriptor;
private final Class<?> clazz;

public CoderProviderFromStaticMethods(Class<?> clazz) {
// Note that the second condition supports older classes, which only needed to provide
// of(Class), not of(TypeDescriptor). Our own classes have updated to accept a
// TypeDescriptor. Hence the error message points only to the current specification,
// not both acceptable conditions.
checkArgument(classTakesTypeDescriptor(clazz) || classTakesClass(clazz),
"Class " + clazz.getCanonicalName()
+ " is missing required static method of(TypeDescriptor).");

this.takesTypeDescriptor = classTakesTypeDescriptor(clazz);
this.clazz = clazz;
}

@Override
public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
try {
if (takesTypeDescriptor) {
@SuppressWarnings("unchecked")
Coder<T> result = InstanceBuilder.ofType(Coder.class)
.fromClass(clazz)
.fromFactoryMethod("of")
.withArg(TypeDescriptor.class, type)
.build();
return result;
} else {
@SuppressWarnings("unchecked")
Coder<T> result = InstanceBuilder.ofType(Coder.class)
.fromClass(clazz)
.fromFactoryMethod("of")
.withArg(Class.class, type.getRawType())
.build();
return result;
}
} catch (RuntimeException exc) {
if (exc.getCause() instanceof InvocationTargetException) {
throw new CannotProvideCoderException(exc.getCause().getCause());
}
throw exc;
}
}

private boolean classTakesTypeDescriptor(Class<?> clazz) {
try {
clazz.getDeclaredMethod("of", TypeDescriptor.class);
return true;
} catch (NoSuchMethodException | SecurityException exc) {
return false;
}
}

private boolean classTakesClass(Class<?> clazz) {
try {
clazz.getDeclaredMethod("of", Class.class);
return true;
} catch (NoSuchMethodException | SecurityException exc) {
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException.ReasonCode;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -307,17 +306,10 @@ public <T> Coder<T> getDefaultCoder(Class<T> clazz) throws CannotProvideCoderExc
// try other ways of finding one
}

DefaultCoder defaultAnnotation = clazz.getAnnotation(
DefaultCoder.class);
if (defaultAnnotation != null) {
LOG.debug("Default coder for {} found by DefaultCoder annotation", clazz);
@SuppressWarnings("unchecked")
Coder<T> coder = InstanceBuilder.ofType(Coder.class)
.fromClass(defaultAnnotation.value())
.fromFactoryMethod("of")
.withArg(Class.class, clazz)
.build();
return coder;
try {
return getDefaultCoderFromAnnotation(clazz);
} catch (CannotProvideCoderException exc) {
// try other ways
}

if (getFallbackCoderProvider() != null) {
Expand Down Expand Up @@ -647,6 +639,24 @@ private CoderFactory getDefaultCoderFactory(Class<?> clazz) throws CannotProvide
}
}

/**
* Returns the {@link Coder} returned according to the {@link CoderProvider} from any
* {@link DefaultCoder} annotation on the given class.
*/
private <T> Coder<T> getDefaultCoderFromAnnotation(Class<T> clazz)
throws CannotProvideCoderException {
DefaultCoder defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
if (defaultAnnotation == null) {
throw new CannotProvideCoderException(
String.format("Class %s does not have a @DefaultCoder annotation.",
clazz.getCanonicalName()));
}

LOG.debug("DefaultCoder annotation found for {}", clazz);
CoderProvider coderProvider = CoderProviders.fromStaticMethods(defaultAnnotation.value());
return coderProvider.getCoder(TypeDescriptor.of(clazz));
}

/**
* Returns the {@link Coder} to use by default for values of the given type,
* in a context where the given types use the given coders.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@
*/
public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {

/**
* Returns a {@code SerializableCoder} instance for the provided element type.
* @param <T> the element type
*/
public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) type.getRawType();
return of(clazz);
}

/**
* Returns a {@code SerializableCoder} instance for the provided element class.
* @param <T> the element type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package com.google.cloud.dataflow.sdk.coders;

import static com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;

import com.google.api.client.util.Preconditions;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

import org.junit.Before;
Expand Down Expand Up @@ -64,15 +64,18 @@ private static class SerializableRecord extends SerializableBase {
private static class CustomRecord extends SerializableBase {
}

@DefaultCoder(OldCustomSerializableCoder.class)
private static class OldCustomRecord extends SerializableBase {
}

private static class Unknown {
}

private static class CustomSerializableCoder extends SerializableCoder<CustomRecord> {
// Extending SerializableCoder isn't trivial, but it can be done.
@SuppressWarnings("unchecked")
public static <T extends Serializable> SerializableCoder<T> of(Class<T> recordType) {
Preconditions.checkArgument(
CustomRecord.class.isAssignableFrom(recordType));
public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> recordType) {
checkArgument(recordType.isSupertypeOf(new TypeDescriptor<CustomRecord>() {}));
return (SerializableCoder<T>) new CustomSerializableCoder();
}

Expand All @@ -81,6 +84,20 @@ protected CustomSerializableCoder() {
}
}

private static class OldCustomSerializableCoder extends SerializableCoder<OldCustomRecord> {
// Extending SerializableCoder isn't trivial, but it can be done.
@Deprecated // old form using a Class
@SuppressWarnings("unchecked")
public static <T extends Serializable> SerializableCoder<T> of(Class<T> recordType) {
checkArgument(OldCustomRecord.class.isAssignableFrom(recordType));
return (SerializableCoder<T>) new OldCustomSerializableCoder();
}

protected OldCustomSerializableCoder() {
super(OldCustomRecord.class);
}
}

@Test
public void testDefaultCoderClasses() throws Exception {
assertThat(registry.getDefaultCoder(AvroRecord.class), instanceOf(AvroCoder.class));
Expand All @@ -90,6 +107,8 @@ public void testDefaultCoderClasses() throws Exception {
instanceOf(SerializableCoder.class));
assertThat(registry.getDefaultCoder(CustomRecord.class),
instanceOf(CustomSerializableCoder.class));
assertThat(registry.getDefaultCoder(OldCustomRecord.class),
instanceOf(OldCustomSerializableCoder.class));
}

@Test
Expand Down

0 comments on commit 1f35976

Please sign in to comment.