Skip to content

Commit

Permalink
Provide a flag to ignore Json format error in pulsar flink connector (a…
Browse files Browse the repository at this point in the history
…pache#3210)

### Motivation

when using flink and pulsar, a Json format error will cause the flink program to fail

### Modifications

provide a flag in pulsar flink connector to ignore json format errors
  • Loading branch information
CrestOfWave authored and sijie committed Dec 19, 2018
1 parent 8da04cb commit 00ae3cf
Showing 1 changed file with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,30 @@
*/
public class JsonRowDeserializationSchema implements DeserializationSchema<Row> {

/*
What to do when detecting that a json line cannot be deserialized :
(1).false : Throw A IOException and Terminate application。
(2).true : Ignore the error line and add a null line。
*/
private boolean ignoreJsonFormatError = false;


/**
*
* @return true or false
*/
public boolean getIgnoreJsonFormatError() {
return ignoreJsonFormatError;
}

/**
* set ignoreJsonFormatError
* @param ignoreJsonFormatError
*/
public void setIgnoreJsonFormatError(boolean ignoreJsonFormatError) {
this.ignoreJsonFormatError = ignoreJsonFormatError;
}

/**
* Type information describing the result type.
*/
Expand Down Expand Up @@ -102,7 +126,13 @@ public Row deserialize(byte[] message) throws IOException {

return row;
} catch (Throwable t) {
throw new IOException("Failed to deserialize JSON object.", t);
if (ignoreJsonFormatError) {
final int arity = typeInfo.getArity();
final Object[] nullsArray = new Object[arity];
return Row.of(nullsArray);
} else {
throw new IOException("Failed to deserialize JSON object.", t);
}
}
}

Expand Down

0 comments on commit 00ae3cf

Please sign in to comment.