Skip to content

Commit aeb92b8

Browse files
aaxelb authored and chrisseto committed
[SHARE-529][SHARE-590][Feature] Use cursor, not offset, in Crossref harvester. (#581)
* Use cursor, not offset, in crossref harvester.
* Don't hold all harvested data in memory.
1 parent 80e96e9 commit aeb92b8

3 files changed

Lines changed: 30 additions & 33 deletions

File tree

providers/org/crossref/harvester.py

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,21 @@ def do_harvest(self, start_date, end_date):
1515
start_date.isoformat(),
1616
end_date.isoformat()
1717
),
18-
'rows': 1000
19-
}).url)
20-
21-
def fetch_records(self, url):
22-
resp = self.requests.get(url)
23-
resp.raise_for_status()
24-
total = resp.json()['message']['total-results']
25-
records = resp.json()['message']['items']
26-
27-
# return the first 1000 records
28-
for record in records:
29-
yield (record['DOI'], record)
30-
31-
# make requests for the remaining records
32-
for i in range(1000, total, 1000):
33-
response = self.requests.get(furl(url).add(query_params={
34-
'offset': i
35-
}).url)
36-
37-
response.raise_for_status()
38-
records = response.json()['message']['items']
18+
'rows': 1000,
19+
}))
20+
21+
def fetch_records(self, url: furl):
22+
cursor = '*'
23+
24+
while True:
25+
url.args['cursor'] = cursor
26+
resp = self.requests.get(url.url)
27+
resp.raise_for_status()
28+
message = resp.json()['message']
29+
records = message['items']
30+
cursor = message['next-cursor']
31+
32+
if not records:
33+
break
3934
for record in records:
4035
yield (record['DOI'], record)

share/harvest/harvester.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,17 @@ def harvest(self, start_date: [datetime.datetime, datetime.timedelta, pendulum.P
122122
from share.models import RawData
123123
start_date, end_date = self._validate_dates(start_date, end_date)
124124

125-
stored = []
125+
raw_ids = []
126126
with transaction.atomic():
127127
rawdata = self.do_harvest(start_date, end_date, **kwargs)
128128
assert isinstance(rawdata, types.GeneratorType), 'do_harvest did not return a generator type, found {!r}. Make sure to use the yield keyword'.format(type(rawdata))
129129

130130
for doc_id, datum in rawdata:
131-
stored.append(RawData.objects.store_data(doc_id, self.encode_data(datum), self.source, self.config.label))
132-
if limit is not None and len(stored) >= limit:
131+
raw_ids.append(RawData.objects.store_data(doc_id, self.encode_data(datum), self.source, self.config.label).id)
132+
if limit is not None and len(raw_ids) >= limit:
133133
break
134134

135-
return stored
135+
return raw_ids
136136

137137
def raw(self, start_date: [datetime.datetime, datetime.timedelta, pendulum.Pendulum], end_date: [datetime.datetime, datetime.timedelta, pendulum.Pendulum], shift_range: bool=True, limit: int=None, **kwargs) -> list:
138138
start_date, end_date = self._validate_dates(start_date, end_date)

share/tasks.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,18 +137,20 @@ def do_run(self, start: [str, datetime.datetime]=None, end: [str, datetime.datet
137137

138138
try:
139139
logger.info('Starting harvester run for %s %s - %s', self.config.label, start, end)
140-
raws = harvester.harvest(start, end, limit=limit, **kwargs)
141-
logger.info('Collected %d data blobs from %s', len(raws), self.config.label)
140+
raw_ids = harvester.harvest(start, end, limit=limit, **kwargs)
141+
logger.info('Collected %d data blobs from %s', len(raw_ids), self.config.label)
142142
except Exception as e:
143143
logger.exception('Failed harvester task (%s, %s, %s)', self.config.label, start, end)
144144
raise self.retry(countdown=10, exc=e)
145145

146-
for raw in raws:
147-
# attach task
148-
raw.tasks.add(self.task)
149-
150-
task = NormalizerTask().apply_async((self.started_by.id, self.config.label, raw.pk,))
151-
logger.debug('Started normalizer task %s for %s', task, raw.id)
146+
# attach task to each RawData
147+
RawData.tasks.through.objects.bulk_create([
148+
RawData.tasks.through(rawdata_id=raw_id, celeryprovidertask_id=self.task.id)
149+
for raw_id in raw_ids
150+
])
151+
for raw_id in raw_ids:
152+
task = NormalizerTask().apply_async((self.started_by.id, self.config.label, raw_id,))
153+
logger.debug('Started normalizer task %s for %s', task, raw_id)
152154

153155

154156
class NormalizerTask(AppTask):

0 commit comments

Comments (0)