Skip to content

Commit

Permalink
refactor: move otpauth related code in a dedicated file
Browse files Browse the repository at this point in the history
  • Loading branch information
azmeuk committed Feb 21, 2025
1 parent 1a98e77 commit 618acde
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 75 deletions.
73 changes: 73 additions & 0 deletions canaille/app/otp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Methods related to HOTP and TOTP."""

import secrets

from flask import current_app

HOTP_LOOK_AHEAD_WINDOW = 10


def initialize_otp(user):
user.secret_token = secrets.token_hex(32)
user.last_otp_login = None
if current_app.features.otp_method == "HOTP":
user.hotp_counter = 1


def generate_otp(user, counter_delta=0):
import otpauth

method = current_app.features.otp_method
if method == "TOTP":
totp = otpauth.TOTP(bytes(user.secret_token, "utf-8"))
return totp.string_code(totp.generate())

elif method == "HOTP":
hotp = otpauth.HOTP(bytes(user.secret_token, "utf-8"))
return hotp.string_code(hotp.generate(user.hotp_counter + counter_delta))

else: # pragma: no cover
raise RuntimeError("Invalid one-time password method")


def get_otp_authentication_setup_uri(user):
import otpauth

method = current_app.features.otp_method
if method == "TOTP":
return otpauth.TOTP(bytes(user.secret_token, "utf-8")).to_uri(
label=user.user_name, issuer=current_app.config["CANAILLE"]["NAME"]
)

elif method == "HOTP":
return otpauth.HOTP(bytes(user.secret_token, "utf-8")).to_uri(
label=user.user_name,
issuer=current_app.config["CANAILLE"]["NAME"],
counter=user.hotp_counter,
)

else: # pragma: no cover
raise RuntimeError("Invalid one-time password method")


def is_totp_valid(user, user_otp):
import otpauth

return otpauth.TOTP(bytes(user.secret_token, "utf-8")).verify(user_otp)


def is_hotp_valid(user, user_otp):
import otpauth

counter = user.hotp_counter
is_valid = False
# if user token's counter is ahead of canaille's, try to catch up to it
while counter - user.hotp_counter <= HOTP_LOOK_AHEAD_WINDOW:
is_valid = otpauth.HOTP(bytes(user.secret_token, "utf-8")).verify(
user_otp, counter
)
counter += 1
if is_valid:
user.hotp_counter = counter
return True
return False
4 changes: 3 additions & 1 deletion canaille/backends/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,13 @@ def reset_otp(identifier):
IDENTIFIER should be a user id or user_name
"""
from canaille.app.otp import initialize_otp

user = Backend.instance.get(models.User, identifier)
if not user:
raise click.ClickException(f"No user with id '{identifier}'")

user.initialize_otp()
initialize_otp(user)
current_app.logger.security(
f"Reset one-time password authentication from CLI for {user.user_name}"
)
Expand Down
4 changes: 3 additions & 1 deletion canaille/backends/ldap/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def match_filter(self, filter):

def save(self):
if current_app.features.has_otp and not self.secret_token:
self.initialize_otp()
from canaille.app.otp import initialize_otp

initialize_otp(self)

group_attr = self.python_attribute_to_ldap("groups")
if group_attr not in self.changes:
Expand Down
4 changes: 3 additions & 1 deletion canaille/backends/memory/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ def save(self, instance):
and current_app.features.has_otp
and not instance.secret_token
):
instance.initialize_otp()
from canaille.app.otp import initialize_otp

initialize_otp(instance)

if not instance.id:
instance.id = str(uuid.uuid4())
Expand Down
4 changes: 3 additions & 1 deletion canaille/backends/sql/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ def default_password_arguments(**kwargs):

def save(self):
if current_app.features.has_otp and not self.secret_token:
self.initialize_otp()
from canaille.app.otp import initialize_otp

initialize_otp(self)

@property
def password_failure_timestamps(self):
Expand Down
4 changes: 3 additions & 1 deletion canaille/core/endpoints/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,11 +759,13 @@ def profile_settings(user, edited_user):
return render_template("core/modals/reset-otp.html", edited_user=edited_user)

