PyIceberg
This feature is in alpha
Expect rapid changes, limited features, and possible breaking updates. Share feedback as we refine the experience and expand access.
PyIceberg is a Python client for Apache Iceberg that enables programmatic interaction with Iceberg tables. Use it to create, read, update, and delete data in your analytics buckets.
Installation
```bash
pip install pyiceberg pyarrow
```

## Basic setup
Here's a complete example showing how to connect to your Supabase analytics bucket and perform operations:
```python
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import datetime

# Configuration - Update with your Supabase credentials
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct Supabase endpoints
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Load the Iceberg REST Catalog
catalog = load_catalog(
    "supabase-analytics",
    type="rest",
    warehouse=WAREHOUSE,
    uri=CATALOG_URI,
    token=SERVICE_KEY,
    **{
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.endpoint": S3_ENDPOINT,
        "s3.access-key-id": S3_ACCESS_KEY,
        "s3.secret-access-key": S3_SECRET_KEY,
        "s3.region": S3_REGION,
        "s3.force-virtual-addressing": False,
    },
)

print("✓ Successfully connected to Iceberg catalog")
```

## Creating tables
```python
# Create a namespace for organization
catalog.create_namespace_if_not_exists("analytics")

# Define the schema for your Iceberg table
schema = pa.schema([
    pa.field("event_id", pa.int64()),
    pa.field("user_id", pa.int64()),
    pa.field("event_name", pa.string()),
    pa.field("event_timestamp", pa.timestamp("ms")),
    pa.field("properties", pa.string()),
])

# Create the table
table = catalog.create_table_if_not_exists(
    ("analytics", "events"),
    schema=schema
)

print("✓ Created table: analytics.events")
```

## Writing data
```python
import datetime

# Prepare your data
current_time = datetime.datetime.now()
data = pa.table({
    "event_id": [1, 2, 3, 4, 5],
    "user_id": [101, 102, 101, 103, 102],
    "event_name": ["login", "view_product", "logout", "purchase", "login"],
    "event_timestamp": [current_time] * 5,
    "properties": [
        '{"browser":"chrome"}',
        '{"product_id":"123"}',
        '{}',
        '{"amount":99.99}',
        '{"browser":"firefox"}'
    ],
})

# Append data to the table
table.append(data)
print("✓ Appended 5 rows to analytics.events")
```

## Reading data
```python
# Scan the entire table
scan_result = table.scan().to_pandas()
print(f"Total rows: {len(scan_result)}")
print(scan_result.head())

# Query with filters (PyIceberg's scan uses the `row_filter` parameter)
filtered = table.scan(
    row_filter="event_name = 'login'"
).to_pandas()
print(f"Login events: {len(filtered)}")

# Select specific columns
selected = table.scan(
    selected_fields=["user_id", "event_name", "event_timestamp"]
).to_pandas()
print(selected.head())
```

## Advanced operations
Listing tables and namespaces
```python
# List all namespaces
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

# List tables in a namespace
tables = catalog.list_tables("analytics")
print("Tables in analytics:", tables)

# Get table metadata
table_metadata = catalog.load_table(("analytics", "events"))
print("Schema:", table_metadata.schema())
# The partition spec is exposed via `spec()` on a PyIceberg Table
print("Partition spec:", table_metadata.spec())
```

## Handling errors
```python
try:
    # Attempt to load a table
    table = catalog.load_table(("analytics", "nonexistent"))
except Exception as e:
    print(f"Error loading table: {e}")

# Check if table exists before creating
namespace = "analytics"
table_name = "events"
try:
    existing_table = catalog.load_table((namespace, table_name))
    print("Table already exists")
except Exception:
    print("Table does not exist, creating...")
    table = catalog.create_table((namespace, table_name), schema=schema)
```

## Performance tips
- Batch writes - Insert data in batches rather than row-by-row for better performance
- Partition strategies - Use partitioning for large tables to improve query performance
- Schema evolution - PyIceberg supports schema changes without rewriting data
- Data format - Use Parquet for efficient columnar storage
Complete example: ETL pipeline
```python
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import pandas as pd

# Setup (see Basic Setup section above)
catalog = load_catalog(...)

# Step 1: Create analytics namespace
catalog.create_namespace_if_not_exists("warehouse")

# Step 2: Define table schema
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("created_at", pa.timestamp("ms")),
])

# Step 3: Create table
table = catalog.create_table_if_not_exists(
    ("warehouse", "products"),
    schema=schema
)

# Step 4: Load data from CSV or database
df = pd.read_csv("products.csv")
data = pa.Table.from_pandas(df)

# Step 5: Write to analytics bucket
table.append(data)
print(f"✓ Loaded {len(data)} products to warehouse.products")

# Step 6: Verify
result = table.scan().to_pandas()
print(result.describe())
```