11import hashlib
22import random
33import string
4+
5+ from fastapi import Request
46from fastapi_sqlalchemy import db
5- from fastapi import Request , HTTPException
67from pydantic import validator , constr
8+ from sqlalchemy import func
79from starlette .responses import JSONResponse
810
11+ from auth_backend .base import Base , ResponseModel
912from auth_backend .exceptions import AlreadyExists , AuthFailed , ObjectNotFound , SessionExpired
1013from auth_backend .models .db import AuthMethod
1114from auth_backend .models .db import UserSession , User
1215from auth_backend .settings import get_settings
1316from auth_backend .utils .smtp import send_confirmation_email
1417from .auth_method import AuthMethodMeta , Session
15- from auth_backend .base import Base , ResponseModel
16- from sqlalchemy import func
1718
1819settings = get_settings ()
1920
@@ -24,8 +25,12 @@ class EmailLogin(Base):
2425
2526 @validator ('email' )
2627 def check_email (cls , v ):
28+ restricted : set [str ] = {'"' , '#' , '&' , "'" , '(' , ')' , '*' , ',' , '/' , ';' , '<' , '>' , '?' ,
29+ '[' , '\\ ' , ']' , '^' , '`' , '{' , '|' , '}' , '~' , '\n ' , '\r ' }
2730 if "@" not in v :
2831 raise ValueError ()
32+ if set (v ) & restricted :
33+ raise ValueError ()
2934 return v
3035
3136
@@ -51,12 +56,12 @@ def __init__(self):
5156 async def login (user_inp : EmailLogin ) -> Session :
5257 query = (
5358 db .session .query (AuthMethod )
54- .filter (
59+ .filter (
5560 func .lower (AuthMethod .value ) == user_inp .email .lower (),
5661 AuthMethod .param == "email" ,
5762 AuthMethod .auth_method == Email .get_name (),
5863 )
59- .one_or_none ()
64+ .one_or_none ()
6065 )
6166 if not query :
6267 raise AuthFailed (error = "Incorrect login or password" )
@@ -66,7 +71,7 @@ async def login(user_inp: EmailLogin) -> Session:
6671 error = "Registration wasn't completed. Try to registrate again and do not forget to approve your email"
6772 )
6873 if secrets .get ("email" ).lower () != user_inp .email .lower () or not Email .validate_password (
69- user_inp .password , secrets .get ("hashed_password" ), secrets .get ("salt" )
74+ user_inp .password , secrets .get ("hashed_password" ), secrets .get ("salt" )
7075 ):
7176 raise AuthFailed (error = "Incorrect login or password" )
7277 db .session .add (user_session := UserSession (user_id = query .user .id , token = random_string ()))
@@ -120,22 +125,22 @@ async def _get_user_by_token_and_id(id: int, token: str) -> User | None:
120125 return user
121126
122127 @staticmethod
123- async def register (user_inp : EmailRegister , request : Request ) -> ResponseModel | JSONResponse :
128+ async def register (user_inp : EmailRegister ) -> ResponseModel | JSONResponse :
124129 confirmation_token : str = random_string ()
125130 auth_method : AuthMethod = (
126131 db .session .query (AuthMethod )
127- .filter (
132+ .filter (
128133 AuthMethod .param == "email" ,
129134 func .lower (AuthMethod .value ) == user_inp .email .lower (),
130135 AuthMethod .auth_method == Email .get_name (),
131136 )
132- .one_or_none ()
137+ .one_or_none ()
133138 )
134139 if auth_method :
135140 await Email ._change_confirmation_link (auth_method .user , confirmation_token )
136141 send_confirmation_email (
137142 to_addr = user_inp .email ,
138- link = f"{ request . client . host } /email/approve?token={ confirmation_token } " ,
143+ link = f"{ settings . HOST } /email/approve?token={ confirmation_token } " ,
139144 )
140145 return ResponseModel (status = "Success" , message = "Email confirmation link sent" )
141146 if user_inp .user_id and user_inp .token :
@@ -147,7 +152,7 @@ async def register(user_inp: EmailRegister, request: Request) -> ResponseModel |
147152 await Email ._add_to_db (user_inp , confirmation_token , user )
148153 send_confirmation_email (
149154 to_addr = user_inp .email ,
150- link = f"{ request . client . host } /email/approve?token={ confirmation_token } " ,
155+ link = f"{ settings . HOST } /email/approve?token={ confirmation_token } " ,
151156 )
152157 return JSONResponse (
153158 status_code = 201 , content = ResponseModel (status = "Success" , message = "Email confirmation link sent" ).json ()
@@ -167,23 +172,23 @@ def validate_password(password: str, hashed_password: str, salt: str):
167172 async def approve_email (token : str ) -> object :
168173 auth_method = (
169174 db .session .query (AuthMethod )
170- .filter (
175+ .filter (
171176 AuthMethod .value == token ,
172177 AuthMethod .param == "confirmation_token" ,
173178 AuthMethod .auth_method == Email .get_name (),
174179 )
175- .one_or_none ()
180+ .one_or_none ()
176181 )
177182 if not auth_method :
178183 return JSONResponse (status_code = 403 , content = ResponseModel (status = "Error" , message = "Incorrect link" ).json ())
179184 confirmed = (
180185 db .session .query (AuthMethod )
181- .filter (
186+ .filter (
182187 AuthMethod .auth_method == Email .get_name (),
183188 AuthMethod .param == "confirmed" ,
184189 AuthMethod .user_id == auth_method .user_id ,
185190 )
186- .one ()
191+ .one ()
187192 )
188193 confirmed .value = True
189194 db .session .flush ()
0 commit comments