Hi,
Need a authentication page after the login.
Requirement:
Login page - username and password > valid > 2FA page
2FA page - token and qrcode > valid > Home page
The token and shared secret will store in the db if it is existing user else app will generate a shared secret and token.
The shared secret will be call out and use in the QR code every time, but the token will be update once it is expire (30 seconds).
So, the QR code will be unique for each user but will be the same for the same user every time (regardless when scan the QR code).
The token shown in the smartphone will verify with the token stored in the db.
Question:
- Unable to get the user_id / username according to the login’ s user.
And also the automatically update the token to the db once expire. The schedule part add-in the qrcode.py is unable to function.
qrcode.py
def gen_secret ():
shared_secret = pyotp.random_base32()
return shared_secret
def select_id (user_id):
select_query = '''
SELECT user_id, shared_secret
FROM token_table
WHERE user_id = ?
'''
cursor.execute(select_query, (user_id))
result = cursor.fetchone()
return result
def generate_qrcode(service_name, user_id, secret, issuer):
totp_uri = f"otpauth://totp/{service_name}:{user_id}?secret={secret}&issuer={issuer}"
qr_code = pyqrcode.create(totp_uri)
stream = BytesIO()
qr_code.svg(stream, scale=5)
qr_code_svg = stream.getvalue().decode('utf-8')
qr_code_data = "data:image/svg+xml;base64," + base64.b64encode(qr_code_svg.encode()).decode()
return qr_code_data
def update_token (service_name, user_id):
result = select_id (user_id)
user_id = result[0]
existing_shared_secret = result[1]
shared_secret = gen_secret ()
if existing_shared_secret is None:
secret = shared_secret
else:
exe_query = '''
SELECT shared_secret
FROM token_table
WHERE user_id = ?
'''
cursor.execute(exe_query, (user_id))
secret = cursor.fetchone()[0]
totp = pyotp.TOTP(secret)
token = totp.now()
token_gene = datetime.datetime.now()
token_expiration = datetime.datetime.now() + datetime.timedelta(seconds=30)
if existing_shared_secret is None:
update_query = '''
UPDATE token_table
SET token = ? , shared_secret = ? , token_gen = ?, token_expire = ?
WHERE user_id = ?
'''
cursor.execute(update_query, (token, secret, token_gene, token_expiration, user_id))
else:
update_query = '''
UPDATE token_table
SET token = ? ,token_gen = ?, token_expire = ?
WHERE user_id = ?
'''
cursor.execute(update_query, (token, token_gene, token_expiration, user_id))
conn.commit()
issuer = service_name
qr_code_data = generate_qrcode(service_name, user_id, secret, issuer)
return qr_code_data
# Schedule the token update task
schedule.every(15).seconds.do(update_token, service_name, user_id)
# Keep the script running and execute scheduled tasks
while True:
schedule.run_pending()
time.sleep(1)
2FA.py
dash.register_page(__name__,
path='/2fa',
name=' 2FA',
title='2FA',
)
def layout():
if not current_user.is_authenticated:
return html.Div(["Please ", dcc.Link("login", href="/login"), " to continue"])
user_id = current_user.id
return html.Div([
html.H1('Two Factor Authentication Setup', className = ''),
html.P('''
You are almost done! Please start FreeOTP on your smartphone
and scan the following QR Code with it:
''', className = ''),
html.H3('Verification code', className = ''),
dcc.Input( type="text", id="token-box", name='token'
, className = 'login-input'),
html.Div(id="qrcode"),
html.Br(), html.Br(), html.Br(),
html.Button('Generate',n_clicks=0, type="submit", id="generate-button" ,className = ''),
html.Button('Submit',n_clicks=0, type="submit", id = "login-button" ,className = ''),
])
@callback(
Output("qrcode", "children"),
Input("generate-button", "n_clicks"),
prevent_initial_call=True
)
def update_qr_code(n_clicks):
user_id = current_user.id
service_name = "website"
qr_code_data = update_token(service_name, user_id)
if n_clicks > 0:
return html.Img(src=qr_code_data, style={"display": "block"})
else:
return None
- The 2FA page unable to work the verification and access the home page. It will show error message when add-in the @route
server.py
@server.route('/2fa', methods=['GET', 'POST'])
@login_required
def two_factor_authentication():
# if not current_user.is_authenticated:
# return redirect('/login')
user_id = current_user.id
if request.method == 'POST':
entered_token = request.form['token']
cursor.execute(get_token, (user_id,))
stored_token = cursor.fetchone()[0]
if entered_token == stored_token:
# session['verified'] = True # Mark the user as verified
return redirect('/')
# else:
# return "User information not found"
else:
return """Invalid verification code. <a href='/2fa'>Try again</a>"""
return redirect('/2fa')
@server.route('/login', methods=['POST'])
def login_button_click():
if request.form:
username = request.form['username']
password = request.form['password']
if not is_valid_password(password):
return """Password does not meet policy requirements <a href='/login'>login here</a>"""
if VALID_USERNAME_PASSWORD.get(username) is None:
# if VALID_USERNAME_PASSWORD_PAIRS.get(username) is None:
return """Invalid username and/or password <a href='/login'>login here</a>"""
stored_pwd = VALID_USERNAME_PASSWORD.get(username)['password']
hashed_pwd = hashlib.sha256(password.encode()).hexdigest()
if stored_pwd == hashed_pwd:
login_user(User(username))
if 'url' in session:
if session['url']:
url = session['url']
session['url'] = None
return redirect(url) ## redirect to target url
return redirect('/2fa') ## redirect to home
return """Invalid username and/or password <a href='/login'>login here</a>"""
# Login manager object will be used to login / logout users
login_manager = LoginManager()
login_manager.init_app(server)
login_manager.login_view = "/login"
class User(UserMixin):
# User data model. It has to have at least self.id as a minimum
def __init__(self, username):
self.id = username
self.role_id = VALID_USERNAME_PASSWORD[username]['role_id']
# def get_id(self):
# return self.id
# def is_active(self):
# return True # Assuming all users are active
@login_manager.user_loader
def load_user(username):
"""This function loads the user by user id. Typically this looks up the user from a user database.
We won't be registering or looking up users in this example, since we'll just login using LDAP server.
So we'll simply return a User object with the passed in username.
"""
return User(username)