forked from xszyou/Fay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmember_db.py
144 lines (116 loc) · 3.79 KB
/
member_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import sqlite3
import time
import threading
import functools
def synchronized(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with self.lock:
return func(self, *args, **kwargs)
return wrapper
__member_db = None
def new_instance():
global __member_db
if __member_db is None:
__member_db = Member_Db()
__member_db.init_db()
return __member_db
class Member_Db:
def __init__(self) -> None:
self.lock = threading.Lock()
#初始化
def init_db(self):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS T_Member
(id INTEGER PRIMARY KEY autoincrement,
username TEXT NOT NULL UNIQUE);''')
conn.commit()
conn.close()
# 添加新用户
@synchronized
def add_user(self, username):
if self.is_username_exist(username) == "notexists":
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('INSERT INTO T_Member (username) VALUES (?)', (username,))
conn.commit()
conn.close()
return "success"
else:
return f"Username '{username}' already exists."
# 修改用户名
@synchronized
def update_user(self, username, new_username):
if self.is_username_exist(new_username) == "notexists":
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('UPDATE T_Member SET username = ? WHERE username = ?', (new_username, username))
conn.commit()
conn.close()
return "success"
else:
return f"Username '{new_username}' already exists."
# 删除用户
@synchronized
def delete_user(self, username):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('DELETE FROM T_Member WHERE username = ?', (username,))
conn.commit()
conn.close()
return "success"
# 检查用户名是否已存在
def is_username_exist(self, username):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('SELECT COUNT(*) FROM T_Member WHERE username = ?', (username,))
result = c.fetchone()[0]
conn.close()
if result > 0:
return "exists"
else:
return "notexists"
#根据username查询uid
def find_user(self, username):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('SELECT * FROM T_Member WHERE username = ?', (username,))
result = c.fetchone()
conn.close()
if result is None:
return 0
else:
return result[0]
#根据uid查询username
def find_username_by_uid(self, uid):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('SELECT username FROM T_Member WHERE id = ?', (uid,))
result = c.fetchone()
conn.close()
if result is None:
return 0
else:
return result[0]
@synchronized
def query(self, sql):
try:
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute(sql)
results = c.fetchall()
conn.commit()
conn.close()
return results
except Exception as e:
return f"执行时发生错误:{str(e)}"
# 获取所有用户
@synchronized
def get_all_users(self):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('SELECT * FROM T_Member')
results = c.fetchall()
conn.close()
return results