2FA page - Unable to get the login user 's user_id/username in QRcode and verification

Hi,

Need a authentication page after the login.

Requirement:
Login page - username and password > valid > 2FA page
2FA page - token and qrcode > valid > Home page

The token and shared secret will store in the db if it is existing user else app will generate a shared secret and token.
The shared secret will be call out and use in the QR code every time, but the token will be update once it is expire (30 seconds).

So, the QR code will be unique for each user but will be the same for the same user every time (regardless when scan the QR code).

The token shown in the smartphone will verify with the token stored in the db.


Question:

  1. Unable to get the user_id / username according to the login’ s user.
    And also the automatically update the token to the db once expire. The schedule part add-in the qrcode.py is unable to function.

qrcode.py

def gen_secret ():
    shared_secret = pyotp.random_base32()
    return shared_secret


def select_id (user_id):
     select_query = '''
        SELECT user_id, shared_secret 
        FROM token_table 
        WHERE user_id = ?
        '''
    cursor.execute(select_query, (user_id))
    result = cursor.fetchone()
    
    return result



def generate_qrcode(service_name, user_id, secret, issuer):
    totp_uri = f"otpauth://totp/{service_name}:{user_id}?secret={secret}&issuer={issuer}"
    qr_code = pyqrcode.create(totp_uri)   
    stream = BytesIO()
    qr_code.svg(stream, scale=5)
    qr_code_svg = stream.getvalue().decode('utf-8')
    qr_code_data = "data:image/svg+xml;base64," + base64.b64encode(qr_code_svg.encode()).decode()
    return qr_code_data
    
  

def update_token (service_name, user_id):

    result = select_id (user_id)  
    user_id = result[0]
    existing_shared_secret = result[1]
    
    shared_secret = gen_secret ()  
    
    if existing_shared_secret is None:     
        secret = shared_secret
        
    else:
        exe_query = '''
                    SELECT shared_secret 
                    FROM token_table 
                    WHERE user_id = ?
                    '''
        cursor.execute(exe_query, (user_id))
        secret = cursor.fetchone()[0]
           
        
    totp = pyotp.TOTP(secret)
    token = totp.now()   
       
    token_gene = datetime.datetime.now()
    token_expiration = datetime.datetime.now() + datetime.timedelta(seconds=30)
   
        
    if existing_shared_secret is None:                 
        update_query = '''
                        UPDATE token_table 
                        SET token = ? , shared_secret = ? , token_gen = ?, token_expire = ?
                        WHERE user_id = ?
                        '''
        cursor.execute(update_query, (token, secret, token_gene, token_expiration, user_id))
        
        
    else:
        update_query = '''
                        UPDATE token_table 
                        SET token = ? ,token_gen = ?, token_expire = ?
                        WHERE user_id = ?
                        '''
        cursor.execute(update_query, (token, token_gene, token_expiration, user_id))
    
    conn.commit()
    
    issuer = service_name
    qr_code_data = generate_qrcode(service_name, user_id, secret, issuer)
    
    return qr_code_data



# Schedule the token update task
schedule.every(15).seconds.do(update_token, service_name, user_id)

# Keep the script running and execute scheduled tasks
while True:
    schedule.run_pending()
    time.sleep(1)



2FA.py

dash.register_page(__name__,
                   path='/2fa', 
                   name=' 2FA',
                   title='2FA', 

)

def layout():
    if not current_user.is_authenticated:
        return html.Div(["Please ", dcc.Link("login", href="/login"), " to continue"])

    user_id = current_user.id

    return html.Div([
        html.H1('Two Factor Authentication Setup', className = ''),
        
        html.P('''
               You are almost done! Please start FreeOTP on your smartphone
               and scan the following QR Code with it:
                   
                   ''', className = ''),
        
        html.H3('Verification code', className = ''), 
        
        dcc.Input( type="text", id="token-box", name='token'
                  , className = 'login-input'), 
        
        html.Div(id="qrcode"), 
           
        html.Br(), html.Br(), html.Br(), 
        
        html.Button('Generate',n_clicks=0, type="submit", id="generate-button" ,className = ''),
        
        html.Button('Submit',n_clicks=0, type="submit", id = "login-button" ,className = ''),
       
       ])


