Notebook to demonstrate write support (databricks#133)
* Add files via upload

* Lint

* Update requirements.txt
Fokko authored Feb 28, 2024
1 parent c7577fd commit 8a61fef
Showing 2 changed files with 252 additions and 1 deletion.
251 changes: 251 additions & 0 deletions spark/notebooks/PyIceberg - Write support.ipynb
@@ -0,0 +1,251 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "1041ae6f",
"metadata": {},
"source": [
"![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)"
]
},
{
"cell_type": "markdown",
"id": "247fb2ab",
"metadata": {},
"source": [
"### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6a5c8206",
"metadata": {},
"outputs": [],
"source": [
"from pyiceberg import __version__\n",
"\n",
"__version__"
]
},
{
"cell_type": "markdown",
"id": "6f9a9f41",
"metadata": {},
"source": [
"# Write support\n",
"\n",
"This notebook demonstrates writing to Iceberg tables using PyIceberg. First, connect to the [catalog](https://iceberg.apache.org/concepts/catalog/#iceberg-catalogs), the place where tables are being tracked."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "47645b52",
"metadata": {},
"outputs": [],
"source": [
"from pyiceberg.catalog import load_catalog\n",
"\n",
"catalog = load_catalog('default')"
]
},
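{
"cell_type": "markdown",
"id": "3b9aa7e1",
"metadata": {},
"source": [
"*Aside:* the `default` catalog above is resolved from the environment (for example a `.pyiceberg.yaml` file or `PYICEBERG_*` environment variables). As a minimal sketch, the same catalog could also be configured explicitly; the endpoint values below are assumptions about this Docker setup, not values read from the actual configuration."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3b9aa7e2",
"metadata": {},
"outputs": [],
"source": [
"# A sketch of explicit catalog configuration; the endpoints below are\n",
"# placeholders assumed for this Docker setup (REST catalog + MinIO),\n",
"# not values read from the environment.\n",
"from pyiceberg.catalog import load_catalog\n",
"\n",
"catalog = load_catalog(\n",
"    \"default\",\n",
"    **{\n",
"        \"uri\": \"http://rest:8181\",\n",
"        \"s3.endpoint\": \"http://minio:9000\",\n",
"    },\n",
")"
]
},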
{
"cell_type": "markdown",
"id": "c531bd4b-9943-4516-9a6a-99fab016ed2b",
"metadata": {},
"source": [
"# Loading data using Arrow\n",
"\n",
"PyArrow is used to load a Parquet file into memory, and using PyIceberg this data can be written to an Iceberg table."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9fddb808",
"metadata": {},
"outputs": [],
"source": [
"import pyarrow.parquet as pq\n",
"\n",
"df = pq.read_table(\"/home/iceberg/data/yellow_tripdata_2022-01.parquet\")\n",
"\n",
"df"
]
},
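{
"cell_type": "markdown",
"id": "4c8de0a1",
"metadata": {},
"source": [
"Before writing, it helps to peek at the size and shape of the loaded data; `num_rows` and `schema` are standard `pyarrow.Table` attributes."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4c8de0a2",
"metadata": {},
"outputs": [],
"source": [
"# Quick sanity check of the Arrow table we just loaded.\n",
"print(f\"Rows: {df.num_rows:,}\")\n",
"print(df.schema)"
]
},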
{
"cell_type": "markdown",
"id": "bf1d58ad-5cc1-4e8c-9d7b-a54e67def783",
"metadata": {},
"source": [
"# Create an Iceberg table\n",
"\n",
"Next create the Iceberg table directly from the `pyarrow.Table`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "47e5a21d-de87-4aaf-aa06-dc5048acba58",
"metadata": {},
"outputs": [],
"source": [
"table_name = \"default.taxi_dataset\"\n",
"\n",
"try:\n",
" # In case the table already exists\n",
" catalog.drop_table(table_name)\n",
"except:\n",
" pass\n",
"\n",
"table = catalog.create_table(table_name, schema=df.schema)\n",
"\n",
"table"
]
},
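{
"cell_type": "markdown",
"id": "5d1fc2b1",
"metadata": {},
"source": [
"The new table starts out empty; its Iceberg schema, converted from the Arrow schema above, can be inspected right away."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5d1fc2b2",
"metadata": {},
"outputs": [],
"source": [
"# The Iceberg schema that was derived from the PyArrow schema.\n",
"print(table.schema())"
]
},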
{
"cell_type": "markdown",
"id": "d612c035-4cf6-47a0-844b-165dfb463bbc",
"metadata": {},
"source": [
"# Write the data\n",
"\n",
"Let's append the data to the table. Appending or overwriting is equivalent since the table is empty. Next we can query the table and see that the data is there."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efee8252",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"table.append(df) # or table.overwrite(df)\n",
"\n",
"assert len(table.scan().to_arrow()) == len(df)\n",
"\n",
"table.scan().to_arrow()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9ce1cecc-8cb0-4622-b0eb-55880d091556",
"metadata": {},
"outputs": [],
"source": [
"str(table.current_snapshot())"
]
},
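{
"cell_type": "markdown",
"id": "6e2ad3c1",
"metadata": {},
"source": [
"Every append or overwrite commits a new snapshot. As a small sketch, the snapshot log can be walked with `Table.history()`; each entry records a snapshot id and a commit timestamp."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6e2ad3c2",
"metadata": {},
"outputs": [],
"source": [
"# Walk the snapshot log; timestamps are in milliseconds since epoch.\n",
"for entry in table.history():\n",
"    print(entry.snapshot_id, entry.timestamp_ms)"
]
},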
{
"cell_type": "markdown",
"id": "c029ea44-8ba6-4c08-a60d-5fffac6c3666",
"metadata": {},
"source": [
"# Append data\n",
"\n",
"Let's append another month of data to the table"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "794de3a0",
"metadata": {},
"outputs": [],
"source": [
"df = pq.read_table(\"/home/iceberg/data/yellow_tripdata_2022-02.parquet\")\n",
"table.append(df)"
]
},
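{
"cell_type": "markdown",
"id": "7f3be4d1",
"metadata": {},
"source": [
"To double-check the append, count the rows now in the table; this reuses the scan API shown above and should cover both months."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7f3be4d2",
"metadata": {},
"outputs": [],
"source": [
"# The table should now contain the January and February data.\n",
"print(f\"Rows after second append: {len(table.scan().to_arrow()):,}\")"
]
},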
{
"cell_type": "code",
"execution_count": null,
"id": "f3ac7021",
"metadata": {},
"outputs": [],
"source": [
"str(table.current_snapshot())"
]
},
{
"cell_type": "markdown",
"id": "85862bdc-7476-43f4-a604-5e4dfff065c9",
"metadata": {},
"source": [
"# Feature generation\n",
"\n",
"Consider that we want to train a model to determine which features contribute to the tip amount. `tip_per_mile` is a good target to train the model on. When we try to append the data, we need to evolve the schema first."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "72a9c64d",
"metadata": {},
"outputs": [],
"source": [
"import pyarrow.compute as pc\n",
"\n",
"df = table.scan().to_arrow()\n",
"df = df.append_column(\"tip_per_mile\", pc.divide(df[\"tip_amount\"], df[\"trip_distance\"]))\n",
"\n",
"try:\n",
" table.overwrite(df)\n",
"except ValueError as e:\n",
" print(f\"Error: {e}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9aafd972-30d2-41ec-90e1-d5e17baeaf0b",
"metadata": {},
"outputs": [],
"source": [
"with table.update_schema() as upd:\n",
" upd.union_by_name(df.schema)\n",
"\n",
"print(str(table.schema()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ea4ee286-1943-4a88-8d96-1e2a9e11faa1",
"metadata": {},
"outputs": [],
"source": [
"table.overwrite(df)\n",
"\n",
"table"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
2 changes: 1 addition & 1 deletion spark/requirements.txt
@@ -1,6 +1,6 @@
jupyter==1.0.0
spylon-kernel==0.4.1
-pyiceberg[pyarrow,duckdb,pandas]==0.5.1
+pyiceberg[pyarrow,duckdb,pandas]==0.6.0
jupysql==0.10.5
matplotlib==3.8.2
scipy==1.12.0