From a89879f81e5d2e7a365ad030fc909dbd50abe68e Mon Sep 17 00:00:00 2001 From: brucearctor <5032356+brucearctor@users.noreply.github.com> Date: Mon, 19 Apr 2021 02:54:28 -0700 Subject: [PATCH] [BEAM-8611] Move TextSourceTest into TextIOReadTest (#14560) * [BEAM-8611] Move TextSourceTest into TextIOReadTest * fixed checks and imports --- .../apache/beam/sdk/io/TextIOReadTest.java | 122 ++++++++++++++ .../apache/beam/sdk/io/TextSourceTest.java | 158 ------------------ 2 files changed, 122 insertions(+), 158 deletions(-) delete mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index f51938b62a1b..b113dac40d3f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -39,12 +39,14 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeFalse; +import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; import java.io.Writer; +import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -66,6 +68,9 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -851,4 +856,121 @@ public void testReadWatchForNewFiles() throws IOException, InterruptedException p.run(); } } + + /** Tests for TextSource class. */ + @RunWith(JUnit4.class) + public static class TextSourceTest { + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void testRemoveUtf8BOM() throws Exception { + Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1"); + Path p2 = + createTestFile( + "test_txt_utf8_no_bom", + Charset.forName("UTF-8"), + "1,p2-Japanese:テスト", + "2,p2-Japanese:テスト"); + Path p3 = + createTestFile( + "test_txt_utf8_bom", + Charset.forName("UTF-8"), + "\uFEFF1,p3-テストBOM", + "\uFEFF2,p3-テストBOM"); + PCollection contents = + pipeline + .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString())) + .setCoder(StringUtf8Coder.of()) + // PCollection + .apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform()); + // PCollection>: tableName, line + + // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed. + PAssert.that(contents) + .containsInAnyOrder( + "1,p1", + "2,p1", + "1,p2-Japanese:テスト", + "2,p2-Japanese:テスト", + "1,p3-テストBOM", + "\uFEFF2,p3-テストBOM"); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testPreserveNonBOMBytes() throws Exception { + // Contains \uFEFE, not UTF BOM. + Path p1 = + createTestFile( + "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト"); + PCollection contents = + pipeline + .apply("Create", Create.of(p1.toString())) + .setCoder(StringUtf8Coder.of()) + // PCollection + .apply("Read file", new TextIOReadTest.TextSourceTest.TextFileReadTransform()); + + PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト"); + + pipeline.run(); + } + + private static class FileReadDoFn extends DoFn { + + @ProcessElement + public void processElement(ProcessContext c) { + FileIO.ReadableFile file = c.element(); + ValueProvider filenameProvider = + ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename()); + // Create a TextSource, passing null as the delimiter to use the default + // delimiters ('\n', '\r', or '\r\n'). + TextSource textSource = new TextSource(filenameProvider, null, null); + try { + BoundedSource.BoundedReader reader = + textSource + .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes()) + .createReader(c.getPipelineOptions()); + for (boolean more = reader.start(); more; more = reader.advance()) { + c.output(reader.getCurrent()); + } + } catch (IOException e) { + throw new RuntimeException( + "Unable to readFile: " + file.getMetadata().resourceId().toString()); + } + } + } + + /** A transform that reads CSV file records. */ + private static class TextFileReadTransform + extends PTransform, PCollection> { + public TextFileReadTransform() {} + + @Override + public PCollection expand(PCollection files) { + return files + // PCollection + .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)) + // PCollection + .apply(FileIO.readMatches()) + // PCollection + .apply("Read lines", ParDo.of(new TextIOReadTest.TextSourceTest.FileReadDoFn())); + // PCollection: line + } + } + + private Path createTestFile(String filename, Charset charset, String... lines) + throws IOException { + Path path = Files.createTempFile(filename, ".csv"); + try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) { + for (String line : lines) { + writer.write(line); + writer.write('\n'); + } + } + return path; + } + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java deleted file mode 100644 index 36a3f6804c88..000000000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import java.io.BufferedWriter; -import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.Path; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.FileIO.ReadableFile; -import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for TextSource class. */ -@RunWith(JUnit4.class) -public class TextSourceTest { - @Rule public transient TestPipeline pipeline = TestPipeline.create(); - - @Test - @Category(NeedsRunner.class) - public void testRemoveUtf8BOM() throws Exception { - Path p1 = createTestFile("test_txt_ascii", Charset.forName("US-ASCII"), "1,p1", "2,p1"); - Path p2 = - createTestFile( - "test_txt_utf8_no_bom", - Charset.forName("UTF-8"), - "1,p2-Japanese:テスト", - "2,p2-Japanese:テスト"); - Path p3 = - createTestFile( - "test_txt_utf8_bom", - Charset.forName("UTF-8"), - "\uFEFF1,p3-テストBOM", - "\uFEFF2,p3-テストBOM"); - PCollection contents = - pipeline - .apply("Create", Create.of(p1.toString(), p2.toString(), p3.toString())) - .setCoder(StringUtf8Coder.of()) - // PCollection - .apply("Read file", new TextFileReadTransform()); - // PCollection>: tableName, line - - // Validate that the BOM bytes (\uFEFF) at the beginning of the first line have been removed. - PAssert.that(contents) - .containsInAnyOrder( - "1,p1", - "2,p1", - "1,p2-Japanese:テスト", - "2,p2-Japanese:テスト", - "1,p3-テストBOM", - "\uFEFF2,p3-テストBOM"); - - pipeline.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testPreserveNonBOMBytes() throws Exception { - // Contains \uFEFE, not UTF BOM. - Path p1 = - createTestFile( - "test_txt_utf_bom", Charset.forName("UTF-8"), "\uFEFE1,p1テスト", "\uFEFE2,p1テスト"); - PCollection contents = - pipeline - .apply("Create", Create.of(p1.toString())) - .setCoder(StringUtf8Coder.of()) - // PCollection - .apply("Read file", new TextFileReadTransform()); - - PAssert.that(contents).containsInAnyOrder("\uFEFE1,p1テスト", "\uFEFE2,p1テスト"); - - pipeline.run(); - } - - private static class FileReadDoFn extends DoFn { - - @ProcessElement - public void processElement(ProcessContext c) { - ReadableFile file = c.element(); - ValueProvider filenameProvider = - ValueProvider.StaticValueProvider.of(file.getMetadata().resourceId().getFilename()); - // Create a TextSource, passing null as the delimiter to use the default - // delimiters ('\n', '\r', or '\r\n'). - TextSource textSource = new TextSource(filenameProvider, null, null); - try { - BoundedSource.BoundedReader reader = - textSource - .createForSubrangeOfFile(file.getMetadata(), 0, file.getMetadata().sizeBytes()) - .createReader(c.getPipelineOptions()); - for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); - } - } catch (IOException e) { - throw new RuntimeException( - "Unable to readFile: " + file.getMetadata().resourceId().toString()); - } - } - } - - /** A transform that reads CSV file records. */ - private static class TextFileReadTransform - extends PTransform, PCollection> { - public TextFileReadTransform() {} - - @Override - public PCollection expand(PCollection files) { - return files - // PCollection - .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)) - // PCollection - .apply(FileIO.readMatches()) - // PCollection - .apply("Read lines", ParDo.of(new FileReadDoFn())); - // PCollection: line - } - } - - private Path createTestFile(String filename, Charset charset, String... lines) - throws IOException { - Path path = Files.createTempFile(filename, ".csv"); - try (BufferedWriter writer = Files.newBufferedWriter(path, charset)) { - for (String line : lines) { - writer.write(line); - writer.write('\n'); - } - } - return path; - } -}