Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
Several updates including:
- Addition of the subservice submodule
- Fix of the auto module and rewrite of all subservice functions by using the newly developed subservice submodule
- Update of documentation
  • Loading branch information
Cr0wTom committed Jun 6, 2023
1 parent a9349e7 commit d3edb17
Show file tree
Hide file tree
Showing 2 changed files with 270 additions and 40 deletions.
255 changes: 215 additions & 40 deletions caringcaribou/modules/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
DELAY_TESTER_PRESENT = 0.5
DELAY_SECSEED_RESET = 0.01
TIMEOUT_SERVICES = 0.2
TIMEOUT_SUBSERVICES = 0.02

# Max number of arbitration IDs to backtrack during verification
VERIFICATION_BACKTRACK = 5
Expand Down Expand Up @@ -383,6 +384,107 @@ def __service_discovery_wrapper(args):
service_name = UDS_SERVICE_NAMES.get(service_id, "Unknown service")
print("Supported service 0x{0:02x}: {1}"
.format(service_id, service_name))

def sub_discovery(arb_id_request, arb_id_response, diagnostic, service, timeout, print_results=True):
"""Scans for supported UDS Diagnostic Session Control subservices on the specified arbitration ID.
Returns a list of found Diagnostic Session Control subservice IDs.
:param arb_id_request: arbitration ID for requests
:param arb_id_response: arbitration ID for responses
:param timeout: delay between each request sent
:param min_id: first service ID to scan
:param max_id: last service ID to scan
:param print_results: whether progress should be printed to stdout
:type arb_id_request: int
:type arb_id_response: int
:type timeout: float
:type min_id: int
:type max_id: int
:type print_results: bool
:return: list of supported service IDs
:rtype [int]
"""
found_subservices = []
subservice_status = []

try:
for i in range(1,256):

if service != Services.DiagnosticSessionControl:
extended_session(arb_id_request, arb_id_response, diagnostic)
else:
extended_session(arb_id_request, arb_id_response, 1)

time.sleep(0.1)

response = raw_send(arb_id_request, arb_id_response, service, i)

service_name = UDS_SERVICE_NAMES.get(service, "Unknown service")

print("\rProbing sub-service ID 0x{0:02x} for service {1} (0x{2:02x}).".format(i, service_name, service), end="")

if response is None:
# No response received
continue

# Parse response
if len(response) >= 2:
response_id = response[0]
response_service_id = response[1]
if len(response) >= 3:
status = response[2]
if Iso14229_1.is_positive_response(response):
found_subservices.append(i)
subservice_status.append(0x00)
elif response_id == Constants.NR_SI and response_service_id == service and status != NegativeResponseCodes.SUB_FUNCTION_NOT_SUPPORTED:
# Any other response than "service not supported" counts
found_subservices.append(i)
subservice_status.append(response_service_id)

time.sleep(timeout)

except KeyboardInterrupt:
if print_results:
print("\nInterrupted by user!\n")
return found_subservices, subservice_status


def __sub_discovery_wrapper(args):
"""Wrapper used to initiate a subservice discovery scan"""
arb_id_request = args.src
arb_id_response = args.dst
diagnostic = args.dsc
service = args.service
timeout = args.timeout

# Probe subservices
found_subservices, subservice_status = sub_discovery(arb_id_request,
arb_id_response, diagnostic, service, timeout)

service_name = UDS_SERVICE_NAMES.get(service, "Unknown service")
# Print results
if len(found_subservices) == 0:
print("\nNo Sub-Services were discovered for service {0:02x} - {1}.\n".format(service, service_name, end=' '))
else:
print("\nSub-Services Discovered for Service {0:02x} - {1}:\n".format(service, service_name, end=' '))
for subservice_id in found_subservices:
nrc_description = NRC_NAMES.get(subservice_status[found_subservices.index(subservice_id)])
print("\n0x{0:02x} : {1}".format(subservice_id, nrc_description), end=' ')


def raw_send(arb_id_request, arb_id_response, service, session_type):
with IsoTp(arb_id_request=arb_id_request,
arb_id_response=arb_id_response) as tp:
# Setup filter for incoming messages
request = [0] * 2
request[0] = service
request[1] = session_type

