Skip to content

Commit 4b27563

Browse files
Tests for #4 issue (#18)
* Fixtures, "forgot password" tests init, tests refactoring * email tests
1 parent 271626d commit 4b27563

4 files changed

Lines changed: 219 additions & 24 deletions

File tree

tests/conftest.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
import datetime
12
from unittest.mock import Mock
23

34
import pytest
45
from fastapi.testclient import TestClient
56
from sqlalchemy import create_engine
67
from sqlalchemy.orm import sessionmaker
8+
from starlette import status
79

810
import auth_backend.auth_plugins.email
11+
from auth_backend.models import AuthMethod, User
912
from auth_backend.routes.base import app
1013
from auth_backend.settings import get_settings
1114

@@ -23,3 +26,47 @@ def dbsession():
2326
engine = create_engine(settings.DB_DSN)
2427
TestingSessionLocal = sessionmaker(autocommit=True, autoflush=False, bind=engine)
2528
return TestingSessionLocal()
29+
30+
31+
@pytest.fixture()
32+
def user_id(client: TestClient, dbsession):
33+
time = datetime.datetime.utcnow()
34+
body = {
35+
"email": f"user{time}@example.com",
36+
"password": "string"
37+
}
38+
client.post("/email/registration", json=body)
39+
db_user: AuthMethod = dbsession.query(AuthMethod).filter(AuthMethod.value == body['email'],
40+
AuthMethod.param == 'email').one()
41+
yield db_user.user_id
42+
for row in dbsession.query(AuthMethod).filter(AuthMethod.user_id == db_user.user_id).all():
43+
dbsession.delete(row)
44+
dbsession.delete(dbsession.query(User).filter(User.id == db_user.user_id).one())
45+
dbsession.flush()
46+
47+
48+
@pytest.fixture()
49+
def user(client: TestClient, dbsession):
50+
url = "/email/login"
51+
time = datetime.datetime.utcnow()
52+
body = {
53+
"email": f"user{time}@example.com",
54+
"password": "string"
55+
}
56+
client.post("/email/registration", json=body)
57+
db_user: AuthMethod = dbsession.query(AuthMethod).filter(AuthMethod.value == body['email'],
58+
AuthMethod.param == 'email').one()
59+
response = client.post(url, json=body)
60+
assert response.status_code == status.HTTP_401_UNAUTHORIZED
61+
token = dbsession.query(AuthMethod).filter(AuthMethod.user_id == db_user.user_id,
62+
AuthMethod.param == "confirmation_token",
63+
AuthMethod.auth_method == "email").one()
64+
response = client.get(f"/email/approve?token={token.value}")
65+
assert response.status_code == status.HTTP_200_OK
66+
response = client.post(url, json=body)
67+
assert response.status_code == status.HTTP_200_OK
68+
yield {"user_id": db_user.user_id, "body": body, "login_json": response.json()}
69+
for row in dbsession.query(AuthMethod).filter(AuthMethod.user_id == db_user.user_id).all():
70+
dbsession.delete(row)
71+
dbsession.delete(dbsession.query(User).filter(User.id == db_user.user_id).one())
72+
dbsession.flush()
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import pytest
2+
from starlette import status
3+
from fastapi.testclient import TestClient
4+
from sqlalchemy.orm import Session
5+
from auth_backend.models.db import AuthMethod, UserSession
6+
7+
8+
url = "/email/reset/email/"
9+
10+
11+
@pytest.mark.skip()
12+
def test_main_scenario(client: TestClient, dbsession: Session, user):
13+
user_id, body, login = user["user_id"], user["body"], user["login_json"]
14+
conf_token_1 = dbsession.query(AuthMethod).filter(AuthMethod.user_id == user_id,
15+
AuthMethod.param == "confirmation_token").one().value
16+
response = client.post(f"{url}{user_id}/request", json={"token": login["token"], "email": "changed@mail.com"})
17+
assert response.status_code == status.HTTP_200_OK
18+
19+
conf_token_2 = dbsession.query(AuthMethod).filter(AuthMethod.user_id == user_id, AuthMethod.param == "confirmation_token").one().value
20+
assert conf_token_2 != conf_token_1
21+
22+
assert not dbsession.query(UserSession).filter(UserSession.token == login["token"]).one().expired
23+
24+
response = client.post(f"/email/login", json=body)
25+
assert response.status_code == status.HTTP_200_OK
26+
27+
response = client.post(f"/email/login", json={"email": "changed@mail.com", "password": body["password"]})
28+
assert response.status_code == status.HTTP_401_UNAUTHORIZED
29+
30+
response = client.get(f"{url}{user_id}?token={conf_token_1}&email=changed@mail.com")
31+
assert response.status_code == status.HTTP_403_FORBIDDEN
32+
33+
response = client.get(f"{url}{user_id}?token={conf_token_2}&email=changed@mail.com")
34+
assert response.status_code == status.HTTP_200_OK
35+
36+
response = client.post(f"/email/login", json=body)
37+
assert response.status_code == status.HTTP_401_UNAUTHORIZED
38+
39+
response = client.post(f"/email/login", json={"email": "changed@mail.com", "password": body["password"]})
40+
assert response.status_code == status.HTTP_200_OK
41+
42+
43+
@pytest.mark.skip()
44+
def test_invalid_jsons(client: TestClient, dbsession: Session, user):
45+
user_id, body, login = user["user_id"], user["body"], user["login_json"]
46+
47+
response = client.post(f"{url}{user_id}/request", json={"token": "", "email": "changed@mail.com"})
48+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
49+
50+
response = client.post(f"{url}{user_id}/request", json={"token": login["token"], "email": ""})
51+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
52+
53+
response = client.post(f"{url}{user_id}/request", json={"token": "", "email": ""})
54+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
55+
56+
57+
@pytest.mark.skip()
58+
def test_expired_token(client: TestClient, dbsession: Session, user):
59+
user_id, body, login = user["user_id"], user["body"], user["login_json"]
60+
response = client.post("/logout", json={"token": login["token"]})
61+
assert response.status_code == status.HTTP_200_OK
62+
63+
response = client.post(f"{url}{user_id}/request", json={"token": login["token"], "email": "changed@mail.com"})
64+
assert response.status_code == status.HTTP_401_UNAUTHORIZED
65+
66+
67+
68+
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import pytest
2+
from starlette import status
3+
from fastapi.testclient import TestClient
4+
from sqlalchemy.orm import Session
5+
from auth_backend.models.db import AuthMethod
6+
7+
8+
url = "/email/reset/password/"
9+
10+
11+
@pytest.mark.skip()
12+
def test_unprocessable_jsons_no_token(client: TestClient, dbsession: Session, user_id: int):
13+
token = dbsession.query(AuthMethod).filter(AuthMethod.user_id == user_id,
14+
AuthMethod.param == "confirmation_token",
15+
AuthMethod.auth_method == "email").one()
16+
response = client.get(f"/email/approve?token={token.value}")
17+
assert response.status_code == status.HTTP_200_OK
18+
19+
response = client.post(f"{url}{user_id}/request")
20+
assert response.status_code == status.HTTP_200_OK
21+
reset_token = dbsession.query(AuthMethod).filter(AuthMethod.auth_method == "email",
22+
AuthMethod.param == "reset_token", AuthMethod.user_id == user_id).one()
23+
assert reset_token
24+
25+
response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": ""})
26+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
27+
28+
response = client.post(f"{url}{user_id}", json={"reset_token": "", "new_password": ""})
29+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
30+
31+
response = client.post(f"{url}{user_id}", json={"reset_token": "", "new_password": "changedstring3"})
32+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
33+
34+
35+
@pytest.mark.skip()
36+
def test_unprocessable_jsons_with_token(client: TestClient, dbsession: Session, user):
37+
user_id, body, response = user["user_id"], user["body"], user["login_json"]
38+
auth_token = response["token"]
39+
40+
response = client.post(f"{url}{user_id}/request", json={"token": auth_token, "password": ""})
41+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
42+
43+
response = client.post(f"{url}{user_id}/request", json={"token": "", "password": ""})
44+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
45+
46+
response = client.post(f"{url}{user_id}/request", json={"token": "", "password": "string"})
47+
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
48+
49+
response = client.post(f"{url}{user_id}/request", json={"token": auth_token, "password": "string"})
50+
assert response.status_code == status.HTTP_200_OK
51+
52+
53+
@pytest.mark.skip()
54+
def test_no_token(client: TestClient, dbsession: Session, user_id: str):
55+
token = dbsession.query(AuthMethod).filter(AuthMethod.user_id == user_id,
56+
AuthMethod.param == "confirmation_token",
57+
AuthMethod.auth_method == "email").one()
58+
response = client.post(f"{url}{user_id}/request")
59+
assert response.status_code == status.HTTP_403_FORBIDDEN
60+
61+
response = client.get(f"/email/approve?token={token.value}")
62+
assert response.status_code == status.HTTP_200_OK
63+
64+
response = client.post(f"{url}{user_id}/request")
65+
assert response.status_code == status.HTTP_200_OK
66+
reset_token = dbsession.query(AuthMethod).filter(AuthMethod.auth_method == "email", AuthMethod.param == "reset_token", AuthMethod.user_id == user_id).one()
67+
assert reset_token
68+
69+
response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": "changedstring"})
70+
assert response.status_code == status.HTTP_200_OK
71+
72+
response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": "changedstring2"})
73+
assert response.status_code == status.HTTP_403_FORBIDDEN
74+
75+
76+
@pytest.mark.skip()
77+
def test_with_token(client: TestClient, dbsession: Session, user):
78+
user_id, body, response = user["user_id"], user["body"], user["login_json"]
79+
auth_token = response["token"]
80+
81+
response = client.post(f"{url}{user_id}/request", json={"token": auth_token, "password": "wrong"})
82+
assert response.status_code == status.HTTP_403_FORBIDDEN
83+
84+
response = client.post(f"{url}{user_id}/request", json={"token": auth_token, "password": "string"})
85+
assert response.status_code == status.HTTP_200_OK
86+
reset_token = dbsession.query(AuthMethod).filter(AuthMethod.auth_method == "email",
87+
AuthMethod.param == "reset_token", AuthMethod.user_id == user_id).one()
88+
assert reset_token
89+
90+
response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": "changedstring"})
91+
assert response.status_code == status.HTTP_200_OK
92+
93+
response = client.post(f"{url}{user_id}", json={"reset_token": reset_token, "new_password": "changedstring2"})
94+
assert response.status_code == status.HTTP_403_FORBIDDEN
95+
96+
97+
98+
99+
100+

