Data Loading Guide¶
Complete guide for loading data into your graph database.
📖 Overview¶
grai.build provides built-in data loading from common data sources including BigQuery, PostgreSQL, and Snowflake, with automatic APOC optimization for 2-3x faster bulk loading.
While grai.build handles schema management AND data loading for typical use cases, it's designed to integrate with orchestration tools (Airflow, Prefect) for complex workflows. Think of it like dbt: it loads data, but you still need an orchestrator for scheduling and dependencies.
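For example, a scheduled pipeline might call the grai CLI from an orchestrator. The sketch below is a minimal, hypothetical Airflow DAG (assuming Airflow 2.4+ with the `grai` CLI and a configured profile available on the worker); it is an illustration, not part of grai.build itself:

```python
# Hypothetical Airflow DAG: create the schema, then load entities daily.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="grai_nightly_load",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",  # use schedule_interval on Airflow < 2.4
    catchup=False,
) as dag:
    # Step 1: apply constraints and indexes (idempotent)
    create_schema = BashOperator(
        task_id="create_schema",
        bash_command="grai run --schema-only",
    )
    # Step 2: load the customer entity from the configured warehouse
    load_customers = BashOperator(
        task_id="load_customers",
        bash_command="grai load customer --verbose",
    )
    create_schema >> load_customers
```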
What grai.build Does¶
- Schema: Define entities, relations, and properties in YAML
- Validation: Ensure schema consistency before deployment
- Generation: Create Cypher constraints and indexes
- Data Loading: Extract data from BigQuery, Postgres, or Snowflake and load into Neo4j
- APOC Optimization: Automatically use APOC procedures for 2-3x faster bulk loading
- Documentation: Auto-generate visualizations and lineage
Supported Data Warehouses¶
- ✅ BigQuery - Google Cloud data warehouse (OAuth or Service Account)
- ✅ PostgreSQL - Open-source relational database (including Amazon RDS, Google Cloud SQL)
- ✅ Snowflake - Cloud data warehouse (Password or SSO authentication)
What grai.build Does NOT Do (in Production)¶
- Workflow Orchestration: Schedule and orchestrate data loading (use Airflow, Prefect)
- Real-time Sync: Stream changes to your graph (use Kafka, CDC, application code)
- Data Transformations: Complex SQL transformations (use dbt for transformations, then load to graph)
CSV Loading is for Development Only¶
The `--load-csv` feature exists only for:
- ✅ Quick local testing
- ✅ Demos and tutorials
- ✅ Validating schema with sample data
In production, use the warehouse loaders (the `grai load` command) for efficient, reliable data loading.
🏗️ Schema-Only Mode (Default)¶
What it does¶
Creates only the database schema:
- Unique constraints on entity keys
- Indexes on entity properties
- No actual data nodes or relationships
When to use¶
- Getting started with a new project
- Setting up a new database
- Testing schema definitions
- CI/CD pipelines (schema validation)
How to use¶
# Schema only (default)
grai run --uri bolt://localhost:7687 --user neo4j --password secret
# Explicit flag
grai run --schema-only --uri bolt://localhost:7687 --user neo4j --password secret
What gets created¶
// Constraints for unique keys
CREATE CONSTRAINT constraint_customer_customer_id IF NOT EXISTS
FOR (n:customer) REQUIRE n.customer_id IS UNIQUE;
// Indexes for properties
CREATE INDEX index_customer_name IF NOT EXISTS
FOR (n:customer) ON (n.name);
CREATE INDEX index_customer_email IF NOT EXISTS
FOR (n:customer) ON (n.email);
📦 With Data Mode¶
What it does¶
Generates MERGE statements with `row.property` placeholders designed for use with `LOAD CSV` or parameterized queries.
When to use¶
- You have CSV files prepared
- You're using custom data loading scripts
- You need to generate templates for ETL pipelines
How to use¶
# Generate data loading statements
grai run --with-data --uri bolt://localhost:7687 --user neo4j --password secret
⚠️ Important Note¶
The `--with-data` flag generates Cypher like this:
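(An illustrative sketch based on the `customer` entity used elsewhere in this guide; the exact generated statements may differ.)

```cypher
MERGE (n:customer {customer_id: row.customer_id})
SET n.name = row.name,
    n.email = row.email;
```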
This will fail if executed directly because `row` is undefined. You need to either:
- Wrap it in a `LOAD CSV` statement
- Use it as a template for parameterized queries
- Use Python/application code to supply parameters
🎁 Quick Start with Sample Data¶
When you run `grai init`, sample CSV files and a loading script are automatically created:
your-project/
├── data/
│ ├── customers.csv # 5 sample customers
│ ├── products.csv # 6 sample products
│ └── purchased.csv # 10 sample orders
└── load_data.cypher # Ready-to-use Cypher script
To load the sample data immediately:
Option 1: One Command (Recommended)¶
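A likely invocation, assuming the `--load-csv` flag described above (adjust the connection details to your setup):

```bash
grai run --load-csv --uri bolt://localhost:7687 --user neo4j --password yourpassword
```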
This will:
- Build and validate your project
- Create the schema (constraints & indexes)
- Automatically load CSV data from `load_data.cypher`
Option 2: Manual (Neo4j Browser)¶
- Create the schema by running `grai run` (see Option 3 below for the full command)
- Load the CSV data:
    - Open Neo4j Browser (http://localhost:7474)
    - Copy and paste the contents of `load_data.cypher`
    - Run the script
Option 3: Manual (cypher-shell)¶
# Create schema
grai run --password yourpassword
# Load data
cat load_data.cypher | cypher-shell -u neo4j -p yourpassword
That's it! Your graph is now populated with sample data.
📥 Loading Data from Warehouses¶
The primary way to load data in production is the `grai load` command, which automatically detects your warehouse type from your profile and loads data efficiently.
Quick Start¶
# Load an entity from your configured warehouse
grai load customer
# Load a relation
grai load PURCHASED
# Load with progress tracking
grai load customer --verbose
# Test with a small sample first
grai load customer --limit 100
How It Works¶
- Profile Detection: Reads `~/.grai/profiles.yml` to determine warehouse type
- Auto-Import: Dynamically imports the correct loader (BigQuery, Postgres, or Snowflake)
- APOC Detection: Checks if APOC plugin is available in Neo4j
- Optimized Loading: Uses APOC bulk operations if available (2-3x faster)
- Progress Tracking: Shows real-time progress with batch counts
Warehouse-Specific Examples¶
BigQuery¶
# ~/.grai/profiles.yml
default:
  target: dev
  outputs:
    dev:
      warehouse:
        type: bigquery
        method: oauth
        project: my-gcp-project
        dataset: analytics
        location: US
      graph:
        type: neo4j
        uri: bolt://localhost:7687
        user: neo4j
        password: mypassword
# Load customer entity from BigQuery
grai load customer
# Load with custom query
grai load customer --query "SELECT * FROM analytics.customers WHERE region = 'US'"
PostgreSQL¶
# ~/.grai/profiles.yml
default:
  target: dev
  outputs:
    dev:
      warehouse:
        type: postgres
        host: localhost
        port: 5432
        database: analytics
        user: postgres
        password: "${POSTGRES_PASSWORD}"
        schema: public
      graph:
        type: neo4j
        uri: bolt://localhost:7687
        user: neo4j
        password: mypassword
# Load customer entity from PostgreSQL
grai load customer
# Load with WHERE clause
grai load customer --where "created_at > '2024-01-01'"
Snowflake¶
# ~/.grai/profiles.yml
default:
  target: dev
  outputs:
    dev:
      warehouse:
        type: snowflake
        account: abc12345.us-east-1
        user: "${SNOWFLAKE_USER}"
        password: "${SNOWFLAKE_PASSWORD}"
        role: ANALYST
        database: ANALYTICS
        warehouse: COMPUTE_WH
        schema: PUBLIC
      graph:
        type: neo4j
        uri: bolt://localhost:7687
        user: neo4j
        password: mypassword
# Load customer entity from Snowflake
grai load customer
# Load with SSO authentication
# (set authenticator: externalbrowser in profile)
grai load customer
APOC Optimization¶
APOC (Awesome Procedures on Cypher) is a Neo4j plugin that provides advanced procedures. When available, grai.build automatically uses `apoc.periodic.iterate` for bulk loading, which provides:
- 2-3x faster loading for large datasets
- Automatic batching with configurable batch size
- Better memory management for large imports
- Progress tracking with indeterminate spinner
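For reference, a batched load with `apoc.periodic.iterate` looks roughly like the sketch below (illustrative only, using the sample customers.csv; the statements grai.build actually generates may differ):

```cypher
// Stream rows from the CSV and upsert one customer per row, in batches of 1000
CALL apoc.periodic.iterate(
  "LOAD CSV WITH HEADERS FROM 'file:///data/customers.csv' AS row RETURN row",
  "MERGE (n:customer {customer_id: row.customer_id})
   SET n.name = row.name, n.email = row.email",
  {batchSize: 1000, parallel: false}
);
```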
Check APOC Availability¶
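One quick way to check from Neo4j Browser or cypher-shell (assuming APOC Core is installed, this returns the plugin version):

```cypher
RETURN apoc.version();
```

If the call fails with an unknown-function error, APOC is not available.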
If APOC is not installed, grai.build will automatically fall back to standard Cypher loading.
APOC Control Options¶
# Force APOC usage (fails if not available)
grai load customer --use-apoc force
# Never use APOC (always use standard loading)
grai load customer --use-apoc never
# Auto-detect (default)
grai load customer --use-apoc auto
🔄 Data Loading Strategies¶
Strategy 1: Python Scripts (Recommended)¶
Use Python to load data directly:
from grai.core.loader.neo4j_loader import connect_neo4j, execute_cypher, close_connection

URI = "bolt://localhost:7687"
USER = "neo4j"
PASSWORD = "graipassword"

# Cypher that creates two sample customer nodes
DATA = """
CREATE (c1:customer {
  customer_id: 'C001',
  name: 'Alice Johnson',
  email: 'alice@example.com',
  created_at: datetime('2024-01-15')
});
CREATE (c2:customer {
  customer_id: 'C002',
  name: 'Bob Smith',
  email: 'bob@example.com',
  created_at: datetime('2024-02-01')
});
"""

# Connect, execute the statements, and report how many records were affected
driver = connect_neo4j(uri=URI, user=USER, password=PASSWORD)
result = execute_cypher(driver, DATA)

if result.success:
    print(f"✅ Loaded {result.records_affected} records")

close_connection(driver)
Strategy 2: LOAD CSV¶
Prepare CSV files and use Neo4j's LOAD CSV:
customers.csv:
customer_id,name,email,created_at
C001,Alice Johnson,alice@example.com,2024-01-15T00:00:00Z
C002,Bob Smith,bob@example.com,2024-02-01T00:00:00Z
Load script:
LOAD CSV WITH HEADERS FROM 'file:///data/customers.csv' AS row
MERGE (n:customer {customer_id: row.customer_id})
SET n.name = row.name,
    n.email = row.email,
    n.created_at = datetime(row.created_at);
Strategy 3: Application Integration¶
Use the neo4j driver in your application:
from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "bolt://localhost:7687",
    auth=("neo4j", "password")
)

def create_customer(tx, customer_id, name, email):
    # Idempotent upsert keyed on customer_id
    query = """
    MERGE (c:customer {customer_id: $customer_id})
    SET c.name = $name,
        c.email = $email
    """
    tx.run(query, customer_id=customer_id, name=name, email=email)

with driver.session() as session:
    session.write_transaction(
        create_customer,
        "C001",
        "Alice Johnson",
        "alice@example.com"
    )

driver.close()
🚀 Recommended Workflow¶
1. Create Schema¶
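Using the same command shown earlier in this guide:

```bash
grai run --uri bolt://localhost:7687 --user neo4j --password secret
```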
2. Verify Schema¶
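One way to confirm the constraints and indexes exist is from Neo4j Browser or cypher-shell (Neo4j 4.3+ syntax):

```cypher
SHOW CONSTRAINTS;
SHOW INDEXES;
```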
3. Load Data¶
Choose your strategy:
- Small datasets: Python scripts (Strategy 1)
- Large datasets: LOAD CSV (Strategy 2)
- Production apps: Application integration (Strategy 3)
4. Verify Data¶
// Check what was loaded
MATCH (n)
RETURN labels(n) AS type, count(n) AS count;

// View sample data
MATCH (n:customer)
RETURN n
LIMIT 5;
🔧 Troubleshooting¶
Error: "Variable row
not defined"¶
Cause: Trying to execute data loading Cypher without LOAD CSV context.
Solution: Use --schema-only
(default) instead of --with-data
:
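For example, the schema-only command shown earlier in this guide:

```bash
grai run --schema-only --uri bolt://localhost:7687 --user neo4j --password secret
```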
No data appears after `grai run`¶
Expected behavior: By default, `grai run` only creates the schema, not data.
Solution: Load data using Python scripts (see Strategy 1 above).
CSV loading fails¶
Common issues:
- File path: Make sure CSV is in Neo4j import directory
- Headers: CSV must have headers matching property names
- Encoding: Use UTF-8 encoding
- Line endings: Unix (LF) line endings preferred
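If the file path is the issue, note that `LOAD CSV` with a `file:///` URL resolves against Neo4j's import directory, configured in `neo4j.conf` (the setting name depends on your Neo4j version; this is general Neo4j configuration, not a grai.build setting):

```properties
# neo4j.conf — point this at the directory that holds your CSV files
# Neo4j 4.x:
dbms.directories.import=import
# Neo4j 5.x:
server.directories.import=import
```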
📚 Examples¶
Complete Example: Schema + Data¶
# Step 1: Create schema
grai run --uri bolt://localhost:7687 --user neo4j --password secret
# Step 2: Load data
cat > load_data.py << 'EOF'
from grai.core.loader.neo4j_loader import connect_neo4j, execute_cypher, close_connection
driver = connect_neo4j(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="secret"
)
data = """
CREATE (c:customer {customer_id: 'C001', name: 'Alice', email: 'alice@example.com'});
CREATE (p:product {product_id: 'P001', name: 'Laptop', price: 999.99});
MATCH (c:customer {customer_id: 'C001'})
MATCH (p:product {product_id: 'P001'})
CREATE (c)-[:PURCHASED {order_id: 'O001', order_date: date()}]->(p);
"""
result = execute_cypher(driver, data)
print(f"✅ Created {result.records_affected} records")
close_connection(driver)
EOF
python load_data.py
# Step 3: Verify
# Open Neo4j Browser and run:
# MATCH (n)-[r]->(m) RETURN n, r, m LIMIT 25;
🎯 Best Practices¶
- Always create schema first before loading data
- Use warehouse loaders for production (the `grai load` command)
- Test with small datasets using `--limit` before full loads
- Let APOC auto-detect (default) - it will use APOC when available
- Use profiles for managing dev/staging/prod environments
- Use environment variables for sensitive credentials
- Monitor progress with the `--verbose` flag for visibility
- Add error handling in orchestration (Airflow/Prefect)
- Validate data before loading (check for nulls, duplicates, etc.)
- Set appropriate batch sizes based on your data and Neo4j memory
📊 Performance Comparison¶
Standard Loading vs APOC¶
For a typical dataset of 100,000 customer nodes:
| Method | Time | Throughput | Notes |
|---|---|---|---|
| Standard Cypher | ~45s | 2,222 nodes/sec | Individual MERGE statements |
| APOC Bulk Loading | ~18s | 5,555 nodes/sec | Batched with `apoc.periodic.iterate` |
| Speedup | 2.5x faster | 2.5x higher | Automatic when APOC available |
APOC is especially beneficial for:
- Large datasets (>10,000 rows)
- Relations with complex lookups
- Memory-constrained environments
- High-throughput requirements
🔧 Warehouse-Specific Tips¶
BigQuery¶
- Use OAuth for development (`gcloud auth application-default login`)
- Use a Service Account for production
- Consider table partitioning for large datasets
- Use the `--query` flag for complex filtering
- Set an appropriate `timeout_seconds` for large queries
PostgreSQL¶
- Use connection pooling for high-frequency loads
- Use read replicas to avoid impacting production
- Use the `--where` flag for incremental loads
- Consider indexes on join columns in Postgres
- Use `sslmode: require` for production
Snowflake¶
- Use a dedicated warehouse for ETL workloads
- Use SSO (`authenticator: externalbrowser`) for development
- Use role-based access control for security
- Consider clustering keys for large tables
- Use the `--where` flag for date-based filtering
🔗 Related Documentation¶
- Profiles - Configure warehouse and graph connections
- CLI Reference - Complete `grai load` command reference
- Getting Started Guide - Complete beginner tutorial
- Neo4j Setup Guide - Local Neo4j installation
- Sources Configuration - Define data sources in YAML
Questions? Issues?
File an issue on GitHub: https://github.com/asantora05/grai.build/issues