Skip to content

Commit

Permalink
compression: allow setting level per algorithm
Browse files Browse the repository at this point in the history
Fixes cloudflare#228

This adds a function to set the compression level per supported algorithm. The
behavior of `adjust_level` is changed to set the level for all of the algorithms
such that it still behaves the same.
  • Loading branch information
gumpt committed Jun 21, 2024
1 parent 9500e62 commit c943fc1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
afbee799f578048eb9f05afc0abc259ba1d0f40a
9392462069291836a53077efe479d3aa9f2072bb
2 changes: 2 additions & 0 deletions pingora-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ clap = { version = "3.2.25", features = ["derive"] }
once_cell = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8"
strum = "0.26.2"
strum_macros = "0.26.2"
libc = "0.2.70"
chrono = { version = "~0.4.31", features = ["alloc"], default-features = false }
thread_local = "1.0"
Expand Down
57 changes: 40 additions & 17 deletions pingora-core/src/protocols/http/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use pingora_error::{ErrorType, Result};
use pingora_http::{RequestHeader, ResponseHeader};
use std::time::Duration;

use strum::EnumCount;
use strum_macros::EnumCount as EnumCountMacro;

mod brotli;
mod gzip;
mod zstd;
Expand All @@ -50,7 +53,7 @@ pub trait Encode {
/// The caller should call the corresponding filters for the request header, response header and
/// response body. If the algorithms are supported, the output response body will be encoded.
/// The response header will be adjusted accordingly as well. If the algorithm is not supported
/// or no encoding needed, the response is untouched.
/// or no encoding is needed, the response is untouched.
///
/// If configured and if the request's `accept-encoding` header contains the algorithm supported and the
/// incoming response doesn't have that encoding, the filter will compress the response.
Expand All @@ -64,23 +67,23 @@ pub struct ResponseCompressionCtx(CtxInner);

enum CtxInner {
HeaderPhase {
compression_level: u32,
decompress_enable: bool,
// Store the preferred list to compare with content-encoding
accept_encoding: Vec<Algorithm>,
encoding_levels: [u32; Algorithm::COUNT],
},
BodyPhase(Option<Box<dyn Encode + Send + Sync>>),
}

impl ResponseCompressionCtx {
/// Create a new [`ResponseCompressionCtx`] with the expected compression level. `0` will disable
/// the compression.
/// the compression. The compression level is applied across all algorithms.
/// The `decompress_enable` flag will tell the ctx to decompress if needed.
pub fn new(compression_level: u32, decompress_enable: bool) -> Self {
Self(CtxInner::HeaderPhase {
compression_level,
decompress_enable,
accept_encoding: Vec::new(),
encoding_levels: [compression_level; Algorithm::COUNT],
})
}

Expand All @@ -89,10 +92,10 @@ impl ResponseCompressionCtx {
pub fn is_enabled(&self) -> bool {
match &self.0 {
CtxInner::HeaderPhase {
compression_level,
decompress_enable,
accept_encoding: _,
} => *compression_level != 0 || *decompress_enable,
encoding_levels: levels,
} => levels.iter().any(|l| *l != 0) || *decompress_enable,
CtxInner::BodyPhase(c) => c.is_some(),
}
}
Expand All @@ -102,25 +105,41 @@ impl ResponseCompressionCtx {
pub fn get_info(&self) -> Option<(&'static str, usize, usize, Duration)> {
match &self.0 {
CtxInner::HeaderPhase {
compression_level: _,
decompress_enable: _,
accept_encoding: _,
encoding_levels: _,
} => None,
CtxInner::BodyPhase(c) => c.as_ref().map(|c| c.stat()),
}
}

/// Adjust the compression level.
/// Adjust the compression level for all compression algorithms.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_level(&mut self, new_level: u32) {
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level,
decompress_enable: _,
accept_encoding: _,
encoding_levels: levels,
} => {
*levels = [new_level; Algorithm::COUNT];
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
}

/// Adjust the compression level for a specific algorithm.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_algorithm_level(&mut self, algorithm: Algorithm, new_level: u32) {
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding: _,
encoding_levels: levels,
} => {
*compression_level = new_level;
levels[algorithm.index()] = new_level;
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
Expand All @@ -132,9 +151,9 @@ impl ResponseCompressionCtx {
pub fn adjust_decompression(&mut self, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level: _,
decompress_enable,
accept_encoding: _,
encoding_levels: _,
} => {
*decompress_enable = enabled;
}
Expand All @@ -149,9 +168,9 @@ impl ResponseCompressionCtx {
}
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level: _,
decompress_enable: _,
accept_encoding,
encoding_levels: _,
} => parse_accept_encoding(
req.headers.get(http::header::ACCEPT_ENCODING),
accept_encoding,
Expand All @@ -167,9 +186,9 @@ impl ResponseCompressionCtx {
}
match &self.0 {
CtxInner::HeaderPhase {
compression_level,
decompress_enable,
accept_encoding,
encoding_levels: levels,
} => {
if resp.status.is_informational() {
if resp.status == http::status::StatusCode::SWITCHING_PROTOCOLS {
Expand All @@ -188,7 +207,7 @@ impl ResponseCompressionCtx {
let action = decide_action(resp, accept_encoding);
let encoder = match action {
Action::Noop => None,
Action::Compress(algorithm) => algorithm.compressor(*compression_level),
Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]),
Action::Decompress(algorithm) => algorithm.decompressor(*decompress_enable),
};
if encoder.is_some() {
Expand All @@ -206,9 +225,9 @@ impl ResponseCompressionCtx {
pub fn response_body_filter(&mut self, data: Option<&Bytes>, end: bool) -> Option<Bytes> {
match &mut self.0 {
CtxInner::HeaderPhase {
compression_level: _,
decompress_enable: _,
accept_encoding: _,
encoding_levels: _,
} => panic!("Wrong phase: HeaderPhase"),
CtxInner::BodyPhase(compressor) => {
let result = compressor
Expand Down Expand Up @@ -258,8 +277,8 @@ impl ResponseCompressionCtx {
}
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Algorithm {
#[derive(Debug, PartialEq, Eq, Clone, Copy, EnumCountMacro)]
pub enum Algorithm {
Any, // the "*"
Gzip,
Brotli,
Expand Down Expand Up @@ -303,6 +322,10 @@ impl Algorithm {
}
}
}

pub fn index(&self) -> usize {
*self as usize
}
}

impl From<&str> for Algorithm {
Expand Down
2 changes: 1 addition & 1 deletion pingora-core/src/protocols/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub enum HttpTask {
Trailer(Option<Box<http::HeaderMap>>),
/// Signal that the response is already finished
Done,
/// Signal that the reading of the response encounters errors.
/// Signal that the reading of the response encountered errors.
Failed(pingora_error::BError),
}

Expand Down

0 comments on commit c943fc1

Please sign in to comment.