Skip to content

Commit 4db0e0b

Browse files
rmdmattingly, sidkhillon, hgromer, skhillon
authored
HBASE-29631 Fix race condition in IncrementalTableBackupClient when HFiles are archived during backup (#7346) (#7357) (#7359)
Signed-off-by: Ray Mattingly <rmattingly@apache.org>
Co-authored-by: Siddharth Khillon <sidkhillon24@gmail.com>
Co-authored-by: Hernan Romer <nanug33@gmail.com>
Co-authored-by: skhillon <skhillon@hubspot.com>
1 parent 202e5e0 commit 4db0e0b

2 files changed

Lines changed: 282 additions & 3 deletions

File tree

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
200200
int numActiveFiles = activeFiles.size();
201201
updateFileLists(activeFiles, archiveFiles);
202202
if (activeFiles.size() < numActiveFiles) {
203+
// We've archived some files, delete bulkloads directory
204+
// and re-try
205+
deleteBulkLoadDirectory();
203206
continue;
204207
}
205208

@@ -242,7 +245,7 @@ private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn,
242245
incrementalCopyBulkloadHFiles(tgtFs, tn);
243246
}
244247

245-
private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
248+
public void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
246249
throws IOException {
247250
List<String> newlyArchived = new ArrayList<>();
248251

@@ -252,9 +255,23 @@ private void updateFileLists(List<String> activeFiles, List<String> archiveFiles
252255
}
253256
}
254257

