Skip to content

Commit 1c89a14

Browse files
authored
Merge pull request #12 from heliocastro/credentials_class
Add new SW360OAuth2 class
2 parents bd4e13a + 8d4c3fd commit 1c89a14

3 files changed

Lines changed: 144 additions & 0 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,6 @@ sw360.egg-info/
3131

3232
# only for special tests
3333
tests/sw360/
34+
35+
# To avoid save pyenv .python-version
36+
.python-version

sw360/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@
1111

1212
from .sw360_api import SW360 # noqa: F401
1313
from .sw360error import SW360Error # noqa: F401
14+
from .sw360oauth2 import SW360OAuth2 # noqa: F401

sw360/sw360oauth2.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# -------------------------------------------------------------------------------
2+
# (c) 2022 BMW CarIT GmbH
3+
# All Rights Reserved.
4+
# Authors: helio.chissini-de-castro@bmw.de
5+
#
6+
# Licensed under the MIT license.
7+
# SPDX-License-Identifier: MIT
8+
# -------------------------------------------------------------------------------
9+
10+
from typing import Any, Dict, Optional, Union
11+
from urllib.parse import urljoin
12+
13+
import requests
14+
import urllib3
15+
from requests.auth import HTTPBasicAuth
16+
from requests.structures import CaseInsensitiveDict
17+
18+
from .sw360error import SW360Error
19+
20+
21+
class SW360OAuth2:
22+
"""SW360 OAuth2 Credentials
23+
Restore or create Sw360 oauth2 tokens from user/password if auth server is alive
24+
:param url: URL of the SW360 instance
25+
:param user: SW360 username
26+
:param password: SW360 password
27+
:type url: string
28+
:type user: string
29+
:type password: string
30+
"""
31+
32+
_client_id: str
33+
_client_secret: str
34+
_user: str
35+
_password: str
36+
_token: str
37+
_refresh_token: str
38+
39+
def __init__(self, url: str, user: str, password: str):
40+
self._url, self._user, self._password = url, user, password
41+
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
42+
43+
self.__get_credentials()
44+
45+
def __get_credentials(self) -> None:
46+
"""Return last valid credentials id and secret
47+
48+
Raises:
49+
SW360Error: If is unable to authorize
50+
"""
51+
data: Dict[str, Any]
52+
url = urljoin(self._url, "/authorization/client-management")
53+
auth = HTTPBasicAuth(self._user, self._password)
54+
55+
try:
56+
response = requests.get(url, verify=False, auth=auth)
57+
except Exception as ex:
58+
raise SW360Error(None, url, message="Unable to connect to oauth2 service: " + repr(ex))
59+
60+
data = response.json()[0]
61+
self._client_id = data["client_id"]
62+
self._client_secret = data["client_secret"]
63+
64+
def create_client(self, description: str, writeable: False) -> None:
65+
"""Create an OAuth2 client
66+
67+
Args:
68+
description (str): Some description of the client
69+
writeable (bool): Create the id read/writeable
70+
71+
Raises:
72+
SW360Error: When unable to create a client
73+
"""
74+
scope = ["READ"]
75+
if writeable:
76+
scope.append("WRITE")
77+
payload: Union[str, Any] = {
78+
"description": description,
79+
"authorities": ["BASIC"],
80+
"scope": scope,
81+
"access_token_validity": 3600,
82+
"refresh_token_validity": 3600,
83+
}
84+
auth = HTTPBasicAuth(self._user, self._password)
85+
headers = {"content-type": "application/json"}
86+
url = urljoin(self._url, "/authorization/client-management")
87+
88+
try:
89+
requests.post(url, json=payload, verify=False, auth=auth, headers=headers)
90+
except Exception as ex:
91+
raise SW360Error(None, url, message="Cant create oauth client: " + repr(ex))
92+
93+
def generate_token(self) -> str:
94+
"""Generate a new bearer token
95+
"""
96+
self.__oauth_token(True)
97+
98+
def __token(self, create: bool = False) -> None:
99+
"""Create or return Liferay Token
100+
:param create: If should create or restore active tokens
101+
:type create: bool
102+
"""
103+
data: Dict[str, Any]
104+
url = urljoin(self._url, "/authorization/oauth/token")
105+
payload: Dict[str, Any] = {
106+
"grant_type": "password",
107+
"username": self._user,
108+
"password": self._password,
109+
}
110+
auth = HTTPBasicAuth(self._client_id, self._client_secret)
111+
112+
headers: CaseInsensitiveDict[Any] = CaseInsensitiveDict()
113+
headers["Content-Type"] = "application/x-www-form-urlencoded"
114+
115+
if create:
116+
response = requests.post(url, auth=auth, headers=headers, json=payload, verify=False)
117+
else:
118+
params = f"grant_type=password&username={self._user}&password={self._password}"
119+
response = requests.get(url, auth=auth, headers=headers, params=params, verify=False)
120+
121+
data = response.json()
122+
self._token = data["access_token"]
123+
self._refresh_token = data["refresh_token"]
124+
125+
@property
126+
def token(self) -> Optional[str]:
127+
"""Return the valid oauth token"""
128+
self.__token()
129+
return self._token
130+
131+
@property
132+
def refresh_token(self) -> str:
133+
"""Return the valid oauth refresh token"""
134+
self.__token()
135+
return self._refresh_token
136+
137+
@property
138+
def url(self) -> str:
139+
"""Return current session object url"""
140+
return self._url

0 commit comments

Comments
 (0)