# PyIceberg
**This feature is in alpha.** Expect rapid changes, limited features, and possible breaking updates. Share feedback as we refine the experience and expand access.
PyIceberg is a Python client for Apache Iceberg that enables programmatic interaction with Iceberg tables. Use it to create, read, update, and delete data in your analytics buckets.
## Installation
```bash
pip install pyiceberg pyarrow
```

## Basic setup
Here's a complete example showing how to connect to your Supabase analytics bucket; the sections below build on this connection:
```python
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import datetime

# Configuration - Update with your Supabase credentials
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct Supabase endpoints
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Load the Iceberg REST Catalog
catalog = load_catalog(
    "supabase-analytics",
    type="rest",
    warehouse=WAREHOUSE,
    uri=CATALOG_URI,
    token=SERVICE_KEY,
    **{
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": S3_ENDPOINT,
        "s3.access-key-id": S3_ACCESS_KEY,
        "s3.secret-access-key": S3_SECRET_KEY,
        "s3.region": S3_REGION,
        "s3.force-virtual-addressing": False,
    },
)

print("✓ Successfully connected to Iceberg catalog")
```
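The hardcoded placeholders above are fine for a quick test, but in shared code you may prefer reading credentials from the environment. A minimal sketch, using hypothetical variable names (pick whatever fits your deployment):

```python
import os

# Hypothetical environment variable names - adjust to your own setup
PROJECT_REF = os.environ["SUPABASE_PROJECT_REF"]
WAREHOUSE = os.environ["SUPABASE_WAREHOUSE"]
SERVICE_KEY = os.environ["SUPABASE_SERVICE_KEY"]
S3_ACCESS_KEY = os.environ["SUPABASE_S3_ACCESS_KEY"]
S3_SECRET_KEY = os.environ["SUPABASE_S3_SECRET_KEY"]
S3_REGION = os.environ.get("SUPABASE_S3_REGION", "us-east-1")
```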
## Creating tables

```python
# Create a namespace for organization
catalog.create_namespace_if_not_exists("analytics")

# Define the schema for your Iceberg table
schema = pa.schema([
    pa.field("event_id", pa.int64()),
    pa.field("user_id", pa.int64()),
    pa.field("event_name", pa.string()),
    pa.field("event_timestamp", pa.timestamp("ms")),
    pa.field("properties", pa.string()),
])

# Create the table
table = catalog.create_table_if_not_exists(
    ("analytics", "events"),
    schema=schema,
)

print("✓ Created table: analytics.events")
```

## Writing data
```python
import datetime

# Prepare your data
current_time = datetime.datetime.now()
data = pa.table({
    "event_id": [1, 2, 3, 4, 5],
    "user_id": [101, 102, 101, 103, 102],
    "event_name": ["login", "view_product", "logout", "purchase", "login"],
    "event_timestamp": [current_time] * 5,
    "properties": [
        '{"browser":"chrome"}',
        '{"product_id":"123"}',
        '{}',
        '{"amount":99.99}',
        '{"browser":"firefox"}',
    ],
})

# Append data to the table
table.append(data)
print("✓ Appended 5 rows to analytics.events")
```

## Reading data
```python
# Scan the entire table
scan_result = table.scan().to_pandas()
print(f"Total rows: {len(scan_result)}")
print(scan_result.head())

# Query with filters (the scan keyword is row_filter)
filtered = table.scan(
    row_filter="event_name = 'login'"
).to_pandas()
print(f"Login events: {len(filtered)}")

# Select specific columns
selected = table.scan(
    selected_fields=("user_id", "event_name", "event_timestamp")
).to_pandas()
print(selected.head())
```
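String predicates like the one above are parsed into Iceberg expressions; you can also build the expressions directly, which composes better in code. A minimal sketch using PyIceberg's expression API:

```python
from pyiceberg.expressions import And, EqualTo, GreaterThanOrEqual

# Equivalent to the string filter above
logins = table.scan(
    row_filter=EqualTo("event_name", "login")
).to_pandas()

# Expressions compose, e.g. two conditions at once
recent_logins = table.scan(
    row_filter=And(
        EqualTo("event_name", "login"),
        GreaterThanOrEqual("user_id", 102),
    )
).to_pandas()
```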
## Advanced operations

### Listing tables and namespaces
```python
# List all namespaces
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

# List tables in a namespace
tables = catalog.list_tables("analytics")
print("Tables in analytics:", tables)

# Get table metadata
table_metadata = catalog.load_table(("analytics", "events"))
print("Schema:", table_metadata.schema())
print("Partition spec:", table_metadata.spec())
```
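Recent PyIceberg versions also expose an `inspect` namespace that returns table metadata as PyArrow tables; a short sketch, reusing the `table_metadata` handle loaded above:

```python
# Snapshot history of the table
print(table_metadata.inspect.snapshots().to_pandas())

# Per-partition statistics (one row per partition)
print(table_metadata.inspect.partitions().to_pandas())
```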
### Handling errors

```python
from pyiceberg.exceptions import NoSuchTableError

try:
    # Attempt to load a table that does not exist
    table = catalog.load_table(("analytics", "nonexistent"))
except NoSuchTableError as e:
    print(f"Error loading table: {e}")

# Check whether a table exists before creating it
namespace = "analytics"
table_name = "events"

try:
    existing_table = catalog.load_table((namespace, table_name))
    print("Table already exists")
except NoSuchTableError:
    print("Table does not exist, creating...")
    table = catalog.create_table((namespace, table_name), schema=schema)
```

## Performance tips
- **Batch writes**: Insert data in batches rather than row by row for better write performance.
- **Partition strategies**: Partition large tables to improve query performance (see the sketch after this list).
- **Schema evolution**: PyIceberg supports schema changes without rewriting existing data (also shown below).
- **Data format**: Use Parquet for efficient columnar storage.
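As a rough illustration of the partitioning and schema-evolution tips, here is a minimal sketch, assuming the `analytics.events` table from earlier and a recent PyIceberg version:

```python
from pyiceberg.transforms import DayTransform
from pyiceberg.types import StringType

table = catalog.load_table(("analytics", "events"))

# Partition evolution: partition newly written data by day of event_timestamp
with table.update_spec() as update:
    update.add_field("event_timestamp", DayTransform())

# Schema evolution: add a column without rewriting existing files
with table.update_schema() as update:
    update.add_column("country", StringType())
```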
## Complete example: ETL pipeline
```python
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import pandas as pd

# Setup (see Basic setup section above)
catalog = load_catalog(...)

# Step 1: Create a namespace for the pipeline
catalog.create_namespace_if_not_exists("warehouse")

# Step 2: Define table schema
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("created_at", pa.timestamp("ms")),
])

# Step 3: Create table
table = catalog.create_table_if_not_exists(
    ("warehouse", "products"),
    schema=schema,
)

# Step 4: Load data from CSV or database
df = pd.read_csv("products.csv")
data = pa.Table.from_pandas(df)

# Step 5: Write to analytics bucket
table.append(data)
print(f"✓ Loaded {len(data)} products to warehouse.products")

# Step 6: Verify
result = table.scan().to_pandas()
print(result.describe())
```
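Note that `append` adds rows on every run, so re-running this pipeline duplicates data. If `products.csv` is a full snapshot, one alternative is to replace the table contents with `overwrite`, which by default replaces all rows:

```python
# Replace all existing rows with the new snapshot instead of appending
table.overwrite(data)
print(f"✓ Replaced warehouse.products with {len(data)} rows")
```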