-
Notifications
You must be signed in to change notification settings - Fork 0
/
users.py
198 lines (155 loc) · 6.2 KB
/
users.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from extensions import db
from sqlalchemy.orm import relationship
from utils import gen_uuid, generate_otp, generate_random_string
from datetime import datetime, timedelta
from passlib.hash import pbkdf2_sha256 as hasher
class Users(db.Model):
__tablename__ = 'users'
id = db.Column(db.String(50), primary_key=True, default=gen_uuid)
first_name = db.Column(db.String(100), nullable=False)
last_name = db.Column(db.String(100), nullable=False)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password = db.Column(db.Text, nullable=False)
date_joined = db.Column(db.DateTime, default=datetime.now())
email_verified = db.Column(db.Boolean, default=False)
is_admin = db.Column(db.Boolean, default=False)
is_super_admin = db.Column(db.Boolean, default=False)
organization_id = db.Column(db.String(50), db.ForeignKey('organizations.id'), nullable=True)
projects = relationship('Projects', backref='users')
tasks = relationship('Tasks', backref='assignee', lazy=True)
messages = relationship('Messages', backref='author', lazy=True)
notifications = relationship('Notifications', backref='recipient', lazy=True)
usersession = relationship('UserSession', backref='user', lazy=True, uselist=False)
documents = relationship('Documents', backref='user', lazy=True)
def to_dict(self):
return {
'id': self.id,
'first_name': self.first_name.title(),
'last_name': self.last_name.title(),
'username': self.username.title(),
'email': self.email,
'date_joined': self.date_joined.strftime("%d %b, %Y"),
'email_verified': self.email_verified,
'role': (
"super_admin" if self.is_super_admin
else "admin" if self.is_admin
else "user"
)
}
def save(self):
db.session.add(self)
db.session.commit()
def update(self):
db.session.commit()
def __repr__(self):
return '<Users %r>' % self.username
# otp session
class UserSession(db.Model):
__tablename__ = 'user_session'
id = db.Column(db.String(50), primary_key=True, default=gen_uuid)
user_id = db.Column(db.String(50), db.ForeignKey('users.id'), nullable=False)
reset_p = db.Column(db.String(50), nullable=True)
otp = db.Column(db.String(6), nullable=True)
otp_expiry = db.Column(db.DateTime, nullable=True)
reset_p_expiry = db.Column(db.DateTime, nullable=True)
def save(self):
db.session.add(self)
db.session.commit()
def update(self):
db.session.commit()
# check if email exist and password is correct
def authenticate(email, password):
user = Users.query.filter_by(email=email).first()
if user and hasher.verify(password, user.password):
return user
return None
# validate email
def email_exist(email):
user = Users.query.filter_by(email=email).first()
if user:
return True
return False
# validate username
def username_exist(username):
user = Users.query.filter_by(username=username).first()
if user:
return True
return False
def create_user(first_name, last_name, username, email, password, organization_id, is_admin, is_super_admin):
user = Users(first_name=first_name,
last_name=last_name, username=username,
email=email, password=hasher.hash(password), organization_id=organization_id,
is_admin=is_admin, is_super_admin=is_super_admin)
user.save()
return user
def create_otp(user_id):
# expiry time is 10 minutes
expiry = datetime.now() + timedelta(minutes=10)
otp = generate_otp()
usersession = UserSession.query.filter_by(user_id=user_id).first()
if usersession:
usersession.otp = otp
usersession.otp_expiry = expiry
usersession.update()
else:
usersession = UserSession(user_id=user_id, otp=otp, otp_expiry=expiry)
usersession.save()
return usersession
def create_reset_p(user_id):
# expiry time is 10 minutes
expiry = datetime.now() + timedelta(minutes=10)
reset_p = f"ly{generate_random_string()}"
usersession = UserSession.query.filter_by(user_id=user_id).first()
if usersession:
usersession.reset_p = reset_p
usersession.reset_p_expiry = expiry
usersession.update()
else:
usersession = UserSession(user_id=user_id, reset_p=reset_p, reset_p_expiry=expiry)
usersession.save()
return usersession
def get_user_by_email(email):
return Users.query.filter_by(email=email).first()
def get_user_by_reset_p(reset_p):
usersession = UserSession.query.filter_by(reset_p=reset_p).first()
return usersession
def update_password(user, password):
user.password = hasher.hash(password)
user.update()
return user
def current_user_info(user):
return {
"id": user.id,
"first_name": user.first_name.title(),
"last_name": user.last_name.title(),
"username": user.username,
"email": user.email,
"email_verified": user.email_verified
}
# get all users
def get_all_users(organization_id):
return Users.query.filter_by(organization_id=organization_id).all()
# get users by organization
def get_users_by_organization(organization_id, page, per_page):
users = Users.query.filter_by(organization_id=organization_id).order_by(Users.date_joined.desc()
).paginate(
page=page, per_page=per_page, error_out=False
)
total_items = users.total
total_pages = users.pages
return users, total_items, total_pages
def get_user_by_id(user_id):
return Users.query.filter_by(id=user_id).first()
def update_user_role(user_id, is_admin, is_super_admin):
user = get_user_by_id(user_id)
user.is_admin = is_admin
user.is_super_admin = is_super_admin
user.update()
return user
def change_password(current_user, old_password, new_password):
if not hasher.verify(old_password, current_user.password):
return False
current_user.password = hasher.hash(new_password)
current_user.update()
return True