Storage

PyIceberg


PyIceberg is a Python client for Apache Iceberg that enables programmatic interaction with Iceberg tables. Use it to create, read, update, and delete data in your analytics buckets.

Installation

1
pip install pyiceberg pyarrow

Basic setup

Here's a complete example showing how to connect to your Supabase analytics bucket and perform operations:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pyiceberg.catalog import load_catalog
import pyarrow as pa

# --- Configuration: update with your Supabase credentials ---
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct the Supabase endpoints: the S3-compatible gateway for data
# files and the Iceberg REST catalog for table metadata.
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Load the Iceberg REST catalog. Extra keys are passed through as catalog
# properties: the PyArrow FileIO implementation plus the S3 connection
# settings it needs to read/write data files.
# NOTE: force-virtual-addressing must be False — Supabase's S3 gateway
# uses path-style addressing.
catalog = load_catalog(
    "supabase-analytics",
    type="rest",
    warehouse=WAREHOUSE,
    uri=CATALOG_URI,
    token=SERVICE_KEY,
    **{
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": S3_ENDPOINT,
        "s3.access-key-id": S3_ACCESS_KEY,
        "s3.secret-access-key": S3_SECRET_KEY,
        "s3.region": S3_REGION,
        "s3.force-virtual-addressing": False,
    },
)
print("✓ Successfully connected to Iceberg catalog")

Creating tables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Group related tables under a namespace.
catalog.create_namespace_if_not_exists("analytics")

# Describe the table columns as a PyArrow schema; PyIceberg converts it
# to an Iceberg schema when the table is created.
schema = pa.schema(
    [
        pa.field("event_id", pa.int64()),
        pa.field("user_id", pa.int64()),
        pa.field("event_name", pa.string()),
        pa.field("event_timestamp", pa.timestamp("ms")),
        pa.field("properties", pa.string()),
    ]
)

# Create the table (no-op if it already exists); the identifier is a
# (namespace, table_name) tuple.
table = catalog.create_table_if_not_exists(
    ("analytics", "events"),
    schema=schema,
)
print("✓ Created table: analytics.events")

Writing data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import datetime

# Build an in-memory PyArrow table of five sample events. All rows share
# one capture timestamp; `properties` holds JSON-encoded strings.
current_time = datetime.datetime.now()
data = pa.table(
    {
        "event_id": [1, 2, 3, 4, 5],
        "user_id": [101, 102, 101, 103, 102],
        "event_name": ["login", "view_product", "logout", "purchase", "login"],
        "event_timestamp": [current_time] * 5,
        "properties": [
            '{"browser":"chrome"}',
            '{"product_id":"123"}',
            '{}',
            '{"amount":99.99}',
            '{"browser":"firefox"}',
        ],
    }
)

# Commit the rows to the Iceberg table as a new snapshot.
table.append(data)
print("✓ Appended 5 rows to analytics.events")

Reading data

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Scan the entire table into a pandas DataFrame.
scan_result = table.scan().to_pandas()
print(f"Total rows: {len(scan_result)}")
print(scan_result.head())

# Query with filters.
# FIX: PyIceberg's Table.scan takes the predicate via the `row_filter`
# parameter (a string expression or expression object), not `filter`.
filtered = table.scan(
    row_filter="event_name = 'login'"
).to_pandas()
print(f"Login events: {len(filtered)}")

# Project only the columns you need — avoids reading the rest.
selected = table.scan(
    selected_fields=["user_id", "event_name", "event_timestamp"]
).to_pandas()
print(selected.head())

Advanced operations

Listing tables and namespaces

1
2
3
4
5
6
7
8
9
10
11
12
# List all namespaces registered in the catalog.
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

# List tables within one namespace.
tables = catalog.list_tables("analytics")
print("Tables in analytics:", tables)

# Load a table and inspect its metadata.
table_metadata = catalog.load_table(("analytics", "events"))
print("Schema:", table_metadata.schema())
# FIX: PyIceberg's Table has no .partitions() method; the partition
# layout is exposed as the partition spec via .spec().
print("Partitions:", table_metadata.spec())

Handling errors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# FIX: catch the specific PyIceberg error instead of a bare Exception —
# a broad except would misreport auth/network failures as a missing table.
from pyiceberg.exceptions import NoSuchTableError

try:
    # Attempt to load a table that may not exist.
    table = catalog.load_table(("analytics", "nonexistent"))
except NoSuchTableError as e:
    print(f"Error loading table: {e}")

# Load-or-create pattern: try to load first, create only on a genuine
# "no such table" error.
namespace = "analytics"
table_name = "events"
try:
    existing_table = catalog.load_table((namespace, table_name))
    print("Table already exists")
except NoSuchTableError:
    print("Table does not exist, creating...")
    table = catalog.create_table((namespace, table_name), schema=schema)

Performance tips

  • Batch writes - Insert data in batches rather than row-by-row for better performance
  • Partition strategies - Use partitioning for large tables to improve query performance
  • Schema evolution - PyIceberg supports schema changes without rewriting data
  • Data format - Use Parquet for efficient columnar storage

Complete example: ETL pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import pandas as pd

# Connect to the catalog (fill in as shown in the Basic Setup section).
catalog = load_catalog(...)

# Step 1: ensure the target namespace exists.
catalog.create_namespace_if_not_exists("warehouse")

# Step 2: declare the destination schema.
schema = pa.schema(
    [
        pa.field("id", pa.int64()),
        pa.field("name", pa.string()),
        pa.field("created_at", pa.timestamp("ms")),
    ]
)

# Step 3: create the destination table if needed.
table = catalog.create_table_if_not_exists(
    ("warehouse", "products"),
    schema=schema,
)

# Step 4: extract — read the source data into pandas, then convert to
# Arrow for the Iceberg writer.
df = pd.read_csv("products.csv")
data = pa.Table.from_pandas(df)

# Step 5: load — append the rows to the analytics bucket.
table.append(data)
print(f"✓ Loaded {len(data)} products to warehouse.products")

# Step 6: verify by scanning the table back.
result = table.scan().to_pandas()
print(result.describe())

Next steps