Skip to content

Commit

Permalink
Add zstd compression (ava-labs#1278)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Laine authored Apr 5, 2023
1 parent dc43da8 commit 529d7be
Show file tree
Hide file tree
Showing 29 changed files with 1,130 additions and 550 deletions.
45 changes: 41 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/ava-labs/avalanchego/staking"
"github.com/ava-labs/avalanchego/subnets"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils/compression"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/dynamicip"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/utils/storage"
"github.com/ava-labs/avalanchego/utils/timer"
"github.com/ava-labs/avalanchego/version"
"github.com/ava-labs/avalanchego/vms/platformvm/reward"
"github.com/ava-labs/avalanchego/vms/proposervm"
)
Expand All @@ -62,7 +64,9 @@ const (

var (
// Deprecated key --> deprecation message (i.e. which key replaces it)
deprecatedKeys = map[string]string{}
deprecatedKeys = map[string]string{
NetworkCompressionEnabledKey: fmt.Sprintf("use --%s instead", NetworkCompressionTypeKey),
}

errInvalidStakerWeights = errors.New("staking weights must be positive")
errStakingDisableOnPublicNetwork = errors.New("staking disabled on public network")
Expand All @@ -81,6 +85,7 @@ var (
errMissingStakingSigningKeyFile = errors.New("missing staking signing key file")
errTracingEndpointEmpty = fmt.Errorf("%s cannot be empty", TracingEndpointKey)
errPluginDirNotADirectory = errors.New("plugin dir is not a directory")
errZstdNotSupported = errors.New("zstd compression not supported until v1.10")
)

func getConsensusConfig(v *viper.Viper) avalanche.Parameters {
Expand Down Expand Up @@ -302,13 +307,45 @@ func getGossipConfig(v *viper.Viper) subnets.GossipConfig {
}
}

func getNetworkConfig(v *viper.Viper, stakingEnabled bool, halflife time.Duration) (network.Config, error) {
func getNetworkConfig(
v *viper.Viper,
stakingEnabled bool,
halflife time.Duration,
networkID uint32, // TODO remove after cortina upgrade
) (network.Config, error) {
// Set the max number of recent inbound connections upgraded to be
// equal to the max number of inbound connections per second.
maxInboundConnsPerSec := v.GetFloat64(InboundThrottlerMaxConnsPerSecKey)
upgradeCooldown := v.GetDuration(InboundConnUpgradeThrottlerCooldownKey)
upgradeCooldownInSeconds := upgradeCooldown.Seconds()
maxRecentConnsUpgraded := int(math.Ceil(maxInboundConnsPerSec * upgradeCooldownInSeconds))

var (
compressionType compression.Type
err error
)
if v.IsSet(NetworkCompressionTypeKey) {
if v.IsSet(NetworkCompressionEnabledKey) {
return network.Config{}, fmt.Errorf("cannot set both %q and %q", NetworkCompressionTypeKey, NetworkCompressionEnabledKey)
}

compressionType, err = compression.TypeFromString(v.GetString(NetworkCompressionTypeKey))
if err != nil {
return network.Config{}, err
}
} else {
if v.GetBool(NetworkCompressionEnabledKey) {
compressionType = constants.DefaultNetworkCompressionType
} else {
compressionType = compression.TypeNone
}
}

cortinaTime := version.GetCortinaTime(networkID)
if compressionType == compression.TypeZstd && !time.Now().After(cortinaTime) {
// TODO remove after cortina upgrade
return network.Config{}, errZstdNotSupported
}
config := network.Config{
// Throttling
ThrottlerConfig: network.ThrottlerConfig{
Expand Down Expand Up @@ -383,7 +420,7 @@ func getNetworkConfig(v *viper.Viper, stakingEnabled bool, halflife time.Duratio
},

MaxClockDifference: v.GetDuration(NetworkMaxClockDifferenceKey),
CompressionEnabled: v.GetBool(NetworkCompressionEnabledKey),
CompressionType: compressionType,
PingFrequency: v.GetDuration(NetworkPingFrequencyKey),
AllowPrivateIPs: v.GetBool(NetworkAllowPrivateIPsKey),
UptimeMetricFreq: v.GetDuration(UptimeMetricFreqKey),
Expand Down Expand Up @@ -1345,7 +1382,7 @@ func GetNodeConfig(v *viper.Viper) (node.Config, error) {
}

// Network Config
nodeConfig.NetworkConfig, err = getNetworkConfig(v, nodeConfig.EnableStaking, healthCheckAveragerHalflife)
nodeConfig.NetworkConfig, err = getNetworkConfig(v, nodeConfig.EnableStaking, healthCheckAveragerHalflife, nodeConfig.NetworkID)
if err != nil {
return node.Config{}, err
}
Expand Down
3 changes: 3 additions & 0 deletions config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/genesis"
"github.com/ava-labs/avalanchego/trace"
"github.com/ava-labs/avalanchego/utils/compression"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/ulimit"
"github.com/ava-labs/avalanchego/utils/units"
Expand Down Expand Up @@ -152,6 +153,8 @@ func addNodeFlags(fs *pflag.FlagSet) {
fs.Duration(NetworkPingFrequencyKey, constants.DefaultPingFrequency, "Frequency of pinging other peers")

fs.Bool(NetworkCompressionEnabledKey, constants.DefaultNetworkCompressionEnabled, "If true, compress certain outbound messages. This node will be able to parse compressed inbound messages regardless of this flag's value")
fs.String(NetworkCompressionTypeKey, constants.DefaultNetworkCompressionType.String(), fmt.Sprintf("Compression type for outbound messages. Must be one of [%s, %s, %s]", compression.TypeGzip, compression.TypeZstd, compression.TypeNone))

fs.Duration(NetworkMaxClockDifferenceKey, constants.DefaultNetworkMaxClockDifference, "Max allowed clock difference value between this node and peers")
fs.Bool(NetworkAllowPrivateIPsKey, constants.DefaultNetworkAllowPrivateIPs, "Allows the node to initiate outbound connection attempts to peers with private IPs")
fs.Bool(NetworkRequireValidatorToConnectKey, constants.DefaultNetworkRequireValidatorToConnect, "If true, this node will only maintain a connection with another node if this node is a validator, the other node is a validator, or the other node is a beacon")
Expand Down
3 changes: 2 additions & 1 deletion config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ const (
NetworkPingTimeoutKey = "network-ping-timeout"
NetworkPingFrequencyKey = "network-ping-frequency"
NetworkMaxReconnectDelayKey = "network-max-reconnect-delay"
NetworkCompressionEnabledKey = "network-compression-enabled"
NetworkCompressionEnabledKey = "network-compression-enabled" // TODO this is deprecated. Eventually remove it and constants.DefaultNetworkCompressionEnabled
NetworkCompressionTypeKey = "network-compression-type"
NetworkMaxClockDifferenceKey = "network-max-clock-difference"
NetworkAllowPrivateIPsKey = "network-allow-private-ips"
NetworkRequireValidatorToConnectKey = "network-require-validator-to-connect"
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module github.com/ava-labs/avalanchego
go 1.19

require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/avalanche-network-runner-sdk v0.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
Expand Down
9 changes: 7 additions & 2 deletions message/creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/utils/compression"
"github.com/ava-labs/avalanchego/utils/logging"
)

var _ Creator = (*creator)(nil)
Expand All @@ -23,13 +26,15 @@ type creator struct {
}

func NewCreator(
log logging.Logger,
metrics prometheus.Registerer,
parentNamespace string,
compressionEnabled bool,
compressionType compression.Type,
maxMessageTimeout time.Duration,
) (Creator, error) {
namespace := fmt.Sprintf("%s_codec", parentNamespace)
builder, err := newMsgBuilder(
log,
namespace,
metrics,
maxMessageTimeout,
Expand All @@ -39,7 +44,7 @@ func NewCreator(
}

return &creator{
OutboundMsgBuilder: newOutboundBuilder(compressionEnabled, builder),
OutboundMsgBuilder: newOutboundBuilder(compressionType, builder),
InboundMsgBuilder: newInboundBuilder(builder),
}, nil
}
2 changes: 2 additions & 0 deletions message/inbound_msg_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer/mockable"
)

Expand All @@ -21,6 +22,7 @@ func Test_newMsgBuilder(t *testing.T) {
require := require.New(t)

mb, err := newMsgBuilder(
logging.NoLog{},
"test",
prometheus.NewRegistry(),
10*time.Second,
Expand Down
Loading

0 comments on commit 529d7be

Please sign in to comment.