This repository was archived by the owner on Mar 14, 2024. It is now read-only.

Commit 5e75d70

lw authored and facebook-github-bot committed
Make edge importing more efficient
Summary: Now that we have support for buffered streaming edge appending, we can make use of it to avoid storing all the edges in memory at the same time. This also showcases how to use that streaming API, so that other users can draw inspiration from it for their own importers.

Also, increase the chunk size in HDF5 files from 1MiB to 50MiB. I had used 1MiB because it was suggested in the h5py docs, but I now suspect that recommendation was aimed at random access to the dataset. That is not our case, so we can use larger chunks, which make writing more efficient.

Reviewed By: adamlerer

Differential Revision: D17525462

fbshipit-source-id: 646e6f733b3262128a17c3ab3372acbe27c8c6a0
1 parent 50c9038 commit 5e75d70

2 files changed

Lines changed: 15 additions & 17 deletions


torchbiggraph/converters/import_from_tsv.py

Lines changed: 14 additions & 16 deletions
@@ -8,9 +8,10 @@

 import argparse
 import random
+from contextlib import ExitStack
 from itertools import chain
 from pathlib import Path
-from typing import Any, Counter, DefaultDict, Dict, List, Optional, Tuple
+from typing import Any, Counter, Dict, List, Optional, Tuple

 import torch

@@ -25,6 +26,7 @@
 from torchbiggraph.edgelist import EdgeList
 from torchbiggraph.entitylist import EntityList
 from torchbiggraph.graph_storages import (
+    AbstractEdgeAppender,
     AbstractEdgeStorage,
     AbstractEntityStorage,
     AbstractRelationTypeStorage,
@@ -181,12 +183,12 @@ def generate_edge_path_files(

     print(f"- Edges will be partitioned in {num_lhs_parts} x {num_rhs_parts} buckets.")

-    buckets: DefaultDict[Tuple[int, int], List[Tuple[int, int, int]]] = \
-        DefaultDict(list)
     processed = 0
     skipped = 0

-    with edge_file_in.open("rt") as tf:
+    # We use an ExitStack in order to close the dynamically-created edge appenders.
+    with edge_file_in.open("rt") as tf, ExitStack() as appender_stack:
+        appenders: Dict[Tuple[int, int], AbstractEdgeAppender] = {}
         for line_num, line in enumerate(tf, start=1):
             words = line.split()
             try:
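
The comment introduced in this hunk refers to contextlib.ExitStack, which lets a single with statement adopt context managers that are only created while a loop runs, and closes all of them on exit. As a minimal standalone sketch of that pattern (the file names here are made up for illustration, not part of the importer):

from contextlib import ExitStack

# Open output files lazily, one per key, and rely on the stack to close
# every file that was opened, however many there turn out to be.
with ExitStack() as stack:
    files = {}
    for key in ["a", "b", "a", "c"]:
        if key not in files:
            # enter_context registers the file for closing on stack exit
            files[key] = stack.enter_context(open(f"bucket_{key}.txt", "w"))
        files[key].write(f"row for {key}\n")
# All opened files are closed here, even if an exception was raised.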
@@ -225,7 +227,14 @@
                 skipped += 1
                 continue

-            buckets[lhs_part, rhs_part].append((lhs_offset, rhs_offset, rel_id))
+            if (lhs_part, rhs_part) not in appenders:
+                appenders[lhs_part, rhs_part] = appender_stack.enter_context(
+                    edge_storage.save_edges_by_appending(lhs_part, rhs_part))
+            appenders[lhs_part, rhs_part].append_edges(EdgeList(
+                EntityList.from_tensor(torch.tensor([lhs_offset], dtype=torch.long)),
+                EntityList.from_tensor(torch.tensor([rhs_offset], dtype=torch.long)),
+                torch.tensor([rel_id], dtype=torch.long),
+            ))

             processed = processed + 1
             if processed % 100000 == 0:
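
Distilled out of the two hunks above, the streaming import pattern the summary mentions looks roughly like this; edge_storage and the rows iterable are hypothetical stand-ins, while save_edges_by_appending, append_edges, EdgeList, and EntityList are the API shown in the diff:

from contextlib import ExitStack

import torch

from torchbiggraph.edgelist import EdgeList
from torchbiggraph.entitylist import EntityList

def stream_edges(edge_storage, rows):
    # rows yields (lhs_part, rhs_part, lhs_offset, rhs_offset, rel_id) tuples.
    with ExitStack() as stack:
        appenders = {}
        for lhs_part, rhs_part, lhs_offset, rhs_offset, rel_id in rows:
            key = (lhs_part, rhs_part)
            if key not in appenders:
                # One appender per bucket, opened on first use; the stack
                # flushes and closes all of them when the with block ends.
                appenders[key] = stack.enter_context(
                    edge_storage.save_edges_by_appending(lhs_part, rhs_part))
            # Append a single-edge EdgeList; the appender buffers writes,
            # so the edges never all live in memory at once.
            appenders[key].append_edges(EdgeList(
                EntityList.from_tensor(torch.tensor([lhs_offset], dtype=torch.long)),
                EntityList.from_tensor(torch.tensor([rhs_offset], dtype=torch.long)),
                torch.tensor([rel_id], dtype=torch.long),
            ))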
@@ -237,17 +246,6 @@
           f"entities were unknown (either not given in the config or "
           f"filtered out as too rare).")

-    for i in range(num_lhs_parts):
-        for j in range(num_rhs_parts):
-            print(f"- Writing bucket ({i}, {j}), "
-                  f"containing {len(buckets[i, j])} edges...")
-            edges = torch.tensor(buckets[i, j], dtype=torch.long).view((-1, 3))
-            edge_storage.save_edges(i, j, EdgeList(
-                EntityList.from_tensor(edges[:, 0]),
-                EntityList.from_tensor(edges[:, 1]),
-                edges[:, 2],
-            ))
-

 def convert_input_data(
     entity_configs: Dict[str, EntitySchema],

torchbiggraph/graph_storages.py

Lines changed: 1 addition & 1 deletion
@@ -262,7 +262,7 @@ def torch_to_numpy_dtype(dtype):
 class BufferedDataset:

     DATA_TYPE = torch.long  # int64, 8 bytes
-    BUFFER_SIZE = 2 ** 20 // 8  # 1MiB
+    BUFFER_SIZE = 50 * 2 ** 20 // 8  # 50MiB

     def __init__(self, hf: h5py.File, dataset_name: str) -> None:
         self.hf: h5py.File = hf
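
Note that BUFFER_SIZE is measured in elements, not bytes: DATA_TYPE is torch.long (int64, 8 bytes each), so 50 * 2 ** 20 // 8 = 6,553,600 elements correspond to 50MiB. In h5py the chunk shape is fixed when the dataset is created; here is a minimal sketch of creating a chunked, resizable int64 dataset with such a chunk size (file and dataset names are illustrative, and this is not the actual BufferedDataset implementation):

import h5py
import numpy as np

BUFFER_SIZE = 50 * 2 ** 20 // 8  # 6,553,600 int64 elements = 50MiB

with h5py.File("edges_example.h5", "w") as hf:
    dset = hf.create_dataset(
        "lhs",
        shape=(0,),              # start empty
        maxshape=(None,),        # allow unlimited growth
        chunks=(BUFFER_SIZE,),   # one chunk holds 50MiB of int64 data
        dtype=np.int64,
    )
    # Appending means resizing, then writing into the new tail region.
    batch = np.arange(1000, dtype=np.int64)
    old_size = dset.shape[0]
    dset.resize((old_size + batch.shape[0],))
    dset[old_size:] = batch

Larger chunks make each write bigger and less frequent, which suits this sequential, append-only workload; small chunks pay off mainly for random access.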
