forked from google/transitfeed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshuttle_from_xmlfeed.py
executable file
·137 lines (116 loc) · 5.38 KB
/
shuttle_from_xmlfeed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/python2.5
# Copyright (C) 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google has a homegrown database for managing the company shuttle. The
database dumps its contents in XML. This scripts converts the proprietary XML
format into a Google Transit Feed Specification file.
"""
import datetime
from optparse import OptionParser
import os.path
import re
import transitfeed
import urllib
try:
import xml.etree.ElementTree as ET # python 2.5
except ImportError, e:
import elementtree.ElementTree as ET # older pythons
class NoUnusedStopExceptionProblemReporter(transitfeed.ProblemReporter):
"""The company shuttle database has a few unused stops for reasons unrelated
to this script. Ignore them.
"""
def __init__(self):
accumulator = transitfeed.ExceptionProblemAccumulator()
transitfeed.ProblemReporter.__init__(self, accumulator)
def UnusedStop(self, stop_id, stop_name):
pass
def SaveFeed(input, output):
tree = ET.parse(urllib.urlopen(input))
schedule = transitfeed.Schedule()
service_period = schedule.GetDefaultServicePeriod()
service_period.SetWeekdayService()
service_period.SetStartDate('20070314')
service_period.SetEndDate('20071231')
# Holidays for 2007
service_period.SetDateHasService('20070528', has_service=False)
service_period.SetDateHasService('20070704', has_service=False)
service_period.SetDateHasService('20070903', has_service=False)
service_period.SetDateHasService('20071122', has_service=False)
service_period.SetDateHasService('20071123', has_service=False)
service_period.SetDateHasService('20071224', has_service=False)
service_period.SetDateHasService('20071225', has_service=False)
service_period.SetDateHasService('20071226', has_service=False)
service_period.SetDateHasService('20071231', has_service=False)
stops = {} # Map from xml stop id to python Stop object
agency = schedule.NewDefaultAgency(name='GBus', url='http://shuttle/',
timezone='America/Los_Angeles')
for xml_stop in tree.getiterator('stop'):
stop = schedule.AddStop(lat=float(xml_stop.attrib['lat']),
lng=float(xml_stop.attrib['lng']),
name=xml_stop.attrib['name'])
stops[xml_stop.attrib['id']] = stop
for xml_shuttleGroup in tree.getiterator('shuttleGroup'):
if xml_shuttleGroup.attrib['name'] == 'Test':
continue
r = schedule.AddRoute(short_name="",
long_name=xml_shuttleGroup.attrib['name'], route_type='Bus')
for xml_route in xml_shuttleGroup.getiterator('route'):
t = r.AddTrip(schedule=schedule, headsign=xml_route.attrib['name'],
trip_id=xml_route.attrib['id'])
trip_stops = [] # Build a list of (time, Stop) tuples
for xml_schedule in xml_route.getiterator('schedule'):
trip_stops.append( (int(xml_schedule.attrib['time']) / 1000,
stops[xml_schedule.attrib['stopId']]) )
trip_stops.sort() # Sort by time
for (time, stop) in trip_stops:
t.AddStopTime(stop=stop, arrival_secs=time, departure_secs=time)
schedule.Validate(problems=NoUnusedStopExceptionProblemReporter())
schedule.WriteGoogleTransitFeed(output)
def main():
parser = OptionParser()
parser.add_option('--input', dest='input',
help='Path or URL of input')
parser.add_option('--output', dest='output',
help='Path of output file. Should end in .zip and if it '
'contains the substring YYYYMMDD it will be replaced with '
'today\'s date. It is impossible to include the literal '
'string YYYYYMMDD in the path of the output file.')
parser.add_option('--execute', dest='execute',
help='Commands to run to copy the output. %(path)s is '
'replaced with full path of the output and %(name)s is '
'replaced with name part of the path. Try '
'scp %(path)s myhost:www/%(name)s',
action='append')
parser.set_defaults(input=None, output=None, execute=[])
(options, args) = parser.parse_args()
today = datetime.date.today().strftime('%Y%m%d')
options.output = re.sub(r'YYYYMMDD', today, options.output)
(_, name) = os.path.split(options.output)
path = options.output
SaveFeed(options.input, options.output)
for command in options.execute:
import subprocess
def check_call(cmd):
"""Convenience function that is in the docs for subprocess but not
installed on my system."""
retcode = subprocess.call(cmd, shell=True)
if retcode < 0:
raise Exception("Child '%s' was terminated by signal %d" % (cmd,
-retcode))
elif retcode != 0:
raise Exception("Child '%s' returned %d" % (cmd, retcode))
# path_output and filename_current can be used to run arbitrary commands
check_call(command % locals())
if __name__ == '__main__':
main()