|
| 1 | +import contextlib |
| 2 | +import re |
| 3 | +from collections.abc import AsyncGenerator, Iterable |
| 4 | +from http import HTTPStatus |
| 5 | + |
| 6 | +import httpx |
| 7 | +import pytest |
| 8 | +from sqlalchemy import text |
| 9 | +from sqlalchemy.ext.asyncio import AsyncConnection |
| 10 | + |
| 11 | +from tests.users import OWNER_USER, ApiKey |
| 12 | + |
| 13 | + |
| 14 | +@pytest.mark.parametrize( |
| 15 | + "api_key", |
| 16 | + [ApiKey.ADMIN, ApiKey.SOME_USER, ApiKey.OWNER_USER], |
| 17 | + ids=["Administrator", "non-owner", "tag owner"], |
| 18 | +) |
| 19 | +@pytest.mark.parametrize( |
| 20 | + "other_tags", |
| 21 | + [[], ["some_other_tag"], ["foo_some_other_tag", "bar_some_other_tag"]], |
| 22 | + ids=["none", "one tag", "two tags"], |
| 23 | +) |
| 24 | +async def test_setup_untag_response_is_identical_when_tag_exists( |
| 25 | + api_key: str, |
| 26 | + other_tags: list[str], |
| 27 | + py_api: httpx.AsyncClient, |
| 28 | + php_api: httpx.AsyncClient, |
| 29 | + expdb_test: AsyncConnection, |
| 30 | +) -> None: |
| 31 | + setup_id = 1 |
| 32 | + tag = "totally_new_tag_for_migration_testing" |
| 33 | + |
| 34 | + @contextlib.asynccontextmanager |
| 35 | + async def temporary_tags( |
| 36 | + tags: Iterable[str], setup_id: int, *, persist: bool = False |
| 37 | + ) -> AsyncGenerator[None]: |
| 38 | + for tag in tags: |
| 39 | + await expdb_test.execute( |
| 40 | + text( |
| 41 | + "INSERT INTO setup_tag(`id`,`tag`,`uploader`) VALUES (:setup_id, :tag, :user_id);" # noqa: E501 |
| 42 | + ), |
| 43 | + parameters={"setup_id": setup_id, "tag": tag, "user_id": OWNER_USER.user_id}, |
| 44 | + ) |
| 45 | + if persist: |
| 46 | + await expdb_test.commit() |
| 47 | + yield |
| 48 | + for tag in tags: |
| 49 | + await expdb_test.execute( |
| 50 | + text("DELETE FROM setup_tag WHERE `id`=:setup_id AND `tag`=:tag"), |
| 51 | + parameters={"setup_id": setup_id, "tag": tag}, |
| 52 | + ) |
| 53 | + if persist: |
| 54 | + await expdb_test.commit() |
| 55 | + |
| 56 | + all_tags = [tag, *other_tags] |
| 57 | + async with temporary_tags(tags=all_tags, setup_id=setup_id, persist=True): |
| 58 | + original = await php_api.post( |
| 59 | + "/setup/untag", |
| 60 | + data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, |
| 61 | + ) |
| 62 | + |
| 63 | + # expdb_test transaction shared with Python API, |
| 64 | + # no commit needed and rolled back at the end of the test |
| 65 | + async with temporary_tags(tags=all_tags, setup_id=setup_id): |
| 66 | + new = await py_api.post( |
| 67 | + f"/setup/untag?api_key={api_key}", |
| 68 | + json={"setup_id": setup_id, "tag": tag}, |
| 69 | + ) |
| 70 | + |
| 71 | + if new.status_code == HTTPStatus.OK: |
| 72 | + assert original.status_code == new.status_code |
| 73 | + original_untag = original.json()["setup_untag"] |
| 74 | + new_untag = new.json()["setup_untag"] |
| 75 | + assert original_untag["id"] == new_untag["id"] |
| 76 | + if tags := original_untag.get("tag"): |
| 77 | + if isinstance(tags, str): |
| 78 | + assert tags == new_untag["tag"][0] |
| 79 | + else: |
| 80 | + assert tags == new_untag["tag"] |
| 81 | + else: |
| 82 | + assert new_untag["tag"] == [] |
| 83 | + return |
| 84 | + |
| 85 | + code, message = original.json()["error"].values() |
| 86 | + assert original.status_code == HTTPStatus.PRECONDITION_FAILED |
| 87 | + assert new.status_code == HTTPStatus.FORBIDDEN |
| 88 | + assert code == new.json()["code"] |
| 89 | + assert message == "Tag is not owned by you" |
| 90 | + assert re.match( |
| 91 | + r"You may not remove tag \S+ of setup \d+ because it was not created by you.", |
| 92 | + new.json()["detail"], |
| 93 | + ) |
| 94 | + |
| 95 | + |
| 96 | +async def test_setup_untag_response_is_identical_setup_doesnt_exist( |
| 97 | + py_api: httpx.AsyncClient, |
| 98 | + php_api: httpx.AsyncClient, |
| 99 | +) -> None: |
| 100 | + setup_id = 999999 |
| 101 | + tag = "totally_new_tag_for_migration_testing" |
| 102 | + api_key = ApiKey.SOME_USER |
| 103 | + |
| 104 | + original = await php_api.post( |
| 105 | + "/setup/untag", |
| 106 | + data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, |
| 107 | + ) |
| 108 | + |
| 109 | + new = await py_api.post( |
| 110 | + f"/setup/untag?api_key={api_key}", |
| 111 | + json={"setup_id": setup_id, "tag": tag}, |
| 112 | + ) |
| 113 | + |
| 114 | + assert original.status_code == HTTPStatus.PRECONDITION_FAILED |
| 115 | + assert new.status_code == HTTPStatus.NOT_FOUND |
| 116 | + assert original.json()["error"]["message"] == "Entity not found." |
| 117 | + assert original.json()["error"]["code"] == new.json()["code"] |
| 118 | + assert re.match( |
| 119 | + r"Setup \d+ not found.", |
| 120 | + new.json()["detail"], |
| 121 | + ) |
| 122 | + |
| 123 | + |
| 124 | +async def test_setup_untag_response_is_identical_tag_doesnt_exist( |
| 125 | + py_api: httpx.AsyncClient, |
| 126 | + php_api: httpx.AsyncClient, |
| 127 | +) -> None: |
| 128 | + setup_id = 1 |
| 129 | + tag = "totally_new_tag_for_migration_testing" |
| 130 | + api_key = ApiKey.SOME_USER |
| 131 | + |
| 132 | + original = await php_api.post( |
| 133 | + "/setup/untag", |
| 134 | + data={"api_key": api_key, "tag": tag, "setup_id": setup_id}, |
| 135 | + ) |
| 136 | + |
| 137 | + new = await py_api.post( |
| 138 | + f"/setup/untag?api_key={api_key}", |
| 139 | + json={"setup_id": setup_id, "tag": tag}, |
| 140 | + ) |
| 141 | + |
| 142 | + assert original.status_code == HTTPStatus.PRECONDITION_FAILED |
| 143 | + assert new.status_code == HTTPStatus.NOT_FOUND |
| 144 | + assert original.json()["error"]["code"] == new.json()["code"] |
| 145 | + assert original.json()["error"]["message"] == "Tag not found." |
| 146 | + assert re.match( |
| 147 | + r"Setup \d+ does not have tag '\S+'.", |
| 148 | + new.json()["detail"], |
| 149 | + ) |
0 commit comments