forked from p2pool/p2pool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWSDL.py
137 lines (110 loc) · 4.98 KB
/
WSDL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Parse web services description language to get SOAP methods.
Rudimentary support."""
ident = '$Id: WSDL.py 1467 2008-05-16 23:32:51Z warnes $'
from version import __version__
import wstools
import xml
from Errors import Error
from Client import SOAPProxy, SOAPAddress
from Config import Config
import urllib
class Proxy:
"""WSDL Proxy.
SOAPProxy wrapper that parses method names, namespaces, soap actions from
the web service description language (WSDL) file passed into the
constructor. The WSDL reference can be passed in as a stream, an url, a
file name, or a string.
Loads info into self.methods, a dictionary with methodname keys and values
of WSDLTools.SOAPCallinfo.
For example,
url = 'http://www.xmethods.org/sd/2001/TemperatureService.wsdl'
wsdl = WSDL.Proxy(url)
print len(wsdl.methods) # 1
print wsdl.methods.keys() # getTemp
See WSDLTools.SOAPCallinfo for more info on each method's attributes.
"""
def __init__(self, wsdlsource, config=Config, **kw ):
reader = wstools.WSDLTools.WSDLReader()
self.wsdl = None
# From Mark Pilgrim's "Dive Into Python" toolkit.py--open anything.
if self.wsdl is None and hasattr(wsdlsource, "read"):
print 'stream:', wsdlsource
try:
self.wsdl = reader.loadFromStream(wsdlsource)
except xml.parsers.expat.ExpatError, e:
newstream = urllib.URLopener(key_file=config.SSL.key_file, cert_file=config.SSL.cert_file).open(wsdlsource)
buf = newstream.readlines()
raise Error, "Unable to parse WSDL file at %s: \n\t%s" % \
(wsdlsource, "\t".join(buf))
# NOT TESTED (as of April 17, 2003)
#if self.wsdl is None and wsdlsource == '-':
# import sys
# self.wsdl = reader.loadFromStream(sys.stdin)
# print 'stdin'
if self.wsdl is None:
try:
file(wsdlsource)
self.wsdl = reader.loadFromFile(wsdlsource)
#print 'file'
except (IOError, OSError): pass
except xml.parsers.expat.ExpatError, e:
newstream = urllib.urlopen(wsdlsource)
buf = newstream.readlines()
raise Error, "Unable to parse WSDL file at %s: \n\t%s" % \
(wsdlsource, "\t".join(buf))
if self.wsdl is None:
try:
stream = urllib.URLopener(key_file=config.SSL.key_file, cert_file=config.SSL.cert_file).open(wsdlsource)
self.wsdl = reader.loadFromStream(stream, wsdlsource)
except (IOError, OSError): pass
except xml.parsers.expat.ExpatError, e:
newstream = urllib.urlopen(wsdlsource)
buf = newstream.readlines()
raise Error, "Unable to parse WSDL file at %s: \n\t%s" % \
(wsdlsource, "\t".join(buf))
if self.wsdl is None:
import StringIO
self.wsdl = reader.loadFromString(str(wsdlsource))
#print 'string'
# Package wsdl info as a dictionary of remote methods, with method name
# as key (based on ServiceProxy.__init__ in ZSI library).
self.methods = {}
service = self.wsdl.services[0]
port = service.ports[0]
name = service.name
binding = port.getBinding()
portType = binding.getPortType()
for operation in portType.operations:
callinfo = wstools.WSDLTools.callInfoFromWSDL(port, operation.name)
self.methods[callinfo.methodName] = callinfo
self.soapproxy = SOAPProxy('http://localhost/dummy.webservice',
config=config, **kw)
def __str__(self):
s = ''
for method in self.methods.values():
s += str(method)
return s
def __getattr__(self, name):
"""Set up environment then let parent class handle call.
Raises AttributeError is method name is not found."""
if not self.methods.has_key(name): raise AttributeError, name
callinfo = self.methods[name]
self.soapproxy.proxy = SOAPAddress(callinfo.location)
self.soapproxy.namespace = callinfo.namespace
self.soapproxy.soapaction = callinfo.soapAction
return self.soapproxy.__getattr__(name)
def show_methods(self):
for key in self.methods.keys():
method = self.methods[key]
print "Method Name:", key.ljust(15)
print
inps = method.inparams
for parm in range(len(inps)):
details = inps[parm]
print " In #%d: %s (%s)" % (parm, details.name, details.type)
print
outps = method.outparams
for parm in range(len(outps)):
details = outps[parm]
print " Out #%d: %s (%s)" % (parm, details.name, details.type)
print