Source code for lwe.backends.api.user

import hashlib
import datetime

from sqlalchemy.exc import SQLAlchemyError

from lwe.backends.api.orm import Manager, User


[docs] class UserManager(Manager): def _hash_password(self, password): hashed_password = hashlib.sha256(password.encode()).hexdigest() return hashed_password
[docs] def user_found_message(self, user): found = "found" if user else "not found" return f"User {found}."
[docs] def get_by_user_id(self, user_id): try: user = self.orm_get_user(user_id) except SQLAlchemyError as e: return self._handle_error(f"Failed to get user: {str(e)}") return True, user, self.user_found_message(user)
[docs] def get_by_username(self, username): username = username.lower() try: user = self.session.query(User).filter((User.username == username)).first() except SQLAlchemyError as e: return self._handle_error(f"Failed to get user: {str(e)}") return True, user, self.user_found_message(user)
[docs] def get_by_username_or_email(self, identifier): identifier = identifier.lower() try: user = ( self.session.query(User) .filter((User.username == identifier) | (User.email == identifier)) .first() ) except SQLAlchemyError as e: return self._handle_error(f"Failed to get user: {str(e)}") return True, user, self.user_found_message(user)
[docs] def register(self, username, email, password, default_preset="", preferences=None): preferences = preferences or {} username = username.lower() if email: email = email.lower() if password: password = self._hash_password(password) # Check if the username or email is equal to the email of an existing user. if email: try: existing_user = ( self.session.query(User) .filter( (User.username == username) | (User.username == email) | (User.email == email) | (User.email == username) ) .first() ) except SQLAlchemyError as e: return self._handle_error(f"Failed to retrieve existing users: {str(e)}") else: success, existing_user, message = self.get_by_username_or_email(username) if not success: return success, existing_user, message if existing_user: return False, None, "Username or email is already in use." try: user = self.orm_add_user(username, password, email, default_preset, preferences) except SQLAlchemyError as e: return self._handle_error(f"Failed to add user: {str(e)}") return True, user, "User successfully registered."
[docs] def login(self, identifier, password): success, user, message = self.get_by_username_or_email(identifier) if not success: return success, user, message if not user: return False, None, "Username or email not found." # Hash the password and compare it to the hashed password in the database if self._hash_password(password) != user.password: return False, user, "Incorrect password." # Update the last login time user.last_login_time = datetime.datetime.utcnow() try: self.session.commit() self.session.refresh(user) except SQLAlchemyError as e: return self._handle_error(f"Failed to log in user: {str(e)}") return True, user, "Login successful."
[docs] def logout(self, user_id): # Logout functionality is usually implemented on the frontend, so this method can be left blank pass
[docs] def get_users(self, limit=None, offset=None): try: users = self.orm_get_users(limit, offset) except SQLAlchemyError as e: return self._handle_error(f"Failed to get users: {str(e)}") return True, users, "Users retrieved."
[docs] def edit_user(self, user_id, username=None, email=None, password=None, default_preset=None): success, user, message = self.get_by_user_id(user_id) if not success: return success, user, message if not user: return False, None, "User not found." kwargs = {} # Check if the new username or email is equal any existing user username or email. if username: success, existing_user, message = self.get_by_username_or_email(username) if not success: return success, existing_user, message if existing_user and existing_user.id != user.id: return False, user, "Username cannot be the same as an existing user's email." kwargs["username"] = username if email: success, existing_user, message = self.get_by_username_or_email(email) if not success: return success, existing_user, message if existing_user and existing_user.id != user.id: return False, user, "Email cannot be the same as an existing user's username." kwargs["email"] = email if password: kwargs["password"] = self._hash_password(password) if default_preset is not None: kwargs["default_preset"] = default_preset try: user = self.orm_edit_user(user, **kwargs) except SQLAlchemyError as e: return self._handle_error(f"Failed to edit user: {str(e)}") return True, user, "User successfully edited."
[docs] def delete_user(self, user_id): success, user, message = self.get_by_user_id(user_id) if not success: return success, user, message if not user: return False, None, "User not found." try: user = self.orm_delete_user(user) except SQLAlchemyError as e: return self._handle_error(f"Failed to delete user: {str(e)}") return True, user, "User successfully deleted."