if request.form.get("action") == "reset-otp" and current_app.features.has_otp:
from canaille.app.otp import initialize_otp

flash(_("One-time password authentication has been reset"), "success")
current_app.logger.security(
f"Reset one-time password authentication for {edited_user.user_name} by {user.user_name}"
)
edited_user.initialize_otp()
initialize_otp(edited_user)
Backend.instance.save(edited_user)

return profile_settings_edit(user, edited_user)
Expand Down
4 changes: 3 additions & 1 deletion canaille/core/endpoints/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ def setup_two_factor_auth():
if not current_app.features.has_otp:
abort(404)

from canaille.app.otp import get_otp_authentication_setup_uri

if current_user():
return redirect(
url_for("core.account.profile_edition", edited_user=current_user())
Expand All @@ -292,7 +294,7 @@ def setup_two_factor_auth():

user = get_user_from_login(session["attempt_login_with_correct_password"])

uri = user.get_otp_authentication_setup_uri()
uri = get_otp_authentication_setup_uri(user)
base64_qr_image = get_b64encoded_qr_image(uri)
return render_template(
"core/setup-mfa.html",
Expand Down
71 changes: 10 additions & 61 deletions canaille/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from canaille.core.mails import send_one_time_password_mail
from canaille.core.sms import send_one_time_password_sms

HOTP_LOOK_AHEAD_WINDOW = 10
OTP_DIGITS = 6
OTP_VALIDITY = 600
SEND_NEW_OTP_DELAY = 10
Expand Down Expand Up @@ -363,25 +362,6 @@ def writable_fields(self):
self._writable_fields |= set(details["WRITE"])
return self._writable_fields

def initialize_otp(self):
self.secret_token = secrets.token_hex(32)
self.last_otp_login = None
if current_app.features.otp_method == "HOTP":
self.hotp_counter = 1

def generate_otp(self, counter_delta=0):
import otpauth

method = current_app.features.otp_method
if method == "TOTP":
totp = otpauth.TOTP(bytes(self.secret_token, "utf-8"))
return totp.string_code(totp.generate())
elif method == "HOTP":
hotp = otpauth.HOTP(bytes(self.secret_token, "utf-8"))
return hotp.string_code(hotp.generate(self.hotp_counter + counter_delta))
else: # pragma: no cover
raise RuntimeError("Invalid one-time password method")

def generate_sms_or_mail_otp(self):
otp = string_code(secrets.randbelow(10**OTP_DIGITS), OTP_DIGITS)
self.one_time_password = otp
Expand All @@ -402,54 +382,23 @@ def generate_and_send_otp_sms(self):
return otp
return False

def get_otp_authentication_setup_uri(self):
import otpauth
def is_otp_valid(self, user_otp, method):
if current_app.features.has_otp and method == "TOTP":
from canaille.app.otp import is_totp_valid

method = current_app.features.otp_method
if method == "TOTP":
return otpauth.TOTP(bytes(self.secret_token, "utf-8")).to_uri(
label=self.user_name, issuer=current_app.config["CANAILLE"]["NAME"]
)
elif method == "HOTP":
return otpauth.HOTP(bytes(self.secret_token, "utf-8")).to_uri(
label=self.user_name,
issuer=current_app.config["CANAILLE"]["NAME"],
counter=self.hotp_counter,
)
else: # pragma: no cover
raise RuntimeError("Invalid one-time password method")
return is_totp_valid(self, user_otp)

elif current_app.features.has_otp and method == "HOTP":
from canaille.app.otp import is_hotp_valid

return is_hotp_valid(self, user_otp)

def is_otp_valid(self, user_otp, method):
if method == "TOTP":
return self.is_totp_valid(user_otp)
elif method == "HOTP":
return self.is_hotp_valid(user_otp)
elif method == "EMAIL_OTP" or method == "SMS_OTP":
return self.is_email_or_sms_otp_valid(user_otp)

else: # pragma: no cover
raise RuntimeError("Invalid one-time password method")

def is_totp_valid(self, user_otp):
import otpauth

return otpauth.TOTP(bytes(self.secret_token, "utf-8")).verify(user_otp)

def is_hotp_valid(self, user_otp):
import otpauth

counter = self.hotp_counter
is_valid = False
# if user token's counter is ahead of canaille's, try to catch up to it
while counter - self.hotp_counter <= HOTP_LOOK_AHEAD_WINDOW:
is_valid = otpauth.HOTP(bytes(self.secret_token, "utf-8")).verify(
user_otp, counter
)
counter += 1
if is_valid:
self.hotp_counter = counter
return True
return False

def is_email_or_sms_otp_valid(self, user_otp):
return user_otp == self.one_time_password and self.is_otp_still_valid()

Expand Down
3 changes: 2 additions & 1 deletion tests/core/test_email_sms_otp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tests.conftest
from canaille.app import mask_email
from canaille.app import mask_phone
from canaille.app.otp import generate_otp
from canaille.core.models import OTP_VALIDITY
from canaille.core.models import SEND_NEW_OTP_DELAY

Expand Down Expand Up @@ -605,7 +606,7 @@ def test_signin_with_multiple_otp_methods(

# TOTP/HOTP
res = testclient.get("/verify-mfa")
res.form["otp"] = user_otp.generate_otp()
res.form["otp"] = generate_otp(user_otp)
res = res.form.submit(status=302).follow(status=200)

# EMAIL_OTP
Expand Down
15 changes: 8 additions & 7 deletions tests/core/test_totp_hotp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import time_machine

from canaille.app import models
from canaille.core.models import HOTP_LOOK_AHEAD_WINDOW
from canaille.app.otp import HOTP_LOOK_AHEAD_WINDOW
from canaille.app.otp import generate_otp


def test_otp_disabled(testclient):
Expand Down Expand Up @@ -46,7 +47,7 @@ def test_signin_and_out_with_otp(testclient, user_otp, caplog, otp_method):
assert "user" == session.get("attempt_login_with_correct_password")

res = testclient.get("/verify-mfa")
res.form["otp"] = user_otp.generate_otp()
res.form["otp"] = generate_otp(user_otp)
res = res.form.submit(status=302)

assert (
Expand Down Expand Up @@ -130,7 +131,7 @@ def test_signin_expired_totp(testclient, user_otp, caplog):
res = res.form.submit(status=302)
res = res.follow(status=200)

res.form["otp"] = user_otp.generate_otp()
res.form["otp"] = generate_otp(user_otp)
traveller.shift(datetime.timedelta(seconds=30))
res = res.form.submit()

Expand Down Expand Up @@ -183,7 +184,7 @@ def test_new_user_setup_otp(testclient, backend, caplog, otp_method):
assert u.secret_token == res.form["secret"].value

res = testclient.get("/verify-mfa", status=200)
res.form["otp"] = u.generate_otp()
res.form["otp"] = generate_otp(u)
res = res.form.submit(status=302)

assert (
Expand Down Expand Up @@ -270,7 +271,7 @@ def test_signin_multiple_attempts_doesnt_desynchronize_hotp(
for _x in range(3):
res.form["otp"] = "111111"
res = res.form.submit(status=302).follow()
res.form["otp"] = user_otp.generate_otp()
res.form["otp"] = generate_otp(user_otp)
res = res.form.submit(status=302)

assert (
Expand Down Expand Up @@ -321,7 +322,7 @@ def test_signin_inside_hotp_look_ahead_window(testclient, backend, user_otp, cap

res = testclient.get("/verify-mfa")

res.form["otp"] = user_otp.generate_otp(HOTP_LOOK_AHEAD_WINDOW)
res.form["otp"] = generate_otp(user_otp, HOTP_LOOK_AHEAD_WINDOW)
res = res.form.submit(status=302)

assert (
Expand Down Expand Up @@ -367,7 +368,7 @@ def test_signin_outside_hotp_look_ahead_window(testclient, backend, user_otp, ca

res = testclient.get("/verify-mfa")

res.form["otp"] = user_otp.generate_otp(HOTP_LOOK_AHEAD_WINDOW + 1)
res.form["otp"] = generate_otp(user_otp, HOTP_LOOK_AHEAD_WINDOW + 1)
res = res.form.submit(status=302)

assert (
Expand Down

0 comments on commit 618acde

Please sign in to comment.