Apache Spark
This feature is in alpha
Expect rapid changes, limited features, and possible breaking updates. Share feedback as we refine the experience and expand access.
Apache Spark enables distributed analytical processing of large datasets stored in your analytics buckets. Use it for complex transformations, aggregations, and machine learning workflows.
Installation
First, ensure you have Spark installed. For Python-based workflows:
```bash
pip install pyspark
```

For detailed Spark setup instructions, see the Apache Spark documentation.
Basic setup
Here's a complete example showing how to configure Spark with your Supabase analytics bucket:
```python
from pyspark.sql import SparkSession

# Configuration - Update with your Supabase credentials
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct Supabase endpoints
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Initialize Spark session with Iceberg configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SupabaseIceberg") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config(
        'spark.jars.packages',
        'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1'
    ) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.supabase", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.supabase.type", "rest") \
    .config("spark.sql.catalog.supabase.uri", CATALOG_URI) \
    .config("spark.sql.catalog.supabase.warehouse", WAREHOUSE) \
    .config("spark.sql.catalog.supabase.token", SERVICE_KEY) \
    .config("spark.sql.catalog.supabase.s3.endpoint", S3_ENDPOINT) \
    .config("spark.sql.catalog.supabase.s3.path-style-access", "true") \
    .config("spark.sql.catalog.supabase.s3.access-key-id", S3_ACCESS_KEY) \
    .config("spark.sql.catalog.supabase.s3.secret-access-key", S3_SECRET_KEY) \
    .config("spark.sql.catalog.supabase.s3.remote-signing-enabled", "false") \
    .config("spark.sql.defaultCatalog", "supabase") \
    .getOrCreate()

print("✓ Spark session initialized with Iceberg")
```

Creating tables
```python
# Create a namespace for organization
spark.sql("CREATE NAMESPACE IF NOT EXISTS analytics")

# Create a new Iceberg table
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.events (
        event_id BIGINT,
        user_id BIGINT,
        event_name STRING,
        event_timestamp TIMESTAMP,
        properties STRING
    ) USING iceberg
""")

print("✓ Created table: analytics.events")
```

Writing data
```python
# Insert data into the table
spark.sql("""
    INSERT INTO analytics.events (event_id, user_id, event_name, event_timestamp, properties)
    VALUES
        (1, 101, 'login', TIMESTAMP '2024-01-15 10:30:00', '{"browser":"chrome"}'),
        (2, 102, 'view_product', TIMESTAMP '2024-01-15 10:35:00', '{"product_id":"123"}'),
        (3, 101, 'logout', TIMESTAMP '2024-01-15 10:40:00', '{}'),
        (4, 103, 'purchase', TIMESTAMP '2024-01-15 10:45:00', '{"amount":99.99}')
""")

print("✓ Inserted 4 rows into analytics.events")
```

Reading data
```python
# Read the entire table
result_df = spark.sql("SELECT * FROM analytics.events")
result_df.show(truncate=False)

# Apply filters
filtered_df = spark.sql("""
    SELECT event_id, user_id, event_name
    FROM analytics.events
    WHERE event_name = 'login'
""")
filtered_df.show()

# Aggregations
summary_df = spark.sql("""
    SELECT
        event_name,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users
    FROM analytics.events
    GROUP BY event_name
    ORDER BY event_count DESC
""")
summary_df.show()
```

Advanced operations
Working with dataframes
```python
# Read as DataFrame
events_df = spark.read.format("iceberg").load("analytics.events")

# Apply Spark transformations
from pyspark.sql.functions import count, col, year, month

# Monthly event counts
monthly_events = events_df \
    .withColumn("month", month(col("event_timestamp"))) \
    .withColumn("year", year(col("event_timestamp"))) \
    .groupBy("year", "month", "event_name") \
    .agg(count("event_id").alias("count")) \
    .orderBy("year", "month")

monthly_events.show()
```

Joining tables
```python
# Create another table
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.users (
        user_id BIGINT,
        username STRING,
        email STRING
    ) USING iceberg
""")

spark.sql("""
    INSERT INTO analytics.users VALUES
        (101, 'alice', 'alice@example.com'),
        (102, 'bob', 'bob@example.com'),
        (103, 'charlie', 'charlie@example.com')
""")

# Join events with users
joined_df = spark.sql("""
    SELECT
        e.event_id,
        e.event_name,
        u.username,
        u.email,
        e.event_timestamp
    FROM analytics.events e
    JOIN analytics.users u ON e.user_id = u.user_id
    ORDER BY e.event_timestamp
""")
joined_df.show(truncate=False)
```

Exporting results
```python
# Export to Parquet
spark.sql("""
    SELECT event_name, COUNT(*) as count
    FROM analytics.events
    GROUP BY event_name
""").write \
    .mode("overwrite") \
    .parquet("/tmp/event_summary.parquet")

# Export to CSV
spark.sql("""
    SELECT * FROM analytics.events
    WHERE event_timestamp > TIMESTAMP '2024-01-15 10:30:00'
""").write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/tmp/recent_events.csv")

print("✓ Results exported")
```

Performance best practices
- Partition tables - Partition large tables by date or region for faster queries
- Select columns - Only select columns you need to reduce I/O
- Use filters early - Apply WHERE clauses to reduce data processed
- Cache frequently accessed tables - Use `spark.catalog.cacheTable()` for tables accessed multiple times
- Cluster mode - Use cluster mode for production workloads instead of local mode
Complete example: Data processing pipeline
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, count

# Setup (see Basic Setup section above).
# Note: the builder chain is parenthesized so the placeholder comment is
# valid Python (a comment after a backslash continuation is a syntax error).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SupabaseAnalytics")
    # ... (add all config from Basic Setup)
    .config("spark.sql.defaultCatalog", "supabase")
    .getOrCreate()
)

# Step 1: Read raw events
raw_events = spark.sql("SELECT * FROM analytics.events")

# Step 2: Transform and aggregate
monthly_summary = raw_events \
    .withColumn("month", month(col("event_timestamp"))) \
    .withColumn("year", year(col("event_timestamp"))) \
    .groupBy("year", "month", "event_name") \
    .agg(count("event_id").alias("total_events"))

# Step 3: Save results as a catalog table
monthly_summary.write \
    .mode("overwrite") \
    .saveAsTable("analytics.monthly_summary")

print("✓ Pipeline completed")
monthly_summary.show()
```