Skip to content

Commit 7c21e71

Browse files
HBASE-25839 Bulk Import fails with java.io.IOException: Type mismatch in value from map (#6602)
Signed-off-by: Pankaj Kumar <[email protected]>
1 parent 6c5786f commit 7c21e71

File tree

2 files changed

+48
-3
lines changed

2 files changed

+48
-3
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,11 @@ public CellWritableComparable(Cell kv) {
132132

133133
@Override
134134
public void write(DataOutput out) throws IOException {
135-
out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
136-
out.writeInt(0);
135+
int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
136+
int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
137+
out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
138+
out.writeInt(keyLen);
139+
out.writeInt(valueLen);
137140
PrivateCellUtil.writeFlatKey(kv, out);
138141
}
139142

@@ -212,7 +215,8 @@ public void map(ImmutableBytesWritable row, Result value, Context context) throw
212215
continue;
213216
}
214217
Cell ret = convertKv(kv, cfRenameMap);
215-
context.write(new CellWritableComparable(ret), ret);
218+
context.write(new CellWritableComparable(ret),
219+
new MapReduceExtendedCell((ExtendedCell) ret));
216220
}
217221
}
218222
} catch (InterruptedException e) {

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

+41
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,47 @@ public void testWithFilter() throws Throwable {
507507
importTable.close();
508508
}
509509

510+
/**
511+
* Create a simple table, run an Export job on it, then Import it with bulk output and the large-result option enabled.
512+
*/
513+
@Test
514+
public void testBulkImportAndLargeResult() throws Throwable {
515+
// Create simple table to export
516+
TableDescriptor desc = TableDescriptorBuilder
517+
.newBuilder(TableName.valueOf(name.getMethodName()))
518+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
519+
.build();
520+
UTIL.getAdmin().createTable(desc);
521+
Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
522+
523+
Put p1 = new Put(ROW1);
524+
p1.addColumn(FAMILYA, QUAL, now, QUAL);
525+
526+
// Add a second row so the export and bulk import cover more than one row (no filter is used here).
527+
Put p2 = new Put(ROW2);
528+
p2.addColumn(FAMILYA, QUAL, now, QUAL);
529+
530+
exportTable.put(Arrays.asList(p1, p2));
531+
532+
// Export the simple table
533+
String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
534+
assertTrue(runExport(args));
535+
536+
// Import to a new table
537+
final String IMPORT_TABLE = name.getMethodName() + "import";
538+
desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
539+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
540+
.build();
541+
UTIL.getAdmin().createTable(desc);
542+
543+
String O_OUTPUT_DIR =
544+
new Path(OUTPUT_DIR + 1).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
545+
546+
args = new String[] { "-D" + Import.BULK_OUTPUT_CONF_KEY + "=" + O_OUTPUT_DIR,
547+
"-D" + Import.HAS_LARGE_RESULT + "=" + true, IMPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
548+
assertTrue(runImport(args));
549+
}
550+
510551
/**
511552
* Count the number of keyvalues in the specified table with the given filter
512553
* @param table the table to scan

0 commit comments

Comments
 (0)