diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml index 325c21e22dac..5764d9c3f865 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml @@ -107,6 +107,7 @@ src/test/resources/avro/user-with-nullable-array.avsc src/test/resources/avro/user-with-fixed-decimal.avsc src/test/resources/avro/all-minus-enum.avsc + src/test/resources/TestParquetReader.parquet diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java index 0f00df8e9968..e7c1c371f2a7 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java @@ -168,9 +168,11 @@ public static ParquetConfig createParquetConfig(final PropertyContext context, f final ParquetFileWriter.Mode mode = overwrite ? 
ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE; parquetConfig.setWriterMode(mode); - final String compressionTypeValue = context.getProperty(ParquetUtils.COMPRESSION_TYPE).getValue(); - final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue); - parquetConfig.setCompressionCodec(codecName); + if(context.getProperty(ParquetUtils.COMPRESSION_TYPE).isSet()) { + final String compressionTypeValue = context.getProperty(ParquetUtils.COMPRESSION_TYPE).getValue(); + final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue); + parquetConfig.setCompressionCodec(codecName); + } // Optional properties diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java new file mode 100644 index 000000000000..21a81dcfbb34 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetProcessor.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.parquet; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class TestParquetProcessor extends AbstractProcessor { + + public static final PropertyDescriptor READER = new PropertyDescriptor.Builder() + .name("reader") + .description("reader") + .identifiesControllerService(ParquetReader.class) + .required(true) + .build(); + + public static final PropertyDescriptor PATH = new PropertyDescriptor.Builder() + .name("path") + .description("path") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("success") + .build(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + final RecordReaderFactory readerFactory = context.getProperty(READER).asControllerService(RecordReaderFactory.class); + + final List records = new ArrayList<>(); + + byte[] parquetBytes; + + // read the parquet file into bytes since we can't use a FileInputStream since it doesn't support mark/reset + try { + parquetBytes = 
IOUtils.toByteArray(new File(context.getProperty(PATH).getValue()).toURI()); + } catch (Exception e) { + throw new ProcessException(e); + } + + try (final InputStream in = new ByteArrayInputStream(parquetBytes); + final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { + Record record; + while ((record = reader.nextRecord()) != null) { + records.add(record.toString()); + } + } catch (Exception e) { + throw new ProcessException(e); + } + + flowFile = session.write(flowFile, (out) -> out.write(StringUtils.join(records, "\n").getBytes())); + session.transfer(flowFile, SUCCESS); + } + + + @Override + protected List getSupportedPropertyDescriptors() { + return new ArrayList() {{ add(READER); add(PATH); }}; + } + + @Override + public Set getRelationships() { + return new HashSet() {{ add(SUCCESS); }}; + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java index 1dcf721b7b7a..f0db498ce9bc 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java @@ -26,10 +26,13 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.util.MockComponentLog; import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import 
org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.junit.Assume; @@ -43,10 +46,10 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; import java.util.Collections; import java.util.HashMap; import java.util.Map; - import static org.junit.Assert.assertEquals; public class TestParquetReader { @@ -106,6 +109,26 @@ public void testReadUsers() throws IOException, MalformedRecordException { } } + @Test + public void testReader() throws InitializationException, IOException { + final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class); + final String path = "src/test/resources/TestParquetReader.parquet"; + + final ParquetReader parquetReader = new ParquetReader(); + + runner.addControllerService("reader", parquetReader); + runner.enableControllerService(parquetReader); + + runner.enqueue(Paths.get(path)); + + runner.setProperty(TestParquetProcessor.READER, "reader"); + runner.setProperty(TestParquetProcessor.PATH, path); + + runner.run(); + runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1); + } + + private Schema getSchema(final String schemaFilePath) throws IOException { final File schemaFile = new File(schemaFilePath); final String schemaString = IOUtils.toString(new FileInputStream(schemaFile), StandardCharsets.UTF_8); @@ -124,4 +147,5 @@ private ParquetWriter createParquetWriter(final Schema schema, fi return writer; } + } diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReader.parquet b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReader.parquet new file mode 100644 index 000000000000..ef05efd3c497 Binary files /dev/null and b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/TestParquetReader.parquet differ