-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathclient.py
More file actions
225 lines (187 loc) · 7.29 KB
/
client.py
File metadata and controls
225 lines (187 loc) · 7.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""
Asyncio client for Zyte Data API
"""
import asyncio
import time
from base64 import b64decode
from collections.abc import Mapping
from functools import partial
from typing import Awaitable, Iterator, List, Optional
import aiohttp
from aiohttp import TCPConnector
from tenacity import AsyncRetrying
from .errors import RequestError
from .retry import zyte_api_retrying
from ..apikey import get_apikey
from ..constants import API_URL, API_TIMEOUT
from ..stats import AggStats, ResponseStats
from ..utils import _to_lower_camel_case, user_agent
# Client-side timeout: the server-side API_TIMEOUT plus a generous 120-second
# margin.  120 seconds is probably too long, but we are concerned about the
# case with many concurrent requests and some processing logic running in the
# same reactor, thus saturating the CPU.  This will make timeouts more likely.
AIO_API_TIMEOUT = aiohttp.ClientTimeout(total=API_TIMEOUT + 120)
def create_session(connection_pool_size=100, **kwargs) -> aiohttp.ClientSession:
    """Return an ``aiohttp.ClientSession`` configured for Zyte API.

    Unless overridden through ``kwargs``, the session uses the
    ``AIO_API_TIMEOUT`` timeout and a ``TCPConnector`` limited to
    ``connection_pool_size`` concurrent connections.
    """
    if "timeout" not in kwargs:
        kwargs["timeout"] = AIO_API_TIMEOUT
    if "connector" not in kwargs:
        # Only build a connector when the caller did not supply one.
        kwargs["connector"] = TCPConnector(limit=connection_pool_size)
    return aiohttp.ClientSession(**kwargs)
def _post_func(session):
    """Return the callable used to issue a POST request.

    With a session, its bound ``post`` method is returned (enabling HTTP
    Keep-Alive); otherwise a standalone ``aiohttp.request`` partial with the
    default API timeout is returned.
    """
    if session is not None:
        return session.post
    return partial(
        aiohttp.request,
        method='POST',
        timeout=AIO_API_TIMEOUT,
    )
class ExtractResult(Mapping):
    """Result of a call to AsyncClient.extract.

    It can be used as a dictionary to access the raw API response.

    It also provides some helper properties for easier access to some of its
    underlying data.
    """

    def __init__(self, api_response: dict):
        # Raw JSON payload returned by the API, kept as-is.
        self._api_response = api_response

    def __getitem__(self, key):
        return self._api_response[key]

    def __iter__(self):
        yield from self._api_response

    def __len__(self):
        return len(self._api_response)

    @property
    def http_response_body(self) -> bytes:
        """Base64-decoded bytes of the ``httpResponseBody`` response key.

        The decoded value is cached after the first access.

        :raises ValueError: if the API response has no ``httpResponseBody``
            key.
        """
        # Fixed: original read ``def http_response_body(self): -> bytes:``,
        # a syntax error (stray colon before the return annotation).
        if hasattr(self, "_http_response_body"):
            return self._http_response_body
        base64_body = self._api_response.get("httpResponseBody", None)
        if base64_body is None:
            raise ValueError("API response has no httpResponseBody key.")
        self._http_response_body = b64decode(base64_body)
        return self._http_response_body
class AsyncClient:
    """Asynchronous client for Zyte Data API.

    :param api_key: API key; resolved through ``get_apikey`` (which may fall
        back to the environment) when not passed explicitly.
    :param api_url: base URL of the API.
    :param n_conn: maximum number of concurrent connections used by the
        parallel request helpers.
    """

    def __init__(self, *,
                 api_key=None,
                 api_url=API_URL,
                 n_conn=15,
                 ):
        self.api_key = get_apikey(api_key)
        self.api_url = api_url
        self.n_conn = n_conn
        # Aggregated statistics across all requests made by this client.
        self.agg_stats = AggStats()

    async def request_raw(self, query: dict, *,
                          endpoint: str = 'extract',
                          session=None,
                          handle_retries=True,
                          retrying: Optional[AsyncRetrying] = None,
                          ):
        """Send a single ``query`` to the API and return the parsed JSON
        response.

        ``session`` is an optional ``aiohttp.ClientSession``; use it to
        enable HTTP Keep-Alive.  When ``handle_retries`` is true, the request
        is wrapped with ``retrying`` (``zyte_api_retrying`` by default).

        :raises RequestError: for responses with HTTP status >= 400.
        """
        retrying = retrying or zyte_api_retrying
        post = _post_func(session)
        auth = aiohttp.BasicAuth(self.api_key)
        headers = {'User-Agent': user_agent(aiohttp)}

        response_stats = []
        start_global = time.perf_counter()

        async def request():
            stats = ResponseStats.create(start_global)
            self.agg_stats.n_attempts += 1

            post_kwargs = dict(
                url=self.api_url + endpoint,
                json=query,
                auth=auth,
                headers=headers,
            )

            try:
                async with post(**post_kwargs) as resp:
                    stats.record_connected(resp.status, self.agg_stats)
                    if resp.status >= 400:
                        content = await resp.read()
                        resp.release()
                        stats.record_read()
                        stats.record_request_error(content, self.agg_stats)
                        raise RequestError(
                            request_info=resp.request_info,
                            history=resp.history,
                            status=resp.status,
                            message=resp.reason,
                            headers=resp.headers,
                            response_content=content
                        )
                    response = await resp.json()
                    stats.record_read(self.agg_stats)
                    return response
            except Exception as e:
                # RequestError stats were already recorded above; only count
                # other exceptions here.
                if not isinstance(e, RequestError):
                    self.agg_stats.n_errors += 1
                    stats.record_exception(e, agg_stats=self.agg_stats)
                raise
            finally:
                response_stats.append(stats)

        if handle_retries:
            request = retrying.wraps(request)

        try:
            # Try to make a request
            result = await request()
            self.agg_stats.n_extracted_queries += 1
        except Exception:
            self.agg_stats.n_fatal_errors += 1
            raise
        finally:
            self.agg_stats.n_input_queries += 1
            self.agg_stats.n_results += 1
        return result

    def request_parallel_as_completed(self,
                                      queries: List[dict],
                                      *,
                                      endpoint: str = 'extract',
                                      session: Optional[aiohttp.ClientSession] = None,
                                      ) -> Iterator[asyncio.Future]:
        """Send multiple requests to Zyte Data API in parallel.
        Return an ``asyncio.as_completed`` iterator.

        ``queries`` is a list of requests to process (dicts).

        ``session`` is an optional aiohttp.ClientSession object;
        use it to enable HTTP Keep-Alive. Set the session TCPConnector
        limit to a value greater than the number of connections.
        """
        # Bound concurrency to n_conn simultaneous in-flight requests.
        sem = asyncio.Semaphore(self.n_conn)

        async def _request(query):
            async with sem:
                return await self.request_raw(query,
                                              endpoint=endpoint,
                                              session=session)

        return asyncio.as_completed([_request(query) for query in queries])

    @staticmethod
    def _build_extract_query(raw_query):
        # The API expects lowerCamelCase keys; callers may use snake_case.
        return {
            _to_lower_camel_case(k): v
            for k, v in raw_query.items()
        }

    async def extract(
        self,
        url: str,
        *,
        session: Optional[aiohttp.ClientSession] = None,
        handle_retries: bool = True,
        retrying: Optional[AsyncRetrying] = None,
        **kwargs,
    ) -> ExtractResult:
        """Extract data from ``url`` via the API 'extract' endpoint.

        Extra keyword arguments become query fields (snake_case keys are
        converted to lowerCamelCase).  Returns an :class:`ExtractResult`
        wrapping the raw API response.
        """
        # Fixed: original read ``{**kwargs, 'url'=url}``, a syntax error.
        query = self._build_extract_query({**kwargs, 'url': url})
        response = await self.request_raw(
            query=query,
            endpoint='extract',
            session=session,
            handle_retries=handle_retries,
            retrying=retrying,
        )
        return ExtractResult(response)

    def extract_in_parallel(
        self,
        queries: List[dict],
        *,
        session: Optional[aiohttp.ClientSession] = None,
    ) -> Iterator[asyncio.Future]:
        """Process multiple extract ``queries`` in parallel.

        Query keys are converted to lowerCamelCase, then the queries are
        dispatched through :meth:`request_parallel_as_completed`; returns its
        ``asyncio.as_completed`` iterator.
        """
        queries = [self._build_extract_query(query) for query in queries]
        return self.request_parallel_as_completed(
            queries,
            endpoint='extract',
            session=session,
        )