Skip to content

Commit

Permalink
[Typing] Initial typing of arista_tp.py (aerleon#258)
Browse files Browse the repository at this point in the history
  • Loading branch information
ankenyr authored Mar 10, 2023
1 parent e88d2b8 commit acf2d0b
Showing 1 changed file with 59 additions and 46 deletions.
105 changes: 59 additions & 46 deletions aerleon/lib/arista_tp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,44 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""arista traffic-policy generator."""
"""Arista traffic-policy generator."""

import copy
import re
from typing import Dict, List, Set, Tuple, Union

from absl import logging

from aerleon.lib import aclgenerator

# 1 2 3
# 123456789012345678901234567890123456789
# traffic-policies
# traffic-policy foo
# match dos-attaqrs-source-ip ipv4 << TERM_INDENT
# 1 2 3 4 5 6
# 123456789012345678901234567890123456789012345678901234567890123456789
# !! i am a comment, hear me rawr << MATCH_INDENT
# !!
# source prefix field-set << MATCH_INDENT
# !
# actions
# counter edge-dos-attaqrs-source-ip-count << ACTION_INDENT
# drop
# !
#
# 1 2 3
# 123456789012345678901234567890123456789
# traffic-policies
# field-set ipv4 prefix dst-hjjqurby6yftqk6fa3xx4fas << TERM_INDENT
# 0.0.0.0/0 << MATCH_INDENT
# except 34.64.0.0/26
# !
# field-set ipv4 prefix dst-hjjqurby6yftqk6fa3xx4fas

from aerleon.lib.nacaddr import IPv4, IPv6
from aerleon.lib.policy import Policy

"""
1 2 3
123456789012345678901234567890123456789
traffic-policies
traffic-policy foo
match dos-attaqrs-source-ip ipv4 << TERM_INDENT
1 2 3 4 5 6
123456789012345678901234567890123456789012345678901234567890123456789
!! i am a comment, hear me rawr << MATCH_INDENT
!!
source prefix field-set << MATCH_INDENT
!
actions
counter edge-dos-attaqrs-source-ip-count << ACTION_INDENT
drop
!
1 2 3
123456789012345678901234567890123456789
traffic-policies
field-set ipv4 prefix dst-hjjqurby6yftqk6fa3xx4fas << TERM_INDENT
0.0.0.0/0 << MATCH_INDENT
except 34.64.0.0/26
!
field-set ipv4 prefix dst-hjjqurby6yftqk6fa3xx4fas
"""
# various indentation constants - see above
INDENT_STR = " " * 3 # 3 spaces
TERM_INDENT = 2 * INDENT_STR
Expand All @@ -70,9 +74,9 @@ class AristaTpFragmentInV6Error(Error):


class Config:
"""config allows a configuration to be assembled easily.
"""Config allows a configuration to be assembled easily.
when appending to the configuration object, the element should be indented
When appending to the configuration object, the element should be indented
according to the arista traffic-policy style.
a text representation of the config can be extracted with str().
Expand All @@ -83,13 +87,13 @@ class Config:
"""

def __init__(self):
def __init__(self) -> None:
self.lines = []

def __str__(self):
def __str__(self) -> str:
return "\n".join(self.lines)

def Append(self, line_indent, line, verbatim=False):
def Append(self, line_indent: str, line: str, verbatim: bool = False) -> None:
"""append one line to the configuration.
Args:
Expand Down Expand Up @@ -147,7 +151,7 @@ class Term(aclgenerator.Term):
},
}

def __init__(self, term, term_type, noverbose):
def __init__(self, term, term_type, noverbose) -> None:
super().__init__(term)
self.term = term
self.term_type = term_type # drives the address-family
Expand All @@ -156,7 +160,7 @@ def __init__(self, term, term_type, noverbose):
if term_type not in self._TERM_TYPE:
raise ValueError("unknown filter type: %s" % term_type)

def __str__(self):
def __str__(self) -> str:
config = Config()

# a LoL which will be appended to the config at the end of this method
Expand Down Expand Up @@ -389,7 +393,7 @@ def __str__(self):

return str(config)

def _processPorts(self, term):
def _processPorts(self, term) -> str:
port_str = ""

# source port generation
Expand All @@ -402,7 +406,7 @@ def _processPorts(self, term):

return port_str

def _processICMP(self, term):
def _processICMP(self, term) -> Tuple[str, str]:
icmp_types = [""]
icmp_code_str = ""
icmp_type_str = " type "
Expand All @@ -425,7 +429,7 @@ def _processICMP(self, term):

return icmp_type_str, icmp_code_str

def _processProtocol(self, term_type, term, flags):
def _processProtocol(self, term_type, term, flags) -> str:
anet_proto_map = {
"inet": {
# <1-255> protocol values(s) or range(s) of protocol values
Expand Down Expand Up @@ -481,7 +485,7 @@ def _processProtocol(self, term_type, term, flags):

return protocol_str

def _processProtocolExcept(self, term_type, term, flags):
def _processProtocolExcept(self, term_type, term, flags) -> str:
# EOS does not have a protocol-except keyword. it does, however, support
# lists of protocol-ids. given a term this function will generate the
# appropriate list of protocol-id's which *will* be permited. within the
Expand Down Expand Up @@ -517,7 +521,7 @@ def _processProtocolExcept(self, term_type, term, flags):

return protocol_str

def _processTermOptions(self, term, options):
def _processTermOptions(self, term, options) -> Tuple[List[str], List[str]]:
flags = []
misc_options = []

Expand Down Expand Up @@ -550,7 +554,7 @@ def _processTermOptions(self, term, options):

return flags, misc_options

def _Group(self, group, lc=True):
def _Group(self, group, lc=True) -> str:
"""If 1 item return it, else return [item1 item2].
Args:
Expand All @@ -563,7 +567,7 @@ def _Group(self, group, lc=True):
just ';' appended if len(group) == 1
"""

def _FormattedGroup(el, lc=True):
def _FormattedGroup(el, lc=True) -> str:
"""Return the actual formatting of an individual element.
Args:
Expand Down Expand Up @@ -613,7 +617,7 @@ class AristaTrafficPolicy(aclgenerator.ACLGenerator):

SUFFIX = ".atp"

def _BuildTokens(self):
def _BuildTokens(self) -> Tuple[Set[str], Dict[str, Set[str]]]:
"""returns: tuple of supported tokens and sub tokens."""
supported_tokens, supported_sub_tokens = super()._BuildTokens()

Expand Down Expand Up @@ -661,7 +665,9 @@ def _BuildTokens(self):
)
return supported_tokens, supported_sub_tokens

