forked from sileht/bird-lg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbird.py
197 lines (169 loc) · 5.89 KB
/
bird.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# -*- coding: utf-8 -*-
# vim: ts=4
###
#
# Copyright (c) 2006 Mehdi Abaakouk
# -- Original Software
# Copyright (c) 2024 Andreas Duering
# -- Fork
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3 as
# published by the Free Software Foundation
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
#
###
"""BIRD socket abstraction."""
import socket
import sys
BUFSIZE = 4096
SUCCESS_CODES = {
"0000": "OK",
"0001": "Welcome",
"0002": "Reading configuration",
"0003": "Reconfigured",
"0004": "Reconfiguration in progress",
"0005": "Reconfiguration already in progress, queueing",
"0006": "Reconfiguration ignored, shutting down",
"0007": "Shutdown ordered",
"0008": "Already disabled",
"0009": "Disabled",
"0010": "Already enabled",
"0011": "Enabled",
"0012": "Restarted",
"0013": "Status report",
"0014": "Route count",
"0015": "Reloading",
"0016": "Access restricted",
}
TABLES_ENTRY_CODES = {
"1000": "BIRD version",
"1001": "Interface list",
"1002": "Protocol list",
"1003": "Interface address",
"1004": "Interface flags",
"1005": "Interface summary",
"1006": "Protocol details",
"1007": "Route list",
"1008": "Route details",
"1009": "Static route list",
"1010": "Symbol list",
"1011": "Uptime",
"1012": "Route extended attribute list",
"1013": "Show ospf neighbors",
"1014": "Show ospf",
"1015": "Show ospf interface",
"1016": "Show ospf state/topology",
"1017": "Show ospf lsadb",
"1018": "Show memory",
}
ERROR_CODES = {
"8000": "Reply too long",
"8001": "Route not found",
"8002": "Configuration file error",
"8003": "No protocols match",
"8004": "Stopped due to reconfiguration",
"8005": "Protocol is down => cannot dump",
"8006": "Reload failed",
"8007": "Access denied",
"9000": "Command too long",
"9001": "Parse error",
"9002": "Invalid symbol type",
}
END_CODES = list(ERROR_CODES.keys()) + list(SUCCESS_CODES.keys())
class BirdSocket:
"""Wraps a BIRD socket."""
def __init__(self, host="", port="", file=""):
"""
Initializes a BirdSocket instance.
Args:
host (str, optional): The host address, if using TCP.
port (str, optional): The port number, if using TCP.
file (str, optional): The file name of the Unix socket, if using that.
File is preferred if file and (host,port) are given.
"""
self.__file = file
self.__host = host
self.__port = port
self.__sock = None
def __connect(self):
"""Opens the connection to the bird communication socket."""
if self.__sock:
return
if not self.__file:
self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__sock.settimeout(3.0)
self.__sock.connect((self.__host, self.__port))
else:
self.__sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.__sock.settimeout(3.0)
self.__sock.connect(self.__file)
# read welcome message
self.__sock.recv(1024)
self.cmd("restrict")
def close(self):
"""Closes the socket (after a command)."""
if self.__sock:
try:
self.__sock.close()
except Exception:
pass
self.__sock = None
def cmd(self, cmd: str):
"""Send a command to the socket."""
if len(cmd.split("\n")) > 1:
return False, "Multiple commands are not allowed."
if not cmd.startswith("show"):
return False, "Only show commands are allowed."
try:
self.__connect()
self.__sock.send((cmd + "\n").encode("ascii"))
data = self.__read()
return data
except socket.error:
why = sys.exc_info()[1]
self.close()
return False, f"Bird connection problem: {why}"
def __read(self) -> tuple[bool, str]:
"""Reads the reply from a previously sent command.
Returns:
tuple[bool, str]: Whether the command was successful,
and (the result) or (the error message)
"""
code = "7000" # Not used in bird
parsed_string = ""
lastline = ""
while code not in END_CODES:
data = self.__sock.recv(BUFSIZE).decode("ascii")
lines = (lastline + data).split("\n")
if len(data) == BUFSIZE:
lastline = lines[-1]
lines = lines[:-1]
for line in lines:
code = line[0:4]
if not line.strip():
continue
if code == "0000":
return True, parsed_string
if code in SUCCESS_CODES:
return True, SUCCESS_CODES.get(code, "Unknown success")
if code in ERROR_CODES:
return False, ERROR_CODES.get(code, "Unknown error")
if code[0] in ["1", "2"]:
parsed_string += line[5:] + "\n"
elif code[0] == " ":
parsed_string += line[1:] + "\n"
elif code[0] == "+":
parsed_string += line[1:]
else:
parsed_string += f"<<<unparsable_string({line})>>>\n"
return True, parsed_string
__all__ = ["BirdSocket"]