
Commit 5312aa1

Authored by rmdmattingly and hgromer
HBASE-28440 Add support for using mapreduce sort in HFileOutputFormat2 (#7295) (#7342)
Signed-off-by: Ray Mattingly <rmattingly@apache.org>
Co-authored-by: Hernan Romer <nanug33@gmail.com>
Co-authored-by: Hernan Gelaf-Romer <hgelafromer@hubspot.com>
1 parent e7154e1 commit 5312aa1

9 files changed: 373 additions & 58 deletions
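
The feature is opt-in: everything in this commit is gated behind a single boolean configuration key, read back through a new `HFileOutputFormat2` helper. A minimal sketch of turning it on (the key and helper come from this commit; the class name here is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

// Illustrative sketch: opting a job's Configuration in to the new sort path.
public class DiskBasedSortingOptIn {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Key added by this commit; it defaults to false, so existing jobs are unaffected.
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    // Helper added by this commit reads the flag back.
    System.out.println("disk-based sorting: " + HFileOutputFormat2.diskBasedSortingEnabled(conf));
  }
}
```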


hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java

Lines changed: 12 additions & 0 deletions
@@ -335,6 +335,7 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
   }
 
   protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
+    boolean diskBasedSortingOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
     try {
       LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
       // set overall backup phase: incremental_copy
@@ -349,6 +350,7 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
         LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
       }
       conf.set(JOB_NAME_CONF_KEY, jobname);
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
 
       BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
       int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
@@ -361,6 +363,8 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
         + " finished.");
     } finally {
       deleteBulkLoadDirectory();
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+        diskBasedSortingOriginalValue);
     }
   }
 
@@ -415,6 +419,9 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
     conf.setBoolean(HFileOutputFormat2.TABLE_NAME_WITH_NAMESPACE_INCLUSIVE_KEY, true);
     conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
     conf.set(JOB_NAME_CONF_KEY, jobname);
+
+    boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
 
     try {
@@ -430,6 +437,11 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws
     } catch (Exception ee) {
       throw new IOException("Can not convert from directory " + dirs
         + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+    } finally {
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+        diskBasedSortingEnabledOriginalValue);
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+      conf.unset(JOB_NAME_CONF_KEY);
     }
   }
 

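Both changes in this file follow the same save/force/restore discipline, so the client's shared Configuration is not left permanently mutated after the job finishes. A condensed sketch of the pattern (the helper and job-launch names are illustrative stand-ins, not from the commit):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;

// Sketch of the save/force/restore pattern used by incrementalCopyHFiles and
// walToHFiles above; launchJob stands in for the BackupCopyJob / WALPlayer run.
public class ForceDiskBasedSorting {
  static void runWithDiskBasedSorting(Configuration conf, Runnable launchJob) {
    boolean original = HFileOutputFormat2.diskBasedSortingEnabled(conf);
    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
    try {
      launchJob.run(); // the M/R job submission
    } finally {
      // Restore the caller's setting even if the job throws.
      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, original);
    }
  }
}
```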
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java

Lines changed: 30 additions & 6 deletions
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -33,11 +34,14 @@
 import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
+import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -71,18 +75,28 @@ protected MapReduceHFileSplitterJob(final Configuration c) {
   /**
    * A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
    */
-  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
+  static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {
+
+    private boolean diskBasedSortingEnabled = false;
 
     @Override
     public void map(NullWritable key, Cell value, Context context)
      throws IOException, InterruptedException {
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
-        new MapReduceExtendedCell(value));
+      ExtendedCell extendedCell = (ExtendedCell) value;
+      context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell));
     }
 
     @Override
     public void setup(Context context) throws IOException {
-      // do nothing
+      diskBasedSortingEnabled =
+        HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
+    }
+
+    private WritableComparable<?> wrap(ExtendedCell cell) {
+      if (diskBasedSortingEnabled) {
+        return new KeyOnlyCellComparable(cell);
+      }
+      return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
     }
   }
 
@@ -106,13 +120,23 @@ public Job createSubmittableJob(String[] args) throws IOException {
       true);
     job.setJarByClass(MapReduceHFileSplitterJob.class);
     job.setInputFormatClass(HFileInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }
     if (hfileOutPath != null) {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
       TableName tableName = TableName.valueOf(tabName);
       job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(MapReduceExtendedCell.class);

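What changes here is the shuffle key. Row-only keys leave per-row cell ordering to `CellSortReducer`'s in-memory sort; wrapping the full cell key lets the MapReduce framework's disk-backed merge sort order individual cells, so the reducer can stream them out. An illustrative restatement of `wrap()` outside the mapper (class and method names are not from the commit):

```java
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.io.WritableComparable;

// Illustrative sketch: what each mode hands to the shuffle as the map output key.
public class ShuffleKeyModes {
  static WritableComparable<?> shuffleKey(ExtendedCell cell, boolean diskBasedSorting) {
    return diskBasedSorting
      ? new KeyOnlyCellComparable(cell)                      // framework sorts by full cell key
      : new ImmutableBytesWritable(CellUtil.cloneRow(cell)); // framework sorts by row bytes only
  }
}
```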
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 28 additions & 4 deletions
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -83,6 +84,7 @@
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -194,6 +196,11 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
     "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
   static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
 
+  @InterfaceAudience.Private
+  public static final String DISK_BASED_SORTING_ENABLED_KEY =
+    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
+  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;
+
   public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
   public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
     REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -579,12 +586,19 @@ private static void writePartitions(Configuration conf, Path partitionsPath,
 
     // Write the actual file
     FileSystem fs = partitionsPath.getFileSystem(conf);
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
-      ImmutableBytesWritable.class, NullWritable.class);
+    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
+    Class<? extends Writable> keyClass =
+      diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);
 
     try {
       for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
+        Writable writable = diskBasedSortingEnabled
+          ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
+          : startKey;
+
+        writer.append(writable, NullWritable.get());
       }
     } finally {
       writer.close();
@@ -631,6 +645,10 @@ public static void configureIncrementalLoad(Job job, TableDescriptor tableDescri
     configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
   }
 
+  public static boolean diskBasedSortingEnabled(Configuration conf) {
+    return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
+  }
+
   static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
     Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
@@ -652,7 +670,13 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
     // Based on the configured map output class, set the correct reducer to properly
     // sort the incoming values.
     // TODO it would be nice to pick one or the other of these formats.
-    if (
+    boolean diskBasedSorting = diskBasedSortingEnabled(conf);
+
+    if (diskBasedSorting) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+      job.setReducerClass(PreSortedCellsReducer.class);
+    } else if (
       KeyValue.class.equals(job.getMapOutputValueClass())
       || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
     ) {

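A note on the `writePartitions` change: `TotalOrderPartitioner` compares map output keys against the boundary keys stored in the partitions file, so the boundaries must use the same type and comparator as the shuffle. Each region start row is therefore wrapped as a "first on row" cell, which sorts before every real cell of that row under `CellComparator`. A sketch of that wrapping (the helper name is illustrative):

```java
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative sketch: turning a region start row into a partition boundary key
// comparable with the full-cell-key shuffle keys.
public class PartitionBoundarySketch {
  static KeyOnlyCellComparable boundaryFor(String startRow) {
    return new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(Bytes.toBytes(startRow)));
  }
}
```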
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java

Lines changed: 4 additions & 0 deletions
@@ -200,6 +200,10 @@ public CellWritableComparable(Cell kv) {
       this.kv = kv;
     }
 
+    public Cell getCell() {
+      return kv;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
       int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
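
This small accessor mirrors `KeyOnlyCellComparable.getCell()`: code keyed by `CellWritableComparable` can now read the wrapped cell directly from the shuffle key. A hypothetical use (the class and method here are illustrative, not from the commit):

```java
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.mapreduce.Import;

// Hypothetical sketch: recover the row bytes from the shuffle key alone,
// as PreSortedCellsReducer does with its own key type.
public class KeyRowSketch {
  static byte[] rowOf(Import.CellWritableComparable key) {
    Cell cell = key.getCell();
    return CellUtil.cloneRow(cell);
  }
}
```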
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java (new file)

Lines changed: 94 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class KeyOnlyCellComparable implements WritableComparable<KeyOnlyCellComparable> {

  static {
    WritableComparator.define(KeyOnlyCellComparable.class, new KeyOnlyCellComparator());
  }

  private ExtendedCell cell = null;

  public KeyOnlyCellComparable() {
  }

  public KeyOnlyCellComparable(ExtendedCell cell) {
    this.cell = cell;
  }

  public ExtendedCell getCell() {
    return cell;
  }

  @Override
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
      justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
  public int compareTo(KeyOnlyCellComparable o) {
    return CellComparator.getInstance().compare(cell, o.cell);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
    int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
    out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
    out.writeInt(keyLen);
    out.writeInt(valueLen);
    PrivateCellUtil.writeFlatKey(cell, out);
    out.writeLong(cell.getSequenceId());
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    cell = KeyValue.create(in);
    long seqId = in.readLong();
    cell.setSequenceId(seqId);
  }

  public static class KeyOnlyCellComparator extends WritableComparator {

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      try (DataInputStream d1 = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
        DataInputStream d2 = new DataInputStream(new ByteArrayInputStream(b2, s2, l2))) {
        KeyOnlyCellComparable kv1 = new KeyOnlyCellComparable();
        kv1.readFields(d1);
        KeyOnlyCellComparable kv2 = new KeyOnlyCellComparable();
        kv2.readFields(d2);
        return compare(kv1, kv2);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
}
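
Not part of the commit, but a useful way to read the Writable contract above: `write()` emits a KeyValue-shaped record with an empty value followed by the sequence id, and `readFields()` rebuilds a cell from exactly that layout, so a round-trip preserves comparison order. A sketch under those assumptions:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch: serialize a key-only comparable, read it back, and check ordering.
public class KeyOnlyCellComparableRoundTrip {
  public static void main(String[] args) throws IOException {
    KeyOnlyCellComparable a =
      new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-a")));
    KeyOnlyCellComparable b =
      new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(Bytes.toBytes("row-b")));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    a.write(new DataOutputStream(bytes)); // key bytes + sequence id, no value

    KeyOnlyCellComparable copy = new KeyOnlyCellComparable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(a.compareTo(copy)); // 0: the round-trip preserves the key
    System.out.println(a.compareTo(b) < 0); // true: row-a sorts before row-b
  }
}
```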
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java (new file)

Lines changed: 46 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class PreSortedCellsReducer
  extends Reducer<KeyOnlyCellComparable, Cell, ImmutableBytesWritable, Cell> {

  @Override
  protected void reduce(KeyOnlyCellComparable key, Iterable<Cell> values, Context context)
    throws IOException, InterruptedException {

    int index = 0;
    for (Cell cell : values) {
      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(key.getCell())),
        new MapReduceExtendedCell(cell));

      if (++index % 100 == 0) {
        context.setStatus("Wrote " + index + " cells");
      }
    }
  }
}
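
The reducer only works when the shuffle has already sorted full cell keys, so the three job settings travel as a unit. A recap sketch of the wiring, mirroring `configureIncrementalLoad` and `MapReduceHFileSplitterJob` above (the class name is illustrative; other job setup is omitted):

```java
import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.mapreduce.Job;

// Recap sketch: the settings applied when disk-based sorting is enabled.
public class DiskBasedSortingWiring {
  static void wire(Job job) {
    job.setMapOutputKeyClass(KeyOnlyCellComparable.class);   // full cell key rides the shuffle
    job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
    job.setReducerClass(PreSortedCellsReducer.class);        // streams already-sorted cells
    job.setMapOutputValueClass(MapReduceExtendedCell.class); // as in the jobs above
  }
}
```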