@callback(
    Output("qrcode", "children"),
    Input("generate-button", "n_clicks"),
    prevent_initial_call=True
)



def update_qr_code(n_clicks):
    user_id = current_user.id
    service_name = "website"
    qr_code_data = update_token(service_name, user_id)
    
    if n_clicks > 0:
        return html.Img(src=qr_code_data, style={"display": "block"})

        
    else:
        return None





  1. The 2FA page unable to work the verification and access the home page. It will show error message when add-in the @route

server.py

@server.route('/2fa', methods=['GET', 'POST'])
@login_required
def two_factor_authentication():
    # if not current_user.is_authenticated:
    #     return redirect('/login')
        
    user_id = current_user.id
    
    if request.method == 'POST':
        entered_token = request.form['token']
        cursor.execute(get_token, (user_id,))
        stored_token = cursor.fetchone()[0] 

        if entered_token == stored_token:
            # session['verified'] = True  # Mark the user as verified
            return redirect('/') 
        # else:
        #     return "User information not found"

        else:
            return """Invalid verification code. <a href='/2fa'>Try again</a>"""

    return redirect('/2fa')




@server.route('/login', methods=['POST'])
def login_button_click():
    if request.form:
        username = request.form['username']
        password = request.form['password']
        
        if not is_valid_password(password):
            return """Password does not meet policy requirements <a href='/login'>login here</a>"""
        

        if VALID_USERNAME_PASSWORD.get(username) is None:
        # if VALID_USERNAME_PASSWORD_PAIRS.get(username) is None:
   
            return """Invalid username and/or password <a href='/login'>login here</a>"""
        
        stored_pwd = VALID_USERNAME_PASSWORD.get(username)['password']
        hashed_pwd = hashlib.sha256(password.encode()).hexdigest()

            
        if stored_pwd == hashed_pwd:
            login_user(User(username))
            if 'url' in session:
                if session['url']:
                    url = session['url']
                    session['url'] = None
                    return redirect(url) ## redirect to target url
            return redirect('/2fa') ## redirect to home
        
        
        return """Invalid username and/or password <a href='/login'>login here</a>"""



# Login manager object will be used to login / logout users
login_manager = LoginManager()
login_manager.init_app(server)
login_manager.login_view = "/login"


class User(UserMixin):
    # User data model. It has to have at least self.id as a minimum
    def __init__(self, username):
        self.id = username
        self.role_id = VALID_USERNAME_PASSWORD[username]['role_id']

    # def get_id(self):
    #     return self.id
    
    # def is_active(self):
    #     return True  # Assuming all users are active




@login_manager.user_loader
def load_user(username):
    """This function loads the user by user id. Typically this looks up the user from a user database.
    We won't be registering or looking up users in this example, since we'll just login using LDAP server.
    So we'll simply return a User object with the passed in username.
    """
    return User(username)


Hello @beginof,

Just looking through, I think you are trying to do too much with the Totp, you don’t need to update it every time it expires on the server.

You just need to query the saved secret key from your db and then compare the totp.now() to what the user provides to what your system says, if matched, then authenticate the user.

If you have a sample flow of all the pieces working together, it would make testing what you have easier.

Hi @jinnyzor,

Is it mean that I just need to store the totp.now() in the db based on the stored shared secret?

The update_token function, I don’t think you need it.

You are only going to have 2FA at certain times in the application, correct?

When user login the app, after the valid username and password.

It will be use all the time when user login.

But you only need it once, unless you have the user constantly entering the totp this doesn’t make sense to constantly update.

Even then you shouldn’t need to store the actual onetime password, but just the secret key.

Could you please provide an example flow for this so I can see how it all works together?

The flow will be some kind like this:

Ok.

Can you make a basic dash app with that flow and use the functions and things that you are suggesting?

Just use a SQLite db for the time being.

My current flow is like login > valid > home

but due to security issue, need to add a 2fa after valid username and password.

When add 2fa page, it is no work as expected.

If the OTP is no store in database, how can the input token verify with the generated token from the qrcode?

Can you post your code or give a repo so that we dont have to build this all from scratch?