tp.set_filter_single_arbitration_id(arb_id_response)
with Iso14229_1(tp) as uds:
tp.send_request(request)
response = uds.receive_response(Iso14229_1.P3_CLIENT)
return response


def tester_present(arb_id_request, delay, duration,
Expand Down Expand Up @@ -738,6 +840,20 @@ def __auto_wrapper(args):
# No UDS discovered
print("\nDiagnostics service could not be found.")
else:

# Print result table
print("\nIdentified diagnostics:\n")
table_line = "+------------+------------+"
print(table_line)
print("| CLIENT ID | SERVER ID |")
print(table_line)
for (client_id, server_id) in arb_id_pairs:
print("| 0x{0:08x} | 0x{1:08x} |"
.format(client_id, server_id))
print(table_line)
print("\n")

# Enumerate each pair
for (client_id, server_id) in arb_id_pairs:

args.src = client_id
Expand All @@ -749,6 +865,7 @@ def __auto_wrapper(args):
table_line = "+------------+------------+"
print(table_line)
print("| CLIENT ID | SERVER ID |")
print(table_line)
print("| 0x{0:08x} | 0x{1:08x} |"
.format(client_id, server_id))
print(table_line)
Expand All @@ -774,65 +891,94 @@ def __auto_wrapper(args):
print("\nEnumerating Diagnostic Session Control Service:\n")

found_subservices = []
subservice_status = []

for i in range(1,256):

extended_session(client_id, server_id, 1)
#time.sleep(0.50)

#time.sleep(0.1)

response = extended_session(client_id, server_id, i)

print("\rProbing diagnostic session control sub-service 0x{0:02x}".format(i), end="")

if response is None:
# No response received
continue

# Parse response
if len(response) > 3:
response_id = response[1]
# response_service_id = response[2]
status = response[3]
if response_id != Constants.NR_SI:
if len(response) >= 3:
response_id = response[0]
response_service_id = response[1]
status = response[2]
if Iso14229_1.is_positive_response(response):
found_subservices.append(i)
elif status != NegativeResponseCodes.SERVICE_NOT_SUPPORTED:
subservice_status.append(0x00)
elif response_id == Constants.NR_SI and response_service_id == 0x10 and status != NegativeResponseCodes.SUB_FUNCTION_NOT_SUPPORTED:
# Any other response than "service not supported" counts
found_subservices.append(i)

print("\n")
subservice_status.append(response_service_id)

time.sleep(timeout)

# Print results
if len(found_subservices) == 0:
print("\nNo Diagnostic Session Control Sub-Services were discovered\n")
print("\nNo Diagnostic Session Control Sub-Services were discovered\n", end=' ')
else:
print("Discovered Diagnostic Session Control Sub-Services:", end=' ')
print("\n")
print("\nDiscovered Diagnostic Session Control Sub-Services:\n", end=' ')
for subservice_id in found_subservices:
print("0x{0:02x},".format(subservice_id), end=' ')
nrc_description = NRC_NAMES.get(subservice_status[found_subservices.index(subservice_id)])
print("\n0x{0:02x} : {1}".format(subservice_id, nrc_description), end=' ')

if 17 in found_services:

print("\n")
print("\nEnumerating ECUReset Service:\n")

found_ecureset = []
found_subservices = []
subservice_status = []

for i in range(1,5):

extended_session(client_id, server_id, 3)

#time.sleep(0.1)

response = raw_send(client_id, server_id, 17, i)

for i in range(1,6):
response = ecu_reset(client_id, server_id, i, timeout)
print("\rProbing ECUReset sub-service 0x{0:02x}".format(i), end="")

if response is None:
# No Reponse received
# No response received
continue
else:

# Parse response
if len(response) >= 2:
response_id = response[0]
response_service_id = response[1]
if len(response) >= 3:
status = response[2]
if Iso14229_1.is_positive_response(response):
response_service_id = response[0]
subfunction = response[1]
expected_response_id = Iso14229_1.get_service_response_id(Services.EcuReset.service_id)
if (response_service_id == expected_response_id and subfunction == i):
found_ecureset.append(i)
time.sleep(2)
found_subservices.append(i)
subservice_status.append(0x00)
elif response_id == Constants.NR_SI and response_service_id == 0x11 and status != NegativeResponseCodes.SUB_FUNCTION_NOT_SUPPORTED:
# Any other response than "service not supported" counts
found_subservices.append(i)
subservice_status.append(response_service_id)

print("\n")
if len(found_ecureset) == 0:
print("\nNo ECUReset Sub-Services were discovered\n")
time.sleep(timeout)

# Print results
if len(found_subservices) == 0:
print("\nNo ECUReset Sub-Services were discovered.\n", end=' ')
else:
print("Discovered ECUReset Sub-Services:", end=' ')
for ecureset_id in found_ecureset:
print("0x{0:02x},".format(ecureset_id), end=' ')
print("\n")
print("\nDiscovered ECUReset Sub-Services:\n", end=' ')
for subservice_id in found_subservices:
nrc_description = NRC_NAMES.get(subservice_status[found_subservices.index(subservice_id)])
print("\n0x{0:02x} : {1}".format(subservice_id, nrc_description), end=' ')



Expand All @@ -846,27 +992,35 @@ def __auto_wrapper(args):
print("\rProbing security access sub-service 0x{0:02x} in diagnostic session 0x{1:02x}.".format(level, subservice_id), end=" ")
extended_session(client_id, server_id, 1)
extended_session(client_id, server_id, subservice_id)
response = request_seed(client_id, server_id, level, None, None)
response = raw_send(client_id, server_id, 39, level)

if response is None:
continue
elif Iso14229_1.is_positive_response(response):
#print("Seed received successfully with diagnostic session 0x{0:02x} and security access type 0x{0:02x}.".format(subservice_id, level))
found_subdiag.append(subservice_id)
found_subsec.append(level)

print("\n")
print("\nSecurity Access Sub Services:\n")
table_line_sec = "+----------------------+-------------------+"
print(table_line_sec)
print("| Diagnostic Session | Security Access |")
print("| 0x{0:02x} | 0x{1:02x} |"
.format(subservice_id, level))
print(table_line_sec)
if len(found_subsec) == 0:
print("\nNo Security Access Sub-Services were discovered.\n")
else:
print("\n")
print("\nDiscovered Security Access Sub Services:\n")
print("\n")
table_line_sec = "+----------------------+-------------------+"
print(table_line_sec)
print("| Diagnostic Session | Security Access |")
print(table_line_sec)
for counter in range(len(found_subsec)):
diag = found_subdiag[counter]
sec = found_subsec[counter]
print("| 0x{0:02x} | 0x{1:02x} |"
.format(diag, sec))
counter += 1
print(table_line_sec)


except ValueError as e:
print("Discovery failed: {0}".format(e))
print("\nDiscovery failed: {0}".format(e), end=" ")



Expand Down Expand Up @@ -997,6 +1151,27 @@ def __parse_args(args):
.format(TIMEOUT_SERVICES))
parser_info.set_defaults(func=__service_discovery_wrapper)

# Parser for diagnostics session control subservice discovery
parser_sub = subparsers.add_parser("subservices")
parser_sub.add_argument("dsc", metavar="dtype",
type=parse_int_dec_or_hex, default="0x01",
help="Diagnostic Session Control Subsession Byte")
parser_sub.add_argument("service", metavar="stype",
type=parse_int_dec_or_hex,
help="Service ID")
parser_sub.add_argument("src",
type=parse_int_dec_or_hex,
help="arbitration ID to transmit to")
parser_sub.add_argument("dst",
type=parse_int_dec_or_hex,
help="arbitration ID to listen to")
parser_sub.add_argument("-t", "--timeout", metavar="T",
type=float, default=TIMEOUT_SUBSERVICES,
help="wait T seconds for response before "
"timeout (default: {0})"
.format(TIMEOUT_SUBSERVICES))
parser_sub.set_defaults(func=__sub_discovery_wrapper)

# Parser for ECU Reset
parser_ecu_reset = subparsers.add_parser("ecu_reset")
parser_ecu_reset.add_argument("reset_type", metavar="type",
Expand Down
Loading

0 comments on commit d3edb17

Please sign in to comment.