Skip to content

Commit

Permalink
Correctly and consistent handle finish encoding for compression (nett…
Browse files Browse the repository at this point in the history
…y#13173)

Motivation:

We never cancelled the schedule close operation which could lead to exceptions and extra work.

Modifications:

- Move the logic for scheduling the close to an extra static method and reuse
- Ensure cancellation is done if the close is not needed anymore

Result:

Correctly handle closing and cancellation when doing compression
  • Loading branch information
normanmaurer authored Feb 2, 2023
1 parent 8a8337e commit f18d8f5
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ final class Bzip2Constants {
static final int MIN_BLOCK_SIZE = 1;
static final int MAX_BLOCK_SIZE = 9;

static final int THREAD_POOL_DELAY_SECONDS = 10;

static final int MAX_BLOCK_LENGTH = MAX_BLOCK_SIZE * BASE_BLOCK_SIZE;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,19 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.PromiseNotifier;

import java.util.concurrent.TimeUnit;

import static io.netty.handler.codec.compression.Bzip2Constants.BASE_BLOCK_SIZE;
import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_1;
import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_2;
import static io.netty.handler.codec.compression.Bzip2Constants.MAGIC_NUMBER;
import static io.netty.handler.codec.compression.Bzip2Constants.MAX_BLOCK_SIZE;
import static io.netty.handler.codec.compression.Bzip2Constants.MIN_BLOCK_SIZE;
import static io.netty.handler.codec.compression.Bzip2Constants.THREAD_POOL_DELAY_SECONDS;

/**
* Compresses a {@link ByteBuf} using the Bzip2 algorithm.
Expand Down Expand Up @@ -204,22 +200,7 @@ public void run() {
@Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.close(promise);
}
});

if (!f.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close(promise);
}
}, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
}
EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
}

private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2023 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.compression;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

final class EncoderUtil {
private static final int THREAD_POOL_DELAY_SECONDS = 10;

static void closeAfterFinishEncode(final ChannelHandlerContext ctx, final ChannelFuture finishFuture,
final ChannelPromise promise) {
if (!finishFuture.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
final Future<?> future = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close(promise);
}
}, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);

finishFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
// Cancel the scheduled timeout.
future.cancel(true);
if (!promise.isDone()) {
ctx.close(promise);
}
}
});
} else {
ctx.close(promise);
}
}

private EncoderUtil() { }
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.PromiseNotifier;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.ObjectUtil;
Expand Down Expand Up @@ -324,21 +325,30 @@ public void close(
final ChannelHandlerContext ctx,
final ChannelPromise promise) {
ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.close(promise);
}
});

if (!f.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(new Runnable() {
final Future<?> future = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close(promise);
if (!promise.isDone()) {
ctx.close(promise);
}
}
}, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);

f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) {
// Cancel the scheduled timeout.
future.cancel(true);
if (!promise.isDone()) {
ctx.close(promise);
}
}
});
} else {
ctx.close(promise);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
Expand All @@ -30,7 +29,6 @@
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

Expand Down Expand Up @@ -62,7 +60,6 @@ public class JdkZlibEncoder extends ZlibEncoder {
private final CRC32 crc = new CRC32();
private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
private boolean writeHeader = true;
private static final int THREAD_POOL_DELAY_SECONDS = 10;

static {
MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
Expand Down Expand Up @@ -304,22 +301,7 @@ protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
@Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.close(promise);
}
});

if (!f.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close(promise);
}
}, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
}
EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
}

private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ final class Lz4Constants {
*/
static final int TOKEN_OFFSET = 8;

static final int THREAD_POOL_DELAY_SECONDS = 10;

static final int COMPRESSED_LENGTH_OFFSET = TOKEN_OFFSET + 1;
static final int DECOMPRESSED_LENGTH_OFFSET = COMPRESSED_LENGTH_OFFSET + 4;
static final int CHECKSUM_OFFSET = DECOMPRESSED_LENGTH_OFFSET + 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
Expand All @@ -33,7 +32,6 @@
import net.jpountz.lz4.LZ4Factory;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.zip.Checksum;

import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
Expand All @@ -49,7 +47,6 @@
import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
import static io.netty.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
import static io.netty.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;
import static io.netty.handler.codec.compression.Lz4Constants.THREAD_POOL_DELAY_SECONDS;

/**
* Compresses a {@link ByteBuf} using the LZ4 format.
Expand Down Expand Up @@ -370,22 +367,8 @@ public void run() {
@Override
public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
ChannelFuture f = finishEncode(ctx, ctx.newPromise());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
ctx.close(promise);
}
});

if (!f.isDone()) {
// Ensure the channel is closed even if the write operation completes in time.
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close(promise);
}
}, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
}
EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
}

private ChannelHandlerContext ctx() {
Expand Down

0 comments on commit f18d8f5

Please sign in to comment.