255-
if (newlyArchived.size() > 0) {
258+
if (!newlyArchived.isEmpty()) {
259+
String rootDir = CommonFSUtils.getRootDir(conf).toString();
260+
256261
activeFiles.removeAll(newlyArchived);
257-
archiveFiles.addAll(newlyArchived);
262+
for (String file : newlyArchived) {
263+
String archivedFile = file.substring(rootDir.length() + 1);
264+
Path archivedFilePath = new Path(HFileArchiveUtil.getArchivePath(conf), archivedFile);
265+
archivedFile = archivedFilePath.toString();
266+
267+
if (!fs.exists(archivedFilePath)) {
268+
throw new IOException(String.format(
269+
"File %s no longer exists, and no archived file %s exists for it", file, archivedFile));
270+
}
271+
272+
LOG.debug("Archived file {} has been updated", archivedFile);
273+
archiveFiles.add(archivedFile);
274+
}
258275
}
259276

260277
LOG.debug(newlyArchived.size() + " files have been archived.");
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.backup;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
23+
import static org.junit.Assert.fail;
24+
25+
import java.io.IOException;
26+
import java.nio.ByteBuffer;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Map;
30+
import org.apache.hadoop.fs.FileSystem;
31+
import org.apache.hadoop.fs.Path;
32+
import org.apache.hadoop.hbase.HBaseClassTestRule;
33+
import org.apache.hadoop.hbase.TableName;
34+
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
35+
import org.apache.hadoop.hbase.backup.impl.BulkLoad;
36+
import org.apache.hadoop.hbase.backup.util.BackupUtils;
37+
import org.apache.hadoop.hbase.client.Get;
38+
import org.apache.hadoop.hbase.client.Result;
39+
import org.apache.hadoop.hbase.client.Table;
40+
import org.apache.hadoop.hbase.testclassification.LargeTests;
41+
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
42+
import org.apache.hadoop.hbase.util.Bytes;
43+
import org.apache.hadoop.hbase.util.CommonFSUtils;
44+
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
45+
import org.apache.hadoop.hbase.util.HFileTestUtil;
46+
import org.junit.ClassRule;
47+
import org.junit.Test;
48+
import org.junit.experimental.categories.Category;
49+
50+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
51+
52+
/**
53+
* This test checks whether backups properly track & manage bulk files loads.
54+
*/
55+
@Category(LargeTests.class)
56+
public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
57+
58+
@ClassRule
59+
public static final HBaseClassTestRule CLASS_RULE =
60+
HBaseClassTestRule.forClass(TestIncrementalBackupWithBulkLoad.class);
61+
62+
private static final String TEST_NAME = TestIncrementalBackupWithBulkLoad.class.getSimpleName();
63+
private static final int ROWS_IN_BULK_LOAD = 100;
64+
65+
// implement all test cases in 1 test since incremental backup/restore has dependencies
66+
@Test
67+
public void TestIncBackupDeleteTable() throws Exception {
68+
try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
69+
// The test starts with some data, and no bulk loaded rows.
70+
int expectedRowCount = NB_ROWS_IN_BATCH;
71+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
72+
assertTrue(systemTable.readBulkloadRows(ImmutableList.of(table1)).isEmpty());
73+
74+
// Bulk loads aren't tracked if the table isn't backed up yet
75+
performBulkLoad("bulk1");
76+
expectedRowCount += ROWS_IN_BULK_LOAD;
77+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
78+
assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
79+
80+
// Create a backup, bulk loads are now being tracked
81+
String backup1 = backupTables(BackupType.FULL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
82+
assertTrue(checkSucceeded(backup1));
83+
performBulkLoad("bulk2");
84+
expectedRowCount += ROWS_IN_BULK_LOAD;
85+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
86+
assertEquals(1, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
87+
88+
// Truncating or deleting a table clears the tracked bulk loads (and all rows)
89+
TEST_UTIL.truncateTable(table1).close();
90+
expectedRowCount = 0;
91+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
92+
assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
93+
94+
// Creating a full backup clears the bulk loads (since they are captured in the snapshot)
95+
performBulkLoad("bulk3");
96+
expectedRowCount = ROWS_IN_BULK_LOAD;
97+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
98+
assertEquals(1, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
99+
String backup2 = backupTables(BackupType.FULL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
100+
assertTrue(checkSucceeded(backup2));
101+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
102+
assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
103+
104+
// Creating an incremental backup clears the bulk loads
105+
performBulkLoad("bulk4");
106+
performBulkLoad("bulk5");
107+
performBulkLoad("bulk6");
108+
expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
109+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
110+
assertEquals(3, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
111+
String backup3 =
112+
backupTables(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
113+
assertTrue(checkSucceeded(backup3));
114+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
115+
assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
116+
int rowCountAfterBackup3 = expectedRowCount;
117+
118+
// Doing another bulk load, to check that this data will disappear after a restore operation
119+
performBulkLoad("bulk7");
120+
expectedRowCount += ROWS_IN_BULK_LOAD;
121+
assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
122+
List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(ImmutableList.of(table1));
123+
assertEquals(1, bulkloadsTemp.size());
124+
BulkLoad bulk7 = bulkloadsTemp.get(0);
125+
126+
// Doing a restore. Overwriting the table implies clearing the bulk loads,
127+
// but the loading of restored data involves loading bulk data, we expect 2 bulk loads
128+
// associated with backup 3 (loading of full backup, loading of incremental backup).
129+
BackupAdmin client = getBackupAdmin();
130+
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup3, false,
131+
new TableName[] { table1 }, new TableName[] { table1 }, true));
132+
assertEquals(rowCountAfterBackup3, TEST_UTIL.countRows(table1));
133+
List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(ImmutableList.of(table1));
134+
assertEquals(2, bulkLoads.size());
135+
assertFalse(bulkLoads.contains(bulk7));
136+
137+
// Check that we have data of all expected bulk loads
138+
try (Table restoredTable = TEST_UTIL.getConnection().getTable(table1)) {
139+
assertFalse(containsRowWithKey(restoredTable, "bulk1"));
140+
assertFalse(containsRowWithKey(restoredTable, "bulk2"));
141+
assertTrue(containsRowWithKey(restoredTable, "bulk3"));
142+
assertTrue(containsRowWithKey(restoredTable, "bulk4"));
143+
assertTrue(containsRowWithKey(restoredTable, "bulk5"));
144+
assertTrue(containsRowWithKey(restoredTable, "bulk6"));
145+
assertFalse(containsRowWithKey(restoredTable, "bulk7"));
146+
}
147+
}
148+
}
149+
150+
private boolean containsRowWithKey(Table table, String rowKey) throws IOException {
151+
byte[] data = Bytes.toBytes(rowKey);
152+
Get get = new Get(data);
153+
Result result = table.get(get);
154+
return result.containsColumn(famName, qualName);
155+
}
156+
157+
@Test
158+
public void testUpdateFileListsRaceCondition() throws Exception {
159+
try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
160+
// Test the race condition where files are archived during incremental backup
161+
FileSystem fs = TEST_UTIL.getTestFileSystem();
162+
163+
String regionName = "region1";
164+
String columnFamily = "cf";
165+
String filename1 = "hfile1";
166+
String filename2 = "hfile2";
167+
168+
Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
169+
Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
170+
Path activeFile1 =
171+
new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename1);
172+
Path activeFile2 =
173+
new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename2);
174+
175+
fs.mkdirs(activeFile1.getParent());
176+
fs.create(activeFile1).close();
177+
fs.create(activeFile2).close();
178+
179+
List<String> activeFiles = new ArrayList<>();
180+
activeFiles.add(activeFile1.toString());
181+
activeFiles.add(activeFile2.toString());
182+
List<String> archiveFiles = new ArrayList<>();
183+
184+
Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1,
185+
regionName, columnFamily);
186+
Path archivedFile1 = new Path(archiveDir, filename1);
187+
fs.mkdirs(archiveDir);
188+
assertTrue("File should be moved to archive", fs.rename(activeFile1, archivedFile1));
189+
190+
TestBackupBase.IncrementalTableBackupClientForTest client =
191+
new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
192+
"test_backup_id",
193+
createBackupRequest(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR));
194+
195+
client.updateFileLists(activeFiles, archiveFiles);
196+
197+
assertEquals("Only one file should remain in active files", 1, activeFiles.size());
198+
assertEquals("File2 should still be in active files", activeFile2.toString(),
199+
activeFiles.get(0));
200+
assertEquals("One file should be added to archive files", 1, archiveFiles.size());
201+
assertEquals("Archived file should have correct path", archivedFile1.toString(),
202+
archiveFiles.get(0));
203+
systemTable.finishBackupExclusiveOperation();
204+
}
205+
206+
}
207+
208+
@Test
209+
public void testUpdateFileListsMissingArchivedFile() throws Exception {
210+
try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
211+
// Test that IOException is thrown when file doesn't exist in archive location
212+
FileSystem fs = TEST_UTIL.getTestFileSystem();
213+
214+
String regionName = "region2";
215+
String columnFamily = "cf";
216+
String filename = "missing_file";
217+
218+
Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
219+
Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
220+
Path activeFile =
221+
new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename);
222+
223+
fs.mkdirs(activeFile.getParent());
224+
fs.create(activeFile).close();
225+
226+
List<String> activeFiles = new ArrayList<>();
227+
activeFiles.add(activeFile.toString());
228+
List<String> archiveFiles = new ArrayList<>();
229+
230+
// Delete the file but don't create it in archive location
231+
fs.delete(activeFile, false);
232+
233+
TestBackupBase.IncrementalTableBackupClientForTest client =
234+
new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
235+
"test_backup_id",
236+
createBackupRequest(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR));
237+
238+
// This should throw IOException since file doesn't exist in archive
239+
try {
240+
client.updateFileLists(activeFiles, archiveFiles);
241+
fail("Expected IOException to be thrown");
242+
} catch (IOException e) {
243+
// Expected
244+
}
245+
systemTable.finishBackupExclusiveOperation();
246+
}
247+
}
248+
249+
private void performBulkLoad(String keyPrefix) throws IOException {
250+
FileSystem fs = TEST_UTIL.getTestFileSystem();
251+
Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME);
252+
Path hfilePath =
253+
new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix);
254+
255+
HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName,
256+
Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD);
257+
258+
Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
259+
BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
260+
assertFalse(result.isEmpty());
261+
}
262+
}

0 commit comments

Comments
 (0)