NIFI-7817 - Fix ParquetReader instantiation error (apache#4538)
pvillard31 authored Sep 29, 2020
1 parent 740bfee commit fa0a1df
Showing 5 changed files with 134 additions and 4 deletions.
@@ -107,6 +107,7 @@
<exclude>src/test/resources/avro/user-with-nullable-array.avsc</exclude>
<exclude>src/test/resources/avro/user-with-fixed-decimal.avsc</exclude>
<exclude>src/test/resources/avro/all-minus-enum.avsc</exclude>
<exclude>src/test/resources/avro/TestParquetReader.parquet</exclude>
</excludes>
</configuration>
</plugin>
@@ -168,9 +168,11 @@ public static ParquetConfig createParquetConfig(final PropertyContext context, f
final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
parquetConfig.setWriterMode(mode);

final String compressionTypeValue = context.getProperty(ParquetUtils.COMPRESSION_TYPE).getValue();
final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
parquetConfig.setCompressionCodec(codecName);
if(context.getProperty(ParquetUtils.COMPRESSION_TYPE).isSet()) {
final String compressionTypeValue = context.getProperty(ParquetUtils.COMPRESSION_TYPE).getValue();
final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
parquetConfig.setCompressionCodec(codecName);
}

// Optional properties

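This guard is the substance of the fix: createParquetConfig is shared by the reader and writer sides, and a ParquetReader context presumably has no COMPRESSION_TYPE value to resolve, so the unconditional getValue()/valueOf() chain failed when the reader was instantiated. A minimal standalone sketch of the failure mode and the guarded pattern, assuming only parquet-hadoop on the classpath (the class and variable names below are illustrative, not NiFi's):

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Illustrative sketch, not NiFi code: an unset property resolves to a null
// value, and Enum.valueOf rejects a null name, so the lookup must be guarded.
public class CompressionGuardSketch {

    public static void main(String[] args) {
        final String compressionTypeValue = null; // stands in for an unset property

        // Pre-fix behavior: the unguarded lookup throws before any codec is chosen.
        try {
            CompressionCodecName.valueOf(compressionTypeValue);
        } catch (NullPointerException e) {
            System.out.println("unguarded lookup fails: " + e);
        }

        // Post-fix behavior: resolve the codec only when a value is present,
        // mirroring the isSet() check added above.
        if (compressionTypeValue != null) {
            System.out.println("codec: " + CompressionCodecName.valueOf(compressionTypeValue));
        } else {
            System.out.println("compression type not set; keeping ParquetConfig's default");
        }
    }
}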
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StringUtils;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TestParquetProcessor extends AbstractProcessor {

public static final PropertyDescriptor READER = new PropertyDescriptor.Builder()
.name("reader")
.description("reader")
.identifiesControllerService(ParquetReader.class)
.required(true)
.build();

public static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
.name("path")
.description("path")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("success")
.build();

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
final RecordReaderFactory readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class);

final List<String> records = new ArrayList<>();

byte[] parquetBytes;

// buffer the parquet file into bytes; a FileInputStream can't be handed to the reader directly because it doesn't support mark/reset
try {
parquetBytes = IOUtils.toByteArray(new File(context.getProperty(PATH).getValue()).toURI());
} catch (Exception e) {
throw new ProcessException(e);
}

try (final InputStream in = new ByteArrayInputStream(parquetBytes);
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
Record record;
while ((record = reader.nextRecord()) != null) {
records.add(record.toString());
}
} catch (Exception e) {
throw new ProcessException(e);
}

flowFile = session.write(flowFile, (out) -> out.write(StringUtils.join(records, "\n").getBytes()));
session.transfer(flowFile, SUCCESS);
}


@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return new ArrayList<PropertyDescriptor>() {{ add(READER); add(PATH); }};
}

@Override
public Set<Relationship> getRelationships() {
return new HashSet<Relationship>() {{ add(SUCCESS); }};
}

}
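The byte-array detour in onTrigger above is worth a note: NiFi's record readers may call mark/reset on the incoming stream, which FileInputStream does not support, while ByteArrayInputStream does. A quick JDK-only check of that difference (self-contained; it creates and deletes its own temp file, nothing here is part of the commit):

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch, not NiFi code: compares markSupported() for the two
// stream types, which is why the test processor buffers the file first.
public class MarkResetSketch {

    public static void main(String[] args) throws Exception {
        final InputStream buffered = new ByteArrayInputStream(new byte[] {1, 2, 3});
        System.out.println("ByteArrayInputStream mark/reset: " + buffered.markSupported()); // prints true

        final Path tmp = Files.createTempFile("mark-reset-sketch", ".bin");
        try (InputStream direct = new FileInputStream(tmp.toFile())) {
            System.out.println("FileInputStream mark/reset: " + direct.markSupported()); // prints false
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}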
@@ -26,10 +26,13 @@
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.Assume;
@@ -43,10 +46,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;

public class TestParquetReader {
@@ -106,6 +109,26 @@ public void testReadUsers() throws IOException, MalformedRecordException {
}
}

@Test
public void testReader() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class);
final String path = "src/test/resources/TestParquetReader.parquet";

final ParquetReader parquetReader = new ParquetReader();

runner.addControllerService("reader", parquetReader);
runner.enableControllerService(parquetReader);

runner.enqueue(Paths.get(path));

runner.setProperty(TestParquetProcessor.READER, "reader");
runner.setProperty(TestParquetProcessor.PATH, path);

runner.run();
runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
}


private Schema getSchema(final String schemaFilePath) throws IOException {
final File schemaFile = new File(schemaFilePath);
final String schemaString = IOUtils.toString(new FileInputStream(schemaFile), StandardCharsets.UTF_8);
@@ -124,4 +147,5 @@ private ParquetWriter<GenericRecord> createParquetWriter(final Schema schema, fi

return writer;
}

}
Binary file not shown. (This is the added TestParquetReader.parquet test fixture, the fifth changed file.)
