-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabaseManager.py
executable file
·173 lines (158 loc) · 5.77 KB
/
databaseManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# -*- coding: utf-8 -*-
# ===============================================================================
#
# Authors: Massimiliano Cannata, Milan Antonovic
#
# Copyright (c) 2015 IST-SUPSI (www.supsi.ch/ist)
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# ===============================================================================
import psycopg2 #TODO: the right library
import psycopg2.extras
import psycopg2.extensions
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
import sys
from walib import utils as wut
#import pprint
#pp = pprint.PrettyPrinter(indent=4)
class Database:
"""Connect to a database"""
user = None
password = None
host = None
dbName = None
port = None
def getConnection(self):
"""Return a database connection"""
return None;
def closeConnection(self):
"""Close a database connection"""
return None
class PgDB(Database):
"""Connect to a PostgreSQL database"""
host=None
def __init__(self,user,password,dbName,host='localhost',port='5433'):
"Initialize PostgreSQL connection parameters"
self.__dns=""
if host: self.__dns += "host='%s' " % host
if port: self.__dns += "port='%d' " % int(port)
if dbName: self.__dns += "dbname='%s' " % dbName
if user: self.__dns += "user='%s' " % user
if password: self.__dns += "password='%s' " % password
self.__connect()
def __connect(self):
"""Connect to a PostgreSQL database"""
try:
self.__conn=psycopg2.connect(self.__dns)
self.__conn.set_client_encoding('UTF8')
except Exception as e:
emes = "%s" % e
if emes.find("CONNECTION ERROR: wrong password")>-1:
raise Exception("CONNECTION ERROR: wrong password or user")
elif emes.find("could not translate host")>-1:
raise Exception("CONNECTION ERROR: wrong host name")
elif emes.find("database")>-1:
raise Exception("CONNECTION ERROR: wrong database")
elif emes.find("connections on port")>-1 or emes.find("invalid literal for int()")>-1:
raise Exception("CONNECTION ERROR: wrong port")
else:
raise Exception("CONNECTION ERROR: %s" % e)
def select(self,sql,par=None):
""" Execute a select statement"""
if sql.lstrip()[0:6].lower() == "select":
cur = self.__conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
try:
cur.execute(sql,par)
except psycopg2.ProgrammingError as e:
raise e
try:
rows = cur.fetchall()
except:
rows = None
#self.__conn.commit()
cur.close()
#return rows
return wut.encodeobject(rows)
else:
raise Exception("sql must be a SELECT statement")
def commitTransaction(self):
"""Commit current transaction"""
try:
self.__conn.commit()
except psycopg2.ProgrammingError as e:
raise e
except Exception as e:
raise e
def rollbackTransaction(self):
"""Rollback current transaction"""
try:
self.__conn.rollback()
except psycopg2.ProgrammingError as e:
print >> sys.stderr, e.message
def executeInTransaction(self,sql,par=None):
"""Execute an sql statement in an open session"""
cur = self.__conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
try:
cur.execute(sql,par)
except psycopg2.ProgrammingError as e:
print >> sys.stderr, e.message
self.__conn.rollback()
raise e
except Exception as e:
raise e
try:
rows = cur.fetchall()
except:
rows = None
cur.close()
#return rows
return wut.encodeobject(rows)
def execute(self, sql, par=None):
"""Execute an sql statement"""
cur = self.__conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
try:
cur.execute(sql, par)
except psycopg2.ProgrammingError as e:
raise e
try:
rows = cur.fetchall()
except:
rows = None
self.__conn.commit()
#return rows
return wut.encodeobject(rows)
def insertMany(self,sql,dict):
"""Insert many values at once"""
cur = self.__conn.cursor()
try:
cur.executemany(sql,dict)
except psycopg2.ProgrammingError as e:
raise e
self.__conn.commit()
return
def mogrify(self,sql,par=None):
"""Mogrify an sql statement (print >> sys.stderr, the actual sql query that will be executed)"""
cur = self.__conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
try:
if par:
a = cur.mogrify(sql,par)
else:
a = cur.mogrify(sql)
except psycopg2.ProgrammingError as e:
raise e
cur.close()
return a