Skip to content

Commit

Permalink
[FLINK-35359][runtime] Remove logic to instantiate BlockCompressionFa…
Browse files Browse the repository at this point in the history
…ctory from user-defined class name

There are no use internally to instantiate the BlockCompressionFactory from class name. Even though, it is possible for user to pass a user-defined class name to BlockCompressionFactory, this ability is never exposed in any public documentation.
  • Loading branch information
Sxnan authored and xintongsong committed May 23, 2024
1 parent 583aadf commit 58f0813
Showing 1 changed file with 17 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.io.compression;

import org.apache.flink.configuration.IllegalConfigurationException;

import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
import io.airlift.compress.zstd.ZstdCompressor;
Expand Down Expand Up @@ -47,58 +45,32 @@ enum CompressionFactoryName {
/**
* Creates {@link BlockCompressionFactory} according to the configuration.
*
* @param compressionFactoryName supported compression codecs or user-defined class name
* inherited from {@link BlockCompressionFactory}.
* @param compressionFactoryName supported compression codecs.
*/
static BlockCompressionFactory createBlockCompressionFactory(String compressionFactoryName) {

checkNotNull(compressionFactoryName);

CompressionFactoryName compressionName;
try {
compressionName = CompressionFactoryName.valueOf(compressionFactoryName.toUpperCase());
} catch (IllegalArgumentException e) {
compressionName = null;
}
CompressionFactoryName compressionName =
CompressionFactoryName.valueOf(compressionFactoryName.toUpperCase());

BlockCompressionFactory blockCompressionFactory;
if (compressionName != null) {
switch (compressionName) {
case LZ4:
blockCompressionFactory = new Lz4BlockCompressionFactory();
break;
case LZO:
blockCompressionFactory =
new AirCompressorFactory(new LzoCompressor(), new LzoDecompressor());
break;
case ZSTD:
blockCompressionFactory =
new AirCompressorFactory(new ZstdCompressor(), new ZstdDecompressor());
break;
default:
throw new IllegalStateException("Unknown CompressionMethod " + compressionName);
}
} else {
Object factoryObj;
try {
factoryObj = Class.forName(compressionFactoryName).newInstance();
} catch (ClassNotFoundException e) {
throw new IllegalConfigurationException(
"Cannot load class " + compressionFactoryName, e);
} catch (Exception e) {
throw new IllegalConfigurationException(
"Cannot create object for class " + compressionFactoryName, e);
}
if (factoryObj instanceof BlockCompressionFactory) {
blockCompressionFactory = (BlockCompressionFactory) factoryObj;
} else {
throw new IllegalArgumentException(
"CompressionFactoryName should inherit from"
+ " interface BlockCompressionFactory, or use the default compression codec.");
}
switch (compressionName) {
case LZ4:
blockCompressionFactory = new Lz4BlockCompressionFactory();
break;
case LZO:
blockCompressionFactory =
new AirCompressorFactory(new LzoCompressor(), new LzoDecompressor());
break;
case ZSTD:
blockCompressionFactory =
new AirCompressorFactory(new ZstdCompressor(), new ZstdDecompressor());
break;
default:
throw new IllegalStateException("Unknown CompressionMethod " + compressionName);
}

checkNotNull(blockCompressionFactory);
return blockCompressionFactory;
}
}

0 comments on commit 58f0813

Please sign in to comment.