tests/test_routes/test_login.py

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,14 @@ def test_invalid_email(client: TestClient):
1818
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
1919

2020

21-
def test_main_scenario(client: TestClient, dbsession: Session):
22-
time = datetime.datetime.utcnow()
23-
body = {
24-
"email": f"user{time}@example.com",
25-
"password": "string"
26-
}
27-
client.post("/email/registration", json=body)
28-
db_user: AuthMethod = dbsession.query(AuthMethod).filter(AuthMethod.value == body['email'],
29-
AuthMethod.param == 'email').one()
30-
id = db_user.user_id
31-
response = client.post(url, json=body)
32-
assert response.status_code == status.HTTP_401_UNAUTHORIZED
33-
query = dbsession.query(AuthMethod).filter(AuthMethod.auth_method == "email", AuthMethod.param == "email", AuthMethod.value == body["email"]).one()
34-
token = dbsession.query(AuthMethod).filter(AuthMethod.user_id == query.user.id, AuthMethod.param == "confirmation_token", AuthMethod.auth_method =="email").one()
35-
response = client.get(f"/email/approve?token={token.value}")
36-
assert response.status_code == status.HTTP_200_OK
37-
response = client.post(url, json=body)
38-
assert response.status_code == status.HTTP_200_OK
21+
def test_main_scenario(client: TestClient, dbsession: Session, user):
22+
user_id, body, response = user["user_id"], user["body"], user["login_json"]
3923
body_with_uppercase = {
40-
"email": f"User{time}@example.com",
24+
"email": body["email"].replace("u", "U"),
4125
"password": "string"
4226
}
4327
response = client.post(url, json=body_with_uppercase)
4428
assert response.status_code == status.HTTP_200_OK
45-
for row in dbsession.query(AuthMethod).filter(AuthMethod.user_id == id).all():
46-
dbsession.delete(row)
47-
dbsession.delete(dbsession.query(User).filter(User.id == id).one())
48-
dbsession.flush()
4929

5030

5131
def test_incorrect_data(client: TestClient, dbsession: Session):
@@ -65,7 +45,7 @@ def test_incorrect_data(client: TestClient, dbsession: Session):
6545
"email": "wrong@example.com",
6646
"password": "strong"
6747
}
68-
response = client.post("/email/registration", json=body1)
48+
client.post("/email/registration", json=body1)
6949
db_user: AuthMethod = dbsession.query(AuthMethod).filter(AuthMethod.value == body1['email'],
7050
AuthMethod.param == 'email').one()
7151
id = db_user.user_id

0 commit comments

Comments
 (0)