Replacing the use of Java object serialization with Jackson (elastic#…
masseyke authored May 2, 2023

1 parent 8007115 commit 5b4a454
Showing 15 changed files with 147 additions and 113 deletions.
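
Summarizing the API change (a minimal sketch based on the diffs below; helper names are taken from IOUtils.java): callers that previously round-tripped objects through JDK serialization and Base64 now round-trip them through Jackson-produced JSON, and must name the target class explicitly on the way back.

import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
import org.elasticsearch.hadoop.util.IOUtils;

public class BeforeAfterSketch {
    static String store(Mapping mapping) {
        // Before: IOUtils.serializeToBase64(mapping) -- JDK serialization, Base64-encoded.
        // After:  plain JSON produced by a shared Jackson ObjectMapper.
        return IOUtils.serializeToJsonString(mapping);
    }

    static Mapping load(String stored) {
        // Before: IOUtils.deserializeFromBase64(stored) inferred the type from the stream.
        // After:  the caller passes the expected type.
        return IOUtils.deserializeFromJsonString(stored, Mapping.class);
    }
}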
@@ -47,7 +47,7 @@ public static class PartitionDefinitionBuilder {

private PartitionDefinitionBuilder(Settings settings, Mapping resolvedMapping) {
this.serializedSettings = settings == null ? null : settings.save();
this.serializedMapping = resolvedMapping == null ? null : IOUtils.serializeToBase64(resolvedMapping);
this.serializedMapping = resolvedMapping == null ? null : IOUtils.serializeToJsonString(resolvedMapping);
}

public PartitionDefinition build(String index, int shardId) {
@@ -419,7 +419,7 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio
RestRepository repository = new RestRepository(settings);
Mapping fieldMapping = null;
if (StringUtils.hasText(partition.getSerializedMapping())) {
fieldMapping = IOUtils.deserializeFromBase64(partition.getSerializedMapping());
fieldMapping = IOUtils.deserializeFromJsonString(partition.getSerializedMapping(), Mapping.class);
}
else {
log.warn(String.format("No mapping found for [%s] - either no index exists or the partition configuration has been corrupted", partition));
@@ -21,10 +21,12 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;

import org.elasticsearch.hadoop.serialization.FieldType;
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.annotate.JsonCreator;
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.annotate.JsonProperty;

@SuppressWarnings("serial")
public class Field implements Serializable {

static final Field[] NO_FIELDS = new Field[0];
@@ -41,20 +43,24 @@ public Field(String name, FieldType type, Collection<Field> properties) {
this(name, type, (properties != null ? properties.toArray(new Field[properties.size()]) : NO_FIELDS));
}

Field(String name, FieldType type, Field[] properties) {
@JsonCreator
Field(@JsonProperty("name") String name, @JsonProperty("type") FieldType type, @JsonProperty("properties") Field[] properties) {
this.name = name;
this.type = type;
this.properties = properties;
}

@JsonProperty("properties")
public Field[] properties() {
return properties;
}

@JsonProperty("type")
public FieldType type() {
return type;
}

@JsonProperty("name")
public String name() {
return name;
}
@@ -63,4 +69,15 @@ public String name() {
public String toString() {
return String.format("%s=%s", name, ((type == FieldType.OBJECT || type == FieldType.NESTED) ? Arrays.toString(properties) : type));
}

@Override
public boolean equals(Object o) {
if (o instanceof Field == false) {
return false;
}
Field other = (Field) o;
return Objects.equals(this.name, other.name) &&
Objects.equals(this.type, other.type) &&
Objects.deepEquals(this.properties, other.properties);
}
}
@@ -19,6 +19,10 @@

package org.elasticsearch.hadoop.serialization.dto.mapping;

import org.elasticsearch.hadoop.serialization.FieldType;
import org.elasticsearch.hadoop.serialization.field.FieldFilter;
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.annotate.JsonProperty;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -27,9 +31,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.elasticsearch.hadoop.serialization.FieldType;
import org.elasticsearch.hadoop.serialization.field.FieldFilter;
import java.util.Objects;

/**
* A mapping has a name and a collection of fields.
@@ -53,7 +55,7 @@ public Mapping(String index, String name, Collection<Field> fields) {
this(index, name, (fields != null ? fields.toArray(new Field[fields.size()]) : Field.NO_FIELDS));
}

Mapping(String index, String type, Field[] fields) {
Mapping(@JsonProperty("index") String index, @JsonProperty("type") String type, @JsonProperty("fields") Field[] fields) {
this.index = index;
this.type = type;
this.fields = fields;
@@ -154,4 +156,15 @@ public String toString() {
return String.format("%s=%s", index, Arrays.toString(fields));
}
}

@Override
public boolean equals(Object o) {
if (o instanceof Mapping == false) {
return false;
}
Mapping other = (Mapping) o;
return Objects.equals(this.index, other.index) &&
Objects.equals(this.type, other.type) &&
Objects.deepEquals(this.fields, other.fields);
}
}
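
Field and Mapping above take all of their state through a constructor and expose it through accessor methods, so they gain Jackson creator/property annotations to make that constructor usable for deserialization. The hypothetical DTO below (not part of this commit) shows the same pattern in isolation, using the same shaded Jackson annotations.

import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.annotate.JsonCreator;
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.annotate.JsonProperty;

// Hypothetical immutable DTO illustrating the @JsonCreator/@JsonProperty pattern.
public class ShardRef {
    private final String index;
    private final int shardId;

    @JsonCreator
    public ShardRef(@JsonProperty("index") String index, @JsonProperty("shardId") int shardId) {
        this.index = index;
        this.shardId = shardId;
    }

    @JsonProperty("index")
    public String index() { return index; }

    @JsonProperty("shardId")
    public int shardId() { return shardId; }
}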
58 changes: 24 additions & 34 deletions mr/src/main/java/org/elasticsearch/hadoop/util/IOUtils.java
@@ -18,31 +18,25 @@
*/
package org.elasticsearch.hadoop.util;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException;
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.map.SerializationConfig;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.lang.reflect.Field;
import java.net.JarURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.util.Properties;

import javax.xml.bind.DatatypeConverter;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException;

/**
* Utility class used internally for the Pig support.
*/
@@ -55,42 +49,38 @@ public abstract class IOUtils {
ReflectionUtils.makeAccessible(BYTE_ARRAY_BUFFER);
}

public static String serializeToBase64(Serializable object) {
private static final ObjectMapper mapper = new ObjectMapper().configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);

/**
* This method serializes an object into a JSON String using Jackson. The object must support Jackson serialization.
*/
public static String serializeToJsonString(Object object) {
if (object == null) {
return StringUtils.EMPTY;
}
FastByteArrayOutputStream baos = new FastByteArrayOutputStream();
ObjectOutputStream oos = null;
final String json;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(object);
json = mapper.writeValueAsString(object);
} catch (IOException ex) {
throw new EsHadoopSerializationException("Cannot serialize object " + object, ex);
} finally {
close(oos);
throw new EsHadoopSerializationException("Cannot serialize object: " + object, ex);
}
return DatatypeConverter.printBase64Binary(baos.bytes().bytes());
return json;
}

@SuppressWarnings("unchecked")
public static <T extends Serializable> T deserializeFromBase64(String data) {
/**
* This method deserializes a String that was created by serializeToJsonString
*/
public static <T> T deserializeFromJsonString(String data, Class<T> clazz) {
if (!StringUtils.hasLength(data)) {
return null;
}

byte[] rawData = DatatypeConverter.parseBase64Binary(data);
ObjectInputStream ois = null;
final T object;
try {
ois = new ObjectInputStream(new FastByteArrayInputStream(rawData));
Object o = ois.readObject();
return (T) o;
} catch (ClassNotFoundException ex) {
throw new EsHadoopIllegalStateException("cannot deserialize object", ex);
} catch (IOException ex) {
throw new EsHadoopSerializationException("cannot deserialize object", ex);
} finally {
close(ois);
object = mapper.readValue(data, clazz);
} catch (IOException e) {
throw new EsHadoopSerializationException("Cannot deserialize string: [" + data + "]", e);
}
return object;
}

public static String propsToString(Properties props) {
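
A minimal usage sketch of the new helpers (assuming the behavior shown above; the filter strings are made up): this is essentially what the setFilters/getFilters change in the next hunks does when it stores the query filters in a settings property.

import java.util.Arrays;

import org.elasticsearch.hadoop.util.IOUtils;

public class FilterRoundTrip {
    public static void main(String[] args) {
        // Hypothetical query-filter fragments, stored and restored as a JSON array of strings.
        String[] filters = { "{\"exists\":{\"field\":\"id\"}}", "{\"match\":{\"id\":1}}" };
        String stored = IOUtils.serializeToJsonString(filters);
        String[] restored = IOUtils.deserializeFromJsonString(stored, String[].class);
        assert Arrays.equals(filters, restored);
    }
}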
@@ -29,7 +29,6 @@

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -177,11 +176,11 @@ public static void setFilters(Settings settings, String... filters) {
return;
}

settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS, IOUtils.serializeToBase64(filters));
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS, IOUtils.serializeToJsonString(filters));
}

public static String[] getFilters(Settings settings) {
return IOUtils.deserializeFromBase64(settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS));
return IOUtils.deserializeFromJsonString(settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS), String[].class);
}

public static String determineSourceFields(Settings settings) {
25 changes: 25 additions & 0 deletions mr/src/test/java/org/elasticsearch/hadoop/util/IOUtilsTest.java
@@ -32,11 +32,19 @@
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException;
import org.elasticsearch.hadoop.serialization.FieldType;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
import org.junit.Test;

import static junit.framework.TestCase.assertNull;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
@@ -96,6 +104,23 @@ public void testToCanonicalFileSpringBoot() throws Exception {
assertEquals("jar:" + jarWithinJarPath, canonicalFilePath);
}

@Test
public void testDeserializeFromJsonString() {
assertNull(IOUtils.deserializeFromJsonString("", String.class));
try {
IOUtils.deserializeFromJsonString("junk", String.class);
fail("Should have thrown an EsHadoopIllegalArgumentException");
} catch (EsHadoopSerializationException expected) {}
List<Field> fieldsList = new ArrayList<>();
fieldsList.add(new Field("%s", FieldType.TEXT));
Mapping mapping = new Mapping("*", "*", fieldsList);
Mapping roundTripMapping = IOUtils.deserializeFromJsonString(IOUtils.serializeToJsonString(mapping), Mapping.class);
assertEquals(mapping, roundTripMapping);
String[] filters = new String[]{"{\"exists\":{\"field\":\"id\"}}", "{\"match\":{\"id\":1}}"};
String[] roundTripFilters = IOUtils.deserializeFromJsonString(IOUtils.serializeToJsonString(filters), String[].class);
assertArrayEquals(filters, roundTripFilters);
}

/**
* This class simulates what Spring Boot's URLStreamHandler does.
*/
4 changes: 2 additions & 2 deletions pig/src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java
@@ -142,7 +142,7 @@ public void checkSchema(ResourceSchema s) throws IOException {
// save schema to back-end for JSON translation
if (!StringUtils.hasText(props.getProperty(ResourceSchema.class.getName()))) {
// save the schema as String (used JDK serialization since toString() screws up the signature - see the testcase)
props.setProperty(ResourceSchema.class.getName(), IOUtils.serializeToBase64(s));
props.setProperty(ResourceSchema.class.getName(), IOUtils.serializeToJsonString(s));
}
}

@@ -204,7 +204,7 @@ public void prepareToWrite(RecordWriter writer) throws IOException {
this.schema = new ResourceSchema();
}
else {
this.schema = IOUtils.deserializeFromBase64(s);
this.schema = IOUtils.deserializeFromJsonString(s, ResourceSchema.class);
}
this.pigTuple = new PigTuple(schema);
}

This file was deleted.

@@ -209,7 +209,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
if (Utils.LOGGER.isTraceEnabled()) {
Utils.LOGGER.trace(s"Transformed filters into DSL ${filterString.mkString("[", ",", "]")}")
}
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToBase64(filterString))
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToJsonString(filterString))
}
else {
if (Utils.LOGGER.isTraceEnabled()) {
@@ -30,7 +30,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.elasticsearch.hadoop.util.IOUtils
import org.elasticsearch.hadoop.util.TestUtils
import org.elasticsearch.hadoop.util.unit.TimeValue
import org.junit.Assert

@@ -386,11 +386,11 @@ class StreamingQueryTestHarness[S <: java.io.Serializable : Encoder](val sparkSe

object TestingSerde extends Serializable {
def serialize(any: java.io.Serializable): String = {
IOUtils.serializeToBase64(any)
TestUtils.serializeToBase64(any)
}

def deserialize[T](line: String): T = {
val data: T = IOUtils.deserializeFromBase64(line)
val data: T = TestUtils.deserializeFromBase64(line)
data
}
}
@@ -288,7 +288,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
if (Utils.LOGGER.isTraceEnabled()) {
Utils.LOGGER.trace(s"Transformed filters into DSL ${filterString.mkString("[", ",", "]")}")
}
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToBase64(filterString))
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToJsonString(filterString))
}
else {
if (Utils.LOGGER.isTraceEnabled()) {
@@ -23,14 +23,13 @@ import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.elasticsearch.hadoop.util.IOUtils
import org.elasticsearch.hadoop.util.TestUtils
import org.elasticsearch.hadoop.util.unit.TimeValue
import org.junit.Assert

@@ -386,11 +385,11 @@ class StreamingQueryTestHarness[S <: java.io.Serializable : Encoder](val sparkSe

object TestingSerde extends Serializable {
def serialize(any: java.io.Serializable): String = {
IOUtils.serializeToBase64(any)
TestUtils.serializeToBase64(any)
}

def deserialize[T](line: String): T = {
val data: T = IOUtils.deserializeFromBase64(line)
val data: T = TestUtils.deserializeFromBase64(line)
data
}
}
@@ -288,7 +288,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
if (Utils.LOGGER.isTraceEnabled()) {
Utils.LOGGER.trace(s"Transformed filters into DSL ${filterString.mkString("[", ",", "]")}")
}
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToBase64(filterString))
paramWithScan += (InternalConfigurationOptions.INTERNAL_ES_QUERY_FILTERS -> IOUtils.serializeToJsonString(filterString))
}
else {
if (Utils.LOGGER.isTraceEnabled()) {
@@ -22,13 +22,22 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
import org.elasticsearch.hadoop.mr.HadoopCfgUtils;
import org.elasticsearch.hadoop.rest.RestClient;
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException;

import javax.xml.bind.DatatypeConverter;

import static org.elasticsearch.hadoop.util.IOUtils.close;

public class TestUtils {

@@ -98,4 +107,42 @@ public static byte[] fromInputStream(InputStream in) throws IOException {

return out.toByteArray();
}

public static String serializeToBase64(Serializable object) {
if (object == null) {
return StringUtils.EMPTY;
}
FastByteArrayOutputStream baos = new FastByteArrayOutputStream();
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(object);
} catch (IOException ex) {
throw new EsHadoopSerializationException("Cannot serialize object " + object, ex);
} finally {
close(oos);
}
return DatatypeConverter.printBase64Binary(baos.bytes().bytes());
}

@SuppressWarnings("unchecked")
public static <T extends Serializable> T deserializeFromBase64(String data) {
if (!StringUtils.hasLength(data)) {
return null;
}

byte[] rawData = DatatypeConverter.parseBase64Binary(data);
ObjectInputStream ois = null;
try {
ois = new ObjectInputStream(new FastByteArrayInputStream(rawData));
Object o = ois.readObject();
return (T) o;
} catch (ClassNotFoundException ex) {
throw new EsHadoopIllegalStateException("cannot deserialize object", ex);
} catch (IOException ex) {
throw new EsHadoopSerializationException("cannot deserialize object", ex);
} finally {
close(ois);
}
}
}
