Skip to content

Commit

Permalink
Fix bag rewrite (nasa#493)
Browse files Browse the repository at this point in the history
* rosbag_rewrite_types.py: re-enable writing output bag, see nasa#490

* bag migration: make CI testing more thorough, see nasa#490
  • Loading branch information
trey0 authored May 17, 2022
1 parent 61b1e8f commit 73e4a48
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 20 deletions.
4 changes: 4 additions & 0 deletions tools/bag_processing/scripts/Makefile.rosbag_fix_all
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ ROS_VERSION=$(shell rosversion -d)
ifeq (,${REWRITE_TYPES})
REWRITE_TYPES = $(shell catkin_find --first-only bag_processing scripts/rosbag_rewrite_types.py)
endif
ifeq (,${ROSBAG_VERIFY})
ROSBAG_VERIFY = $(shell catkin_find --first-only bag_processing scripts/rosbag_verify.py)
endif
ifeq (,${FF_MSGS_BMR})
FF_MSGS_BMR = $(shell catkin_find --first-only ff_msgs bmr)
endif
Expand All @@ -53,3 +56,4 @@ endif
rosbag fix $< $@ ${BMR}
rm $< # explicitly delete right away to save disk space
rosbag check $@ # test: post-migration consistency
${ROSBAG_VERIFY} ${ROSBAG_VERIFY_ARGS} $@ $(@:.fix_all.bag=.bag)
26 changes: 20 additions & 6 deletions tools/bag_processing/scripts/rosbag_fix_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def dosys(cmd):
return ret


def rosbag_fix_all(inbag_paths_in, jobs):
def rosbag_fix_all(inbag_paths_in, jobs, deserialize=False):
this_folder = os.path.dirname(os.path.realpath(__file__))
makefile = os.path.join(this_folder, "Makefile.rosbag_fix_all")
inbag_paths = [p for p in inbag_paths_in if not p.endswith(".fix_all.bag")]
Expand All @@ -48,7 +48,16 @@ def rosbag_fix_all(inbag_paths_in, jobs):
)
outbag_paths = [os.path.splitext(p)[0] + ".fix_all.bag" for p in inbag_paths]
outbag_paths_str = " ".join(outbag_paths)
ret = dosys("make -f%s -j%s %s" % (makefile, jobs, outbag_paths_str))

rosbag_verify_args = "-v"
if deserialize:
rosbag_verify_args += " -d"

# "1>&2": redirect stdout to stderr to see make's command echo in rostest output
ret = dosys(
'ROSBAG_VERIFY_ARGS="%s" make -f%s -j%s %s 1>&2'
% (rosbag_verify_args, makefile, jobs, outbag_paths_str)
)

logging.info("")
logging.info("====================")
Expand All @@ -62,9 +71,7 @@ def rosbag_fix_all(inbag_paths_in, jobs):
return ret


class CustomFormatter(
argparse.ArgumentDefaultsHelpFormatter,
):
class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter):
pass


Expand All @@ -79,12 +86,19 @@ class CustomFormatter(
type=int,
default=1,
)
parser.add_argument(
"-d",
"--deserialize",
help="perform deserialization check on output bag (can be slow for large bags)",
default=False,
action="store_true",
)
parser.add_argument("inbag", nargs="+", help="input bag")

args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(message)s")

ret = rosbag_fix_all(args.inbag, args.jobs)
ret = rosbag_fix_all(args.inbag, args.jobs, deserialize=args.deserialize)

# suppress confusing ROS message at exit
logging.getLogger().setLevel(logging.WARN)
Expand Down
50 changes: 40 additions & 10 deletions tools/bag_processing/scripts/rosbag_rewrite_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,8 @@ def fix_message_definitions(inbag, fix_topic_patterns, verbose=False):
logging.info(" [none]")


def rename_types(inbag, outbag, rename_lookup, verbose=False):
"""
Modifies inbag metadata (in memory, not on disk). Changes will take
effect when inbag's messages are subsequently written to outbag.
"""

def rename_types(inbag, outbag_path, rename_lookup, verbose=False):
# rewrite connection headers
renamed_types = set()
for conn in inbag._get_connections():
hdr = conn.header
Expand All @@ -170,6 +166,7 @@ def rename_types(inbag, outbag, rename_lookup, verbose=False):
for k in ("type", "md5sum", "message_definition"):
hdr[k] = tgt[k].encode("utf-8")

# summarize rewrite rules that were applied
if verbose:
# summary of renamed messages
migrated = sorted(renamed_types)
Expand All @@ -187,6 +184,40 @@ def rename_types(inbag, outbag, rename_lookup, verbose=False):
tgt["md5sum"][:8],
)

# while copying messages we need to rename types so they are mapped to the right connection
do_meter = False # unhelpful in CI test logs
with rosbag.Bag(outbag_path, "w", options=inbag.options) as outbag:
if do_meter:
meter = rosbag.rosbag_main.ProgressMeter(
outbag_path, inbag._uncompressed_size
)
total_bytes = 0

for topic, raw_msg, t in inbag.read_messages(raw=True):
msg_type, serialized_bytes, md5sum, pos, pytype = raw_msg

rename_key = (msg_type, md5sum)
tgt = rename_lookup.get(rename_key)
if tgt is not None:
out_msg = (
tgt["type"],
serialized_bytes,
tgt["md5sum"],
pos,
tgt["pytype"],
)
else:
out_msg = raw_msg

outbag.write(topic, out_msg, t, raw=True)

