Skip to content

Commit

Permalink
traffic sign detection and processing working for all 5 traffic signs…
Browse files Browse the repository at this point in the history
… and pedestrians.
  • Loading branch information
dctian committed Apr 30, 2019
1 parent 8df74d5 commit 6a61ac5
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 73 deletions.
122 changes: 69 additions & 53 deletions driver/code/deep_pi_car.py
Original file line number Diff line number Diff line change
@@ -1,120 +1,136 @@
import logging
import sys
import picar
import cv2
import datetime
from hand_coded_lane_follower import HandCodedLaneFollower
from objects_on_road_processor import ObjectsOnRoadProcessor

_SHOW_IMAGE = True


class DeepPiCar(object):

_DEBUG = False
_DEBUG_INFO = 'DEBUG "back_wheels.py":'

__INITIAL_SPEED = 0
__SCREEN_WIDTH = 320
__SCREEN_HEIGHT = 240

def __init__(self, debug=False):
''' Init camera and wheels'''
def __init__(self):
""" Init camera and wheels"""
logging.info('Creating a DeepPiCar...')

picar.setup()

logging.debug('Set up camera')
self.camera = cv2.VideoCapture(-1)
self.camera.set(3, self.__SCREEN_WIDTH)
self.camera.set(4, self.__SCREEN_HEIGHT)

self.pan_servo = picar.Servo.Servo(1)
self.pan_servo.offset = -30 # calibrate servo to center
self.pan_servo.write(90)

self.tilt_servo = picar.Servo.Servo(2)
self.tilt_servo.offset = 20 # calibrate servo to center
self.tilt_servo.write(90)

logging.debug('Set up back wheels')
self.back_wheels = picar.back_wheels.Back_Wheels()
self.back_wheels.speed = 0 # Speed Range is 0 (stop) - 100 (fastest)
self.back_wheels.speed = 0 # Speed Range is 0 (stop) - 100 (fastest)

logging.debug('Set up front wheels')
self.front_wheels = picar.front_wheels.Front_Wheels()
self.front_wheels.turning_offset = -20 # calibrate servo to center
self.front_wheels.turn(90) # Steering Range is 45 (left) - 90 (center) - 135 (right)
self.front_wheels.turning_offset = -20 # calibrate servo to center
self.front_wheels.turn(90) # Steering Range is 45 (left) - 90 (center) - 135 (right)

self.lane_follower = HandCodedLaneFollower(self)
#lane_follower = DeepLearningLaneFollower()

fourcc = cv2.VideoWriter_fourcc(*'XVID')
self.traffic_sign_processor = ObjectsOnRoadProcessor(self)
# lane_follower = DeepLearningLaneFollower()

self.fourcc = cv2.VideoWriter_fourcc(*'XVID')
datestr = datetime.datetime.now().strftime("%y%m%d_%H%M%S")
self.video_orig = cv2.VideoWriter('../data/car_video_%s_orig.avi' % datestr,fourcc, 20.0, (320,240))
self.video_overlay = cv2.VideoWriter('../data/car_video_%s_overlay.avi' % datestr,fourcc, 20.0, (320,240))
self.video_orig = self.create_video_recorder('../data/tmp/car_video%s.avi' % datestr)
self.video_lane = self.create_video_recorder('../data/tmp/car_video_lane%s.avi' % datestr)
self.video_objs = self.create_video_recorder('../data/tmp/car_video_objs%s.avi' % datestr)

logging.info('Created a DeepPiCar')

def __enter__ (self):
''' Entering a with statement '''

def create_video_recorder(self, path):
return cv2.VideoWriter(path, self.fourcc, 20.0, (self.__SCREEN_WIDTH, self.__SCREEN_HEIGHT))

def __enter__(self):
""" Entering a with statement """
return self
def __exit__ (self, type, value, traceback):
''' Exit a with statement'''

def __exit__(self, _type, value, traceback):
""" Exit a with statement"""
if traceback is not None:
# Exception occurred:
logging.error('Exiting with statement with exception %s' % (traceback))
logging.error('Exiting with statement with exception %s' % traceback)

self.cleanup()

