---
title: "Java Quickstart"
url: java-api-quickstart
aliases:
menu:
---
Tables are created using either a [Catalog](../../../javadoc/{{% icebergVersion %}}/index.html?org/apache/iceberg/catalog/Catalog.html) or an implementation of the [Tables](../../../javadoc/{{% icebergVersion %}}/index.html?org/apache/iceberg/Tables.html) interface.
The Hive catalog connects to a Hive metastore to keep track of Iceberg tables. You can initialize a Hive catalog with a name and some properties (see: Catalog properties).

**Note:** Currently, `setConf` is always required for Hive catalogs, but this will change in the future.
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.hive.HiveCatalog;

HiveCatalog catalog = new HiveCatalog();
catalog.setConf(spark.sparkContext().hadoopConfiguration());  // Configure using Spark's Hadoop configuration

Map<String, String> properties = new HashMap<String, String>();
properties.put("warehouse", "...");
properties.put("uri", "...");

catalog.initialize("hive", properties);
```
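
Once `initialize` has been called, the catalog is ready to use. For example, a minimal sketch that lists tables in an existing namespace and checks for a specific table (the `logging` database is only an assumed example):

```java
import java.util.List;

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

// List the tables registered under the logging database
List<TableIdentifier> tables = catalog.listTables(Namespace.of("logging"));

// Check whether a table exists before loading it
boolean exists = catalog.tableExists(TableIdentifier.of("logging", "logs"));
```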
The `Catalog` interface defines methods for working with tables, like `createTable`, `loadTable`, `renameTable`, and `dropTable`. `HiveCatalog` implements the `Catalog` interface.
To create a table, pass an `Identifier` and a `Schema` along with other initial metadata:
```java
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

TableIdentifier name = TableIdentifier.of("logging", "logs");
Table table = catalog.createTable(name, schema, spec);

// or to load an existing table, use the following line
// Table table = catalog.loadTable(name);
```
The logs schema and partition spec are created below.
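
`renameTable` and `dropTable` follow the same pattern; here is a minimal sketch reusing the `name` identifier from above (the `logs_archive` identifier is only an illustrative example):

```java
import org.apache.iceberg.catalog.TableIdentifier;

// Rename logging.logs to logging.logs_archive (an example target identifier)
catalog.renameTable(name, TableIdentifier.of("logging", "logs_archive"));

// Drop the table; the boolean controls whether data and metadata files are purged
catalog.dropTable(TableIdentifier.of("logging", "logs_archive"), true /* purge */);
```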
A Hadoop catalog doesn't need to connect to a Hive MetaStore, but can only be used with HDFS or similar file systems that support atomic rename. Concurrent writes with a Hadoop catalog are not safe with a local FS or S3. To create a Hadoop catalog:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;

Configuration conf = new Configuration();
String warehousePath = "hdfs://host:8020/warehouse_path";
HadoopCatalog catalog = new HadoopCatalog(conf, warehousePath);
```
Like the Hive catalog, `HadoopCatalog` implements `Catalog`, so it also has methods for working with tables, like `createTable`, `loadTable`, and `dropTable`.
This example creates a table with a Hadoop catalog:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

TableIdentifier name = TableIdentifier.of("logging", "logs");
Table table = catalog.createTable(name, schema, spec);

// or to load an existing table, use the following line
// Table table = catalog.loadTable(name);
```
The logs schema and partition spec are created below.
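
`HadoopCatalog` also implements the `SupportsNamespaces` interface, so namespaces can be created and dropped through the catalog as well; a minimal sketch (the `logging` namespace is only an example):

```java
import org.apache.iceberg.catalog.Namespace;

// In a Hadoop catalog, namespaces map to directories under the warehouse path
catalog.createNamespace(Namespace.of("logging"));
catalog.listNamespaces();                        // e.g. [logging]
catalog.dropNamespace(Namespace.of("logging"));  // fails if the namespace still contains tables
```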
Iceberg also supports tables that are stored in a directory in HDFS. Concurrent writes with Hadoop tables are not safe when stored in the local FS or S3. Directory tables don't support all catalog operations, like rename, so they use the `Tables` interface instead of `Catalog`.
To create a table in HDFS, use `HadoopTables`:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.Table;

Configuration conf = new Configuration();
HadoopTables tables = new HadoopTables(conf);

Table table = tables.create(schema, spec, table_location);

// or to load an existing table, use the following line
// Table table = tables.load(table_location);
```
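
Whether it was created through a catalog or through `HadoopTables`, the returned `Table` exposes its metadata directly. A small sketch using the `table` from the example above:

```java
// Inspect the loaded table
System.out.println(table.location());         // base location of the table
System.out.println(table.schema());           // current schema
System.out.println(table.spec());             // current partition spec
System.out.println(table.currentSnapshot());  // latest snapshot, or null if the table is empty
```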
{{< hint danger >}} Hadoop tables shouldn't be used with file systems that do not support atomic rename. Iceberg relies on rename to synchronize concurrent commits for directory tables. {{< /hint >}}
Spark uses both `HiveCatalog` and `HadoopTables` to load tables. Hive is used when the identifier passed to `load` or `save` is not a path; otherwise, Spark assumes it is a path-based table.
To read and write to tables from Spark, see the Spark documentation.
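
As a minimal sketch of the difference (the table name and location below are placeholders, and `spark` is assumed to be an existing `SparkSession`):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A name that is not a path is loaded through the Hive catalog
Dataset<Row> byName = spark.read().format("iceberg").load("logging.logs");

// A path is loaded as a path-based table with HadoopTables
Dataset<Row> byPath = spark.read().format("iceberg").load("hdfs://host:8020/warehouse_path/logging/logs");
```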
This example creates a schema for a `logs` table:
```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

Schema schema = new Schema(
    Types.NestedField.required(1, "level", Types.StringType.get()),
    Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),
    Types.NestedField.required(3, "message", Types.StringType.get()),
    Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get()))
);
```
When using the Iceberg API directly, type IDs are required. Conversions from other schema formats, like Spark, Avro, and Parquet, will automatically assign new IDs.

When a table is created, all IDs in the schema are re-assigned to ensure uniqueness.
To create an Iceberg schema from an existing Avro schema, use converters in `AvroSchemaUtil`:
```java
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.iceberg.avro.AvroSchemaUtil;

Schema avroSchema = new Parser().parse("{\"type\": \"record\" , ... }");

// Qualify the result type: toIceberg returns an org.apache.iceberg.Schema,
// which would otherwise clash with the imported org.apache.avro.Schema
org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
```
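
`AvroSchemaUtil` can also convert in the other direction; a hedged sketch (the record name `logs` is only an example):

```java
// Convert an Iceberg schema back to an Avro schema, naming the top-level record
org.apache.avro.Schema convertedAvroSchema = AvroSchemaUtil.convert(icebergSchema, "logs");
```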
To create an Iceberg schema from an existing table, use converters in `SparkSchemaUtil`:
```java
import org.apache.iceberg.spark.SparkSchemaUtil;

Schema schema = SparkSchemaUtil.schemaForTable(sparkSession, table_name);
```
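
`SparkSchemaUtil` can also convert a Spark `StructType` directly, which is useful when the schema comes from a DataFrame rather than a named table; a minimal sketch (`df` is an assumed existing `Dataset<Row>`):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkSchemaUtil;

// Convert the DataFrame's Spark schema into an Iceberg schema
Schema fromDataFrame = SparkSchemaUtil.convert(df.schema());
```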
Partition specs describe how Iceberg should group records into data files. Partition specs are created for a table's schema using a builder.
This example creates a partition spec for the `logs` table that partitions records by the hour of the log event's timestamp and by log level:
```java
import org.apache.iceberg.PartitionSpec;

PartitionSpec spec = PartitionSpec.builderFor(schema)
    .hour("event_time")
    .identity("level")
    .build();
```
For more information on the different partition transforms that Iceberg offers, see the partition transforms documentation.
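
As a hedged sketch of a few other builder methods, using columns from the `logs` schema purely to illustrate the API:

```java
import org.apache.iceberg.PartitionSpec;

PartitionSpec otherSpec = PartitionSpec.builderFor(schema)
    .day("event_time")        // partition by day instead of hour
    .bucket("message", 16)    // hash the message into 16 buckets
    .build();

// A table can also be left unpartitioned
PartitionSpec unpartitioned = PartitionSpec.unpartitioned();
```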