if do_meter:
total_bytes += len(serialized_bytes)
meter.step(total_bytes)

if do_meter:
meter.finish()


def rewrite_msg_types1(
inbag_path_in, outbag_path, rules, verbose=False, no_reindex=False
Expand All @@ -195,14 +226,13 @@ def rewrite_msg_types1(

fix_topic_patterns, rename_lookup = rules
with rosbag.Bag(inbag_path, "r") as inbag:
with rosbag.Bag(outbag_path, "w", options=inbag.options) as outbag:
fix_message_definitions(inbag, fix_topic_patterns, verbose)
rename_types(inbag, outbag, rename_lookup, verbose)
fix_message_definitions(inbag, fix_topic_patterns, verbose)
rename_types(inbag, outbag_path, rename_lookup, verbose)

if tmp_folder:
dosys("rm -r %s" % tmp_folder)

cmd = "rosbag reindex %s" % outbag_path
cmd = "rosbag reindex -q %s" % outbag_path
if no_reindex:
logging.info("%s probably needs to be re-indexed. Run: %s\n", outbag_path, cmd)
else:
Expand Down
143 changes: 143 additions & 0 deletions tools/bag_processing/scripts/rosbag_verify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#!/usr/bin/env python
# Copyright (c) 2017, United States Government, as represented by the
# Administrator of the National Aeronautics and Space Administration.
#
# All rights reserved.
#
# The Astrobee platform is licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Verify correctness of a ROS bag, as output by rosbag_fix_all.py. This
performs some simple checks that are not part of the standard "rosbag
check".
"""

from __future__ import print_function

import argparse
import json
import logging
import os
import sys
import traceback as tb

import rosbag


def get_message_counts(bag_path):
with rosbag.Bag(bag_path, "r") as bag:
types, topics = bag.get_type_and_topic_info()
return {topic: info.message_count for topic, info in topics.items()}


def message_counts_match_internal(bag_path1, bag_path2):
c1 = get_message_counts(bag_path1)
c2 = get_message_counts(bag_path2)
if c1 == c2:
num_topics = len(c1)
num_messages = sum([mc for mc in c1.values()])
logging.info("PASS: Input bags have identical message counts on each topic")
logging.debug(
" Each file has a total of %d messages on %d topics",
num_messages,
num_topics,
)
return True
else:
logging.warning("FAIL: Input bags do not have identical message counts")
logging.debug(
"Bag message count info:\n%s", json.dumps({bag_path1: c1, bag_path2: c2})
)
return False


def get_exception_text():
return tb.format_exception_only(*sys.exc_info()[:2])[0].strip()


def message_counts_match(bag_path1, bag_path2):
try:
return message_counts_match_internal(bag_path1, bag_path2)
except:
logging.warning("FAIL: Caught exception during message count check")
logging.debug(" Exception was: %s", get_exception_text())
return False


def valid_deserialization(bag_path):
"""
Force deserialization of messages, which can catch some binary
corruption that is not detected by rosbag check.
"""
try:
with rosbag.Bag(bag_path, "r") as bag:
for topic, msg, t in bag.read_messages():
pass
except:
logging.warning("FAIL: Caught exception during message deserialization check")
logging.debug(" Exception was: %s", get_exception_text())
return False

logging.info("PASS: Successfully deserialized messages")
return True


def verify(fixed_path, orig_path, verbose=False, deserialize=False):
result = True
result &= message_counts_match(orig_path, fixed_path)
if deserialize:
result &= valid_deserialization(fixed_path)

if result:
logging.info("PASS: rosbag_verify: All tests passed")
return 0
else:
logging.warning("FAIL: rosbag_verify: Some tests failed; check output above")
return 1


if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"-v",
"--verbose",
help="print more debug info",
default=False,
action="store_true",
)
parser.add_argument(
"-d",
"--deserialize",
help="perform deserialization check (can be very slow on large bags)",
default=False,
action="store_true",
)
parser.add_argument("fixed_bag", help="Path to fixed bag to verify")
parser.add_argument("orig_bag", help="Path to original bag to check against")

args = parser.parse_args()

level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(level=level, format="%(message)s")

ret = verify(
args.fixed_bag,
args.orig_bag,
verbose=args.verbose,
deserialize=args.deserialize,
)

# suppress confusing ROS message at exit
logging.getLogger().setLevel(logging.WARN)

sys.exit(ret)
14 changes: 10 additions & 4 deletions tools/bag_processing/test/test_fix_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ def test_fix_all(self):
bags = os.path.join(bags_folder, "*.bag")

bag_list = glob.glob(bags)
self.assertIsNot(len(bag_list), 0) # sanity check bags path
self.assertIsNot(len(bag_list), 0) # check test bags exist

fix_all_return_value = dosys("%s %s" % (fix_all, bags))
self.assertIs(fix_all_return_value, 0) # check bags fixed
dosys("rm %s/*.fix_all.bag" % bags_folder)
fix_all_return_value = dosys("%s -d %s" % (fix_all, bags))
self.assertIs(fix_all_return_value, 0) # check script success

fixed_bags = os.path.join(bags_folder, "*.fix_all.bag")
fixed_bag_list = glob.glob(fixed_bags)
self.assertEquals(len(bag_list), len(fixed_bag_list)) # check # output files

# cleanup
dosys("rm %s" % fixed_bags)


if __name__ == "__main__":
Expand Down

0 comments on commit 73e4a48

Please sign in to comment.