From 4beff13e07625d8b24119ea15d828481a4bb30d7 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Mon, 21 Aug 2017 19:55:57 +0200
Subject: [PATCH] [FLINK-4048] Remove Hadoop from DataSet API

This removes all Hadoop-related methods from ExecutionEnvironment; there
are already equivalent methods in flink-hadoop-compatibility (see
HadoopUtils, HadoopInputs, etc.).

This also removes Hadoop-specific tests from flink-tests because they
are duplicated by tests in flink-hadoop-compatibility.

This also removes Hadoop-specific example code from flink-examples: the
DistCp example and related code.
---
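Migration note (editorial, not part of the commit): callers of the removed
ExecutionEnvironment methods switch to the input-format factories in
flink-hadoop-compatibility, exactly as the updated ITCases below do. A
minimal sketch, assuming the flink-hadoop-compatibility dependency is on
the classpath; "hdfs:///input" is a placeholder path:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class ReadHadoopFileMigration {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Before this patch (method now removed):
            //   env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///input");

            // After: build the input format via HadoopInputs and hand it to createInput().
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                    new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///input"));

            input.print();
        }
    }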
 .../common/HadoopInputFormatCommonBase.java | 0
 .../common/HadoopOutputFormatCommonBase.java | 0
 .../java/hadoop/mapred/HadoopInputFormat.java | 0
 .../hadoop/mapred/HadoopInputFormatBase.java | 0
 .../hadoop/mapred/HadoopOutputFormat.java | 0
 .../hadoop/mapred/HadoopOutputFormatBase.java | 0
 .../java/hadoop/mapred/utils/HadoopUtils.java | 0
 .../wrapper/HadoopDummyProgressable.java | 0
 .../mapred/wrapper/HadoopDummyReporter.java | 0
 .../mapred/wrapper/HadoopInputSplit.java | 0
 .../hadoop/mapreduce/HadoopInputFormat.java | 0
 .../mapreduce/HadoopInputFormatBase.java | 0
 .../hadoop/mapreduce/HadoopOutputFormat.java | 0
 .../mapreduce/HadoopOutputFormatBase.java | 0
 .../hadoop/mapreduce/utils/HadoopUtils.java | 0
 .../mapreduce/wrapper/HadoopInputSplit.java | 0
 .../hadoop/mapred/HadoopInputFormat.scala | 2 +-
 .../hadoop/mapred/HadoopOutputFormat.scala | 2 +-
 .../hadoop/mapreduce/HadoopInputFormat.scala | 0
 .../hadoop/mapreduce/HadoopOutputFormat.scala | 0
 .../hadoop/mapred/HadoopInputFormatTest.java | 0
 .../hadoop/mapred/HadoopOutputFormatTest.java | 6 +-
 .../mapreduce/HadoopInputFormatTest.java | 0
 .../mapreduce/HadoopOutputFormatTest.java | 0
 .../mapred/HadoopIOFormatsITCase.java | 2 +-
 .../mapred/WordCountMapredITCase.java | 23 ++-
 .../mapreduce/WordCountMapreduceITCase.java | 23 ++-
 .../scala}/WordCountMapredITCase.scala | 7 +-
 .../scala}/WordCountMapreduceITCase.scala | 11 +-
 .../flink/api/java/ExecutionEnvironment.java | 106 -------------
 .../api/scala/ExecutionEnvironment.scala | 146 ------------------
 31 files changed, 56 insertions(+), 272 deletions(-)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java (100%)
 rename {flink-scala => flink-connectors/flink-hadoop-compatibility}/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala (96%)
 rename {flink-scala => flink-connectors/flink-hadoop-compatibility}/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala (95%)
 rename {flink-scala => flink-connectors/flink-hadoop-compatibility}/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala (100%)
 rename {flink-scala => flink-connectors/flink-hadoop-compatibility}/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java (98%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java (100%)
 rename {flink-java => flink-connectors/flink-hadoop-compatibility}/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java (100%)
 rename {flink-tests/src/test/java/org/apache/flink/test/hadoop => flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility}/mapred/HadoopIOFormatsITCase.java (99%)
 rename {flink-tests/src/test/java/org/apache/flink/test/hadoop => flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility}/mapred/WordCountMapredITCase.java (85%)
 rename {flink-tests/src/test/java/org/apache/flink/test/hadoop => flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility}/mapreduce/WordCountMapreduceITCase.java (85%)
 rename {flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred => flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala}/WordCountMapredITCase.scala (92%)
 rename {flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce => flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala}/WordCountMapreduceITCase.scala (88%)
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java b/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
similarity index 100%
rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
rename to flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
similarity index 96%
rename from flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
rename to flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
index 9b614fdda3536..52a6bed48707c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
-import org.apache.hadoop.mapred.{JobConf, InputFormat}
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
 
 @Public
 class HadoopInputFormat[K, V](
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
similarity index 95%
rename from flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
rename to flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
index ad5f28249a006..b78665115e054 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
+++ b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
-import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
+import org.apache.hadoop.mapred.{JobConf, OutputCommitter, OutputFormat}
 
 @Public
 class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
similarity index 100%
rename from flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
rename to flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala b/flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
similarity index 100%
rename from flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
rename to flink-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
similarity index 100%
rename from flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
rename to flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
similarity index 98%
rename from flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
rename to flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
index 16d8b0847b9db..362a37f74c133 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.util.Progressable;
 
 import org.junit.Test;
 import org.mockito.Matchers;
+import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -41,7 +42,6 @@
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -56,7 +56,7 @@ public void testOpen() throws Exception {
 		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
 		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
-		JobConf jobConf = spy(new JobConf());
+		JobConf jobConf = Mockito.spy(new JobConf());
 		when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);
 
 		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
@@ -146,7 +146,7 @@ public void testWriteRecord() throws Exception {
 	public void testFinalizeGlobal() throws Exception {
 		OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
 		DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
-		JobConf jobConf = spy(new JobConf());
+		JobConf jobConf = Mockito.spy(new JobConf());
 		when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);
 
 		HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
similarity index 100%
rename from flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
rename to flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
similarity index 100%
rename from flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
rename to flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
similarity index 99%
rename from flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
rename to flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
index 07b4d761656d0..bbe639553ec20 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.hadoop.mapred;
+package org.apache.flink.test.hadoopcompatibility.mapred;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
similarity index 85%
rename from flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
rename to flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
index 409bb40e0c10f..53927e7e07738 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/WordCountMapredITCase.java
@@ -16,16 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.hadoop.mapred;
+package org.apache.flink.test.hadoopcompatibility.mapred;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.testfunctions.Tokenizer;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.hadoop.fs.Path;
@@ -79,8 +81,8 @@ private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
 		DataSet<Tuple2<LongWritable, Text>> input;
 
 		if (isTestDeprecatedAPI) {
-			input = env.readHadoopFile(new TextInputFormat(),
-				LongWritable.class, Text.class, textPath);
+			input = env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
 		} else {
 			input = env.createInput(readHadoopFile(new TextInputFormat(),
 				LongWritable.class, Text.class, textPath));
@@ -118,4 +120,17 @@ public Tuple2 map(Tuple2 value) throws Exce
 		words.output(hadoopOutputFormat);
 		env.execute("Hadoop Compat WordCount");
 	}
+
+	static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			String[] tokens = value.toLowerCase().split("\\W+");
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<>(token, 1));
+				}
+			}
+		}
+	}
 }
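Side note for migrators (editorial sketch, not part of the commit): the
output side of this test keeps using the HadoopOutputFormat wrapper, which
remains available from flink-hadoop-compatibility. Wiring it up looks
roughly like this; the resultPath value is a placeholder:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class MapredOutputSketch {
        static void write(DataSet<Tuple2<Text, IntWritable>> words, String resultPath) {
            JobConf jobConf = new JobConf();
            HadoopOutputFormat<Text, IntWritable> format =
                new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), jobConf);
            // The output path is configured on the JobConf, exactly as in plain Hadoop.
            FileOutputFormat.setOutputPath(jobConf, new Path(resultPath));
            words.output(format);
        }
    }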
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
similarity index 85%
rename from flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
rename to flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
index 632b40676655c..be70782960fce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/WordCountMapreduceITCase.java
@@ -16,16 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.test.hadoop.mapreduce;
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.testfunctions.Tokenizer;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OperatingSystem;
 
 import org.apache.hadoop.fs.Path;
@@ -78,8 +80,8 @@ private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
 		DataSet<Tuple2<LongWritable, Text>> input;
 
 		if (isTestDeprecatedAPI) {
-			input = env.readHadoopFile(new TextInputFormat(),
-				LongWritable.class, Text.class, textPath);
+			input = env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath));
 		} else {
 			input = env.createInput(readHadoopFile(new TextInputFormat(),
 				LongWritable.class, Text.class, textPath));
@@ -118,4 +120,17 @@ public Tuple2 map(Tuple2 value) throws Exce
 		words.output(hadoopOutputFormat);
 		env.execute("Hadoop Compat WordCount");
 	}
+
+	static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			String[] tokens = value.toLowerCase().split("\\W+");
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<>(token, 1));
+				}
+			}
+		}
+	}
 }
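The same HadoopInputs factory covers Hadoop's newer mapreduce API; only
the input format type and, optionally, a Job carrying extra configuration
differ. A minimal editorial sketch (the path is a placeholder):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MapreduceInputSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // The Job carries Hadoop configuration; readHadoopFile adds the path to it.
            Job job = Job.getInstance();
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                    new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///input", job));

            input.print();
        }
    }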
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
similarity index 92%
rename from flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
rename to flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
index e7069e1b1fa81..5aaf3799a84ba 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapredITCase.scala
@@ -15,9 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.scala.hadoop.mapred
+package org.apache.flink.api.hadoopcompatibility.scala
 
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
 import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
@@ -52,7 +53,9 @@ class WordCountMapredITCase extends JavaProgramTestBase {
     val input =
       if (testDeprecatedAPI) {
-        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+        env.createInput(
+          HadoopInputs.readHadoopFile(
+            new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
       } else {
         env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat,
           classOf[LongWritable], classOf[Text], textPath))
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
similarity index 88%
rename from flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
rename to flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
index c3031e949d7c9..61e7a12c7e443 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/scala/org/apache/flink/api/hadoopcompatibility/scala/WordCountMapreduceITCase.scala
@@ -16,15 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.scala.hadoop.mapreduce
+package org.apache.flink.api.hadoopcompatibility.scala
 
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
 import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
 import org.apache.flink.util.OperatingSystem
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
@@ -63,7 +64,9 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
     val input =
       if (testDeprecatedAPI) {
-        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+        env.createInput(
+          HadoopInputs.readHadoopFile(
+            new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
       } else {
         env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat,
           classOf[LongWritable], classOf[Text], textPath))
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 089c90b458853..02acfc601a7dc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -33,7 +33,6 @@
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.io.IteratorInputFormat;
@@ -61,8 +60,6 @@
 import org.apache.flink.util.Visitor;
 
 import com.esotericsoftware.kryo.Serializer;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -582,109 +579,6 @@ public DataSource createInput(InputFormat inputFormat, TypeInformat
 		return new DataSource<>(this, inputFormat, producedType, Utils.getCallLocationName());
 	}
 
-	// ----------------------------------- Hadoop Input Format ---------------------------------------
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat, Class, Class, String, JobConf)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
-		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
-
-		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
-
-		return result;
-	}
-
-	/**
-	 * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
-	 * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class, Class, String)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
-		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
-	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat, Class, Class, String)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
-		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat, Class, Class, JobConf)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
-
-		return this.createInput(hadoopInputFormat);
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
-	 * given inputName is set on the given job.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat, Class, Class, String, Job)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
-		DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
-
-		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
-				.hadoop.fs.Path(inputPath));
-
-		return result;
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
-	 * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat, Class, Class, String)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
-		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
-	}
-
-	/**
-	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
-	 *
-	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat, Class, Class, Job)}
-	 * from the flink-hadoop-compatibility module.
-	 */
-	@Deprecated
-	@PublicEvolving
-	public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
-
-		return this.createInput(hadoopInputFormat);
-	}
-
 	// ----------------------------------- Collection ---------------------------------------
 
 	/**
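The two less obvious removals above also have direct replacements in
HadoopInputs; a hedged editorial sketch (types and paths are illustrative
only):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class OtherHadoopInputsSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the removed readSequenceFile(Class, Class, String):
            DataSet<Tuple2<Text, IntWritable>> pairs = env.createInput(
                HadoopInputs.readSequenceFile(Text.class, IntWritable.class, "hdfs:///seq-input"));

            // Stand-in for the removed createHadoopInput(...) with the mapred API. Note
            // that, unlike readHadoopFile, no input path is added to the JobConf here.
            JobConf jobConf = new JobConf();
            DataSet<Tuple2<LongWritable, Text>> generic = env.createInput(
                HadoopInputs.createHadoopInput(
                    new TextInputFormat(), LongWritable.class, Text.class, jobConf));

            pairs.print();
        }
    }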
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 6d70438ba1c4c..bc24ad0b58d8a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -29,15 +29,10 @@ import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
 import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
-import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.Path
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator}
-import org.apache.hadoop.fs.{Path => HadoopPath}
-import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat, JobConf}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
-import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -409,147 +404,6 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     wrap(new DataSource[T](javaEnv, inputFormat, producedType, getCallLocationName()))
   }
 
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
-   * given inputName is set on the given job.
-   *
-   * @deprecated Please use
-   * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: JobConf)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    val result = createHadoopInput(mapredInputFormat, key, value, job)
-    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    result
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
-   * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
-   *
-   * @deprecated Please use
-   * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readHadoopFile[K, V](
-      mapredInputFormat: MapredFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
-  }
-
-  /**
-   * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]]
-   * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
-   *
-   * @deprecated Please use
-   * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readSequenceFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readSequenceFile[K, V](
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
-      key, value, inputPath)
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
-   *
-   * @deprecated Please use
-   * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def createHadoopInput[K, V](
-      mapredInputFormat: MapredInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: JobConf)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    val hadoopInputFormat = new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
-    createInput(hadoopInputFormat)
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-   * The given inputName is set on the given job.
-   *
-   * @deprecated Please use
-   * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String,
-      job: Job)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
-    val result = createHadoopInput(mapreduceInputFormat, key, value, job)
-    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
-    result
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given
-   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
-   * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
-   *
-   * @deprecated Please use
-   * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#readHadoopFile]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def readHadoopFile[K, V](
-      mapreduceInputFormat: MapreduceFileInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
-    readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
-  }
-
-  /**
-   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
-   *
-   * @deprecated Please use
-   * [[org.apache.flink.hadoopcompatibility.scala.HadoopInputs#createHadoopInput]]
-   * from the flink-hadoop-compatibility module.
-   */
-  @Deprecated
-  @PublicEvolving
-  def createHadoopInput[K, V](
-      mapreduceInputFormat: MapreduceInputFormat[K, V],
-      key: Class[K],
-      value: Class[V],
-      job: Job)
-      (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
-    val hadoopInputFormat =
-      new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
-    createInput(hadoopInputFormat)
-  }
-
   /**
    * Creates a DataSet from the given non-empty [[Iterable]].
    *
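Closing migration note (editorial): the Scala methods removed above map
onto org.apache.flink.hadoopcompatibility.scala.HadoopInputs in the same
way as their Java counterparts. For the one entry point not sketched yet,
createHadoopInput with Hadoop's mapreduce API, a hedged Java sketch (the
input format and path are placeholders):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MapreduceCreateInputSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // createHadoopInput does not register a path itself, so configure the Job first.
            Job job = Job.getInstance();
            FileInputFormat.addInputPath(job, new Path("hdfs:///input"));

            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.createHadoopInput(
                    new TextInputFormat(), LongWritable.class, Text.class, job));

            input.print();
        }
    }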