From f18d8f53277ba0d9aa3fedd845cae5a6c7d77392 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 2 Feb 2023 10:34:23 +0100 Subject: [PATCH] Correctly and consistent handle finish encoding for compression (#13173) Motivation: We never cancelled the schedule close operation which could lead to exceptions and extra work. Modifications: - Move the logic for scheduling the close to an extra static method and reuse - Ensure cancellation is done if the close is not needed anymore Result: Correctly handle closing and cancellation when doing compression --- .../codec/compression/Bzip2Constants.java | 2 - .../codec/compression/Bzip2Encoder.java | 21 +------ .../codec/compression/EncoderUtil.java | 57 +++++++++++++++++++ .../codec/compression/JZlibEncoder.java | 26 ++++++--- .../codec/compression/JdkZlibEncoder.java | 20 +------ .../codec/compression/Lz4Constants.java | 2 - .../codec/compression/Lz4FrameEncoder.java | 19 +------ 7 files changed, 78 insertions(+), 69 deletions(-) create mode 100644 codec/src/main/java/io/netty/handler/codec/compression/EncoderUtil.java diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Constants.java b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Constants.java index 90432cab1385..087f45faa0b2 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Constants.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Constants.java @@ -49,8 +49,6 @@ final class Bzip2Constants { static final int MIN_BLOCK_SIZE = 1; static final int MAX_BLOCK_SIZE = 9; - static final int THREAD_POOL_DELAY_SECONDS = 10; - static final int MAX_BLOCK_LENGTH = MAX_BLOCK_SIZE * BASE_BLOCK_SIZE; /** diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java index f5cba14998f3..8b42662d664a 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Bzip2Encoder.java @@ -17,7 +17,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; @@ -25,15 +24,12 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.PromiseNotifier; -import java.util.concurrent.TimeUnit; - import static io.netty.handler.codec.compression.Bzip2Constants.BASE_BLOCK_SIZE; import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_1; import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_2; import static io.netty.handler.codec.compression.Bzip2Constants.MAGIC_NUMBER; import static io.netty.handler.codec.compression.Bzip2Constants.MAX_BLOCK_SIZE; import static io.netty.handler.codec.compression.Bzip2Constants.MIN_BLOCK_SIZE; -import static io.netty.handler.codec.compression.Bzip2Constants.THREAD_POOL_DELAY_SECONDS; /** * Compresses a {@link ByteBuf} using the Bzip2 algorithm. @@ -204,22 +200,7 @@ public void run() { @Override public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { ChannelFuture f = finishEncode(ctx, ctx.newPromise()); - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - ctx.close(promise); - } - }); - - if (!f.isDone()) { - // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(new Runnable() { - @Override - public void run() { - ctx.close(promise); - } - }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS); - } + EncoderUtil.closeAfterFinishEncode(ctx, f, promise); } private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) { diff --git a/codec/src/main/java/io/netty/handler/codec/compression/EncoderUtil.java b/codec/src/main/java/io/netty/handler/codec/compression/EncoderUtil.java new file mode 100644 index 000000000000..477fdfd36f8a --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/compression/EncoderUtil.java @@ -0,0 +1,57 @@ +/* + * Copyright 2023 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.compression; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.util.concurrent.Future; + +import java.util.concurrent.TimeUnit; + +final class EncoderUtil { + private static final int THREAD_POOL_DELAY_SECONDS = 10; + + static void closeAfterFinishEncode(final ChannelHandlerContext ctx, final ChannelFuture finishFuture, + final ChannelPromise promise) { + if (!finishFuture.isDone()) { + // Ensure the channel is closed even if the write operation completes in time. + final Future future = ctx.executor().schedule(new Runnable() { + @Override + public void run() { + ctx.close(promise); + } + }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS); + + finishFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) { + // Cancel the scheduled timeout. + future.cancel(true); + if (!promise.isDone()) { + ctx.close(promise); + } + } + }); + } else { + ctx.close(promise); + } + } + + private EncoderUtil() { } +} + diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index 418a12a1894c..d94c030cfa57 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; import io.netty.util.concurrent.PromiseNotifier; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.ObjectUtil; @@ -324,21 +325,30 @@ public void close( final ChannelHandlerContext ctx, final ChannelPromise promise) { ChannelFuture f = finishEncode(ctx, ctx.newPromise()); - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - ctx.close(promise); - } - }); if (!f.isDone()) { // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(new Runnable() { + final Future future = ctx.executor().schedule(new Runnable() { @Override public void run() { - ctx.close(promise); + if (!promise.isDone()) { + ctx.close(promise); + } } }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS); + + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) { + // Cancel the scheduled timeout. + future.cancel(true); + if (!promise.isDone()) { + ctx.close(promise); + } + } + }); + } else { + ctx.close(promise); } } diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index 981ab7c40340..e43f6d561e37 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -17,7 +17,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.EventExecutor; @@ -30,7 +29,6 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; import java.util.zip.Deflater; @@ -62,7 +60,6 @@ public class JdkZlibEncoder extends ZlibEncoder { private final CRC32 crc = new CRC32(); private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0}; private boolean writeHeader = true; - private static final int THREAD_POOL_DELAY_SECONDS = 10; static { MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt( @@ -304,22 +301,7 @@ protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, @Override public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { ChannelFuture f = finishEncode(ctx, ctx.newPromise()); - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - ctx.close(promise); - } - }); - - if (!f.isDone()) { - // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(new Runnable() { - @Override - public void run() { - ctx.close(promise); - } - }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS); - } + EncoderUtil.closeAfterFinishEncode(ctx, f, promise); } private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) { diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Lz4Constants.java b/codec/src/main/java/io/netty/handler/codec/compression/Lz4Constants.java index dc6802804b8d..3757cbcb4209 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Lz4Constants.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Lz4Constants.java @@ -42,8 +42,6 @@ final class Lz4Constants { */ static final int TOKEN_OFFSET = 8; - static final int THREAD_POOL_DELAY_SECONDS = 10; - static final int COMPRESSED_LENGTH_OFFSET = TOKEN_OFFSET + 1; static final int DECOMPRESSED_LENGTH_OFFSET = COMPRESSED_LENGTH_OFFSET + 4; static final int CHECKSUM_OFFSET = DECOMPRESSED_LENGTH_OFFSET + 4; diff --git a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java index 6880d2f26450..ff95e7148cc2 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/Lz4FrameEncoder.java @@ -19,7 +19,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; @@ -33,7 +32,6 @@ import net.jpountz.lz4.LZ4Factory; import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; import java.util.zip.Checksum; import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED; @@ -49,7 +47,6 @@ import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE; import static io.netty.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE; import static io.netty.handler.codec.compression.Lz4Constants.TOKEN_OFFSET; -import static io.netty.handler.codec.compression.Lz4Constants.THREAD_POOL_DELAY_SECONDS; /** * Compresses a {@link ByteBuf} using the LZ4 format. @@ -370,22 +367,8 @@ public void run() { @Override public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { ChannelFuture f = finishEncode(ctx, ctx.newPromise()); - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - ctx.close(promise); - } - }); - if (!f.isDone()) { - // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(new Runnable() { - @Override - public void run() { - ctx.close(promise); - } - }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS); - } + EncoderUtil.closeAfterFinishEncode(ctx, f, promise); } private ChannelHandlerContext ctx() {