def cleanup(self):
''' Reset the hardware'''
""" Reset the hardware"""
logging.info('Stopping the car, resetting hardware.')
self.back_wheels.speed = 0
self.front_wheels.turn(90)
self.camera.release()
self.video_orig.release()
self.video_overlay.release()
self.video_lane.release()
self.video_objs.release()
cv2.destroyAllWindows()
def drive(self, speed = __INITIAL_SPEED):
''' Main entry point of the car, and put it in drive mode

def drive(self, speed=__INITIAL_SPEED):
""" Main entry point of the car, and put it in drive mode
Keyword arguments:
speed -- speed of back wheel, range is 0 (stop) - 100 (fastest)
'''
"""

logging.info('Starting to drive at speed %s...' % speed)
self.back_wheels.speed = speed
i = 0
while( self.camera.isOpened()):
_, image = self.camera.read()
while self.camera.isOpened():
_, image_lane = self.camera.read()
image_objs = image_lane.copy()
i += 1

self.video_orig.write(image)

self.process_objects_on_road(image)
image = self.follow_lane(image)

self.video_overlay.write(image)
cv2.imshow('Dash Cam', image)
#plt.imshow(image, shape=(self.__SCREEN_WIDTH * 2, self.__SCREEN_WIDTH*2))
#plt.show()

if cv2.waitKey(1) & 0xFF == ord('q') :
self.video_orig.write(image_lane)

image_objs = self.process_objects_on_road(image_objs)
self.video_objs.write(image_objs)
show_image('Detected Objects', image_objs)

image_lane = self.follow_lane(image_lane)
self.video_lane.write(image_lane)
show_image('Lane Lines', image_lane)

if cv2.waitKey(1) & 0xFF == ord('q'):
self.cleanup()
break

def process_objects_on_road(self, image):
logging.debug('process_objects_road...')
image = self.traffic_sign_processor.process_objects_on_road(image)
return image

def follow_lane(self, image):
logging.debug('follow_lane...')
image = self.lane_follower.follow_lane(image)
return image


############################
# Utility Functions
############################
def show_image(title, frame, show=_SHOW_IMAGE):
if show:
cv2.imshow(title, frame)


def main():
with DeepPiCar() as car:
car.drive(40)


if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG, format='%(levelname)-5s:%(asctime)s: %(message)s')

main()
13 changes: 10 additions & 3 deletions driver/code/objects_on_road_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self,
# initialize car
self.car = car
self.speed_limit = speed_limit
self.speed = 40
self.speed = speed_limit

# initialize TensorFlow models
with open(label, 'r') as f:
Expand All @@ -41,7 +41,7 @@ def __init__(self,
# initial edge TPU engine
logging.info('Initialize Edge TPU with model %s...' % model)
self.engine = edgetpu.detection.engine.DetectionEngine(model)
self.min_confidence = 0.40
self.min_confidence = 0.30
self.num_of_objects = 3
logging.info('Initialize Edge TPU with model done.')

Expand Down Expand Up @@ -108,11 +108,18 @@ def resume_driving(self, car_state):
self.set_speed(self.speed_limit)
logging.debug('Current Speed = %d, New Speed = %d' % (old_speed, self.speed))

if old_speed > 0 and self.speed == 0:
logging.debug('come to a full stop for 1 seconds')
time.sleep(1)

def set_speed(self, speed):
# Use this setter, so we can test this class without a car attached
self.speed = speed
if self.car is not None:
self.car.speed = speed
logging.debug("Actually setting car speed to %d" % speed)
self.car.back_wheels.speed = speed



############################
# Frame processing steps
Expand Down
48 changes: 31 additions & 17 deletions driver/code/traffic_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ class TrafficObjects(object):
def set_car_state(self, car_state):
pass

def is_close_by(self, obj, frame_height):
@staticmethod
def is_close_by(obj, frame_height, min_height_pct=0.05):
# default: if a sign is 10% of the height of frame
obj_height = obj.bounding_box[1][1]-obj.bounding_box[0][1]
return obj_height / frame_height > 0.10
return obj_height / frame_height > min_height_pct


class RedTrafficLight(TrafficObjects):

Expand All @@ -29,6 +31,7 @@ class Person(TrafficObjects):

def set_car_state(self, car_state):
logging.debug('pedestrian: stopping car')

car_state['speed'] = 0


Expand All @@ -38,6 +41,7 @@ def __init__(self, speed_limit):
self.speed_limit = speed_limit

def set_car_state(self, car_state):
logging.debug('speed limit: set limit to %d' % self.speed_limit)
car_state['speed_limit'] = self.speed_limit


Expand All @@ -46,34 +50,44 @@ class StopSign(TrafficObjects):
Stop Sign object would wait
"""

def __init__(self, wait_time_in_sec=2):
self.inStopSignWaitMode = False
self.hasStopped = False
self.waitTimeInSec = wait_time_in_sec
self.timer = Timer(wait_time_in_sec, self.wait_done)
def __init__(self, wait_time_in_sec=3, min_no_stop_sign=20):
self.in_wait_mode = False
self.has_stopped = False
self.wait_time_in_sec = wait_time_in_sec
self.min_no_stop_sign = min_no_stop_sign
self.no_stop_count = min_no_stop_sign
self.timer = None

def set_car_state(self, car_state):
if self.inStopSignWaitMode:
self.no_stop_count = self.min_no_stop_sign

if self.in_wait_mode:
logging.debug('stop sign: 2) still waiting')
# wait for 2 second before proceeding
car_state['speed'] = 0
return

if not self.hasStopped:
if not self.has_stopped:
logging.debug('stop sign: 1) just detected')

car_state['speed'] = 0
self.inStopSignWaitMode = True
self.hasStopped = True
self.in_wait_mode = True
self.has_stopped = True
self.timer = Timer(self.wait_time_in_sec, self.wait_done)
self.timer.start()
return

def wait_done(self):
logging.debug('stop sign: 3) finished waiting for %d seconds' % self.waitTimeInSec)
self.inStopSignWaitMode = False
logging.debug('stop sign: 3) finished waiting for %d seconds' % self.wait_time_in_sec)
self.in_wait_mode = False

def clear(self):
if self.hasStopped:
logging.debug("stop sign: 4) no more stop sign detected")
self.hasStopped = False
self.inStopSignWaitMode = False # may not need to set this
if self.has_stopped:
# need this counter in case object detection has a glitch that one frame does not
# detect stop sign, make sure we see 20 consecutive no stop sign frames (about 1 sec)
# and then mark has_stopped = False
self.no_stop_count -= 1
if self.no_stop_count == 0:
logging.debug("stop sign: 4) no more stop sign detected")
self.has_stopped = False
self.in_wait_mode = False # may not need to set this

0 comments on commit 6a61ac5

Please sign in to comment.