def _MinimizePrefixes(self, include, exclude):
def _MinimizePrefixes(
self, include: List[Union[IPv4, IPv6]], exclude: List[Union[IPv4, IPv6]]
) -> Union[Tuple[List[IPv4], List[IPv4]], Tuple[List[IPv6], List[IPv6]]]:
"""Calculate a minimal set of prefixes for match conditions.
Args:
Expand Down Expand Up @@ -694,7 +700,14 @@ def _MinimizePrefixes(self, include, exclude):

return include_result, exclude_result

def _GenPrefixFieldset(self, direction, name, pfxs, ex_pfxs, af):
def _GenPrefixFieldset(
self,
direction: str,
name: str,
pfxs: List[Union[IPv4, IPv6]],
ex_pfxs: List[Union[IPv4, IPv6]],
af: str,
) -> str:
field_list = ""

for p in pfxs:
Expand All @@ -706,7 +719,7 @@ def _GenPrefixFieldset(self, direction, name, pfxs, ex_pfxs, af):
field_set = fieldset_hdr + field_list
return field_set

def _TranslatePolicy(self, pol, exp_info):
def _TranslatePolicy(self, pol: Policy, exp_info: int) -> None:
self.arista_traffic_policies = []
af_map_txt = {"inet": "ipv4", "inet6": "ipv6"}

Expand Down Expand Up @@ -904,7 +917,7 @@ def _TranslatePolicy(self, pol, exp_info):
)
)

def __str__(self):
def __str__(self) -> str:
config = Config()

for (
Expand Down

0 comments on commit acf2d0b

Please sign in to comment.