Data Loading Guide

Complete guide for loading data into your graph database.


📖 Overview

grai.build provides built-in data loading from common data sources including BigQuery, PostgreSQL, and Snowflake, with automatic APOC optimization for 2-3x faster bulk loading.

While grai.build handles schema management AND data loading for typical use cases, it's designed to integrate with orchestration tools (Airflow, Prefect) for complex workflows. Think of it like dbt: it loads data, but you still need an orchestrator for scheduling and dependencies.
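For example, a nightly load can be scheduled with a few Airflow tasks. The sketch below is illustrative, not an official integration: it assumes Airflow 2.4+ and that the grai CLI is installed on the worker's PATH, and the DAG and task names are made up:

# Illustrative Airflow DAG: schedule grai loads with BashOperator.
# Assumes Airflow 2.4+ and `grai` available on the worker's PATH.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="grai_nightly_load",  # hypothetical name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    load_customers = BashOperator(
        task_id="load_customers",
        bash_command="grai load customer",
    )
    load_purchased = BashOperator(
        task_id="load_purchased",
        bash_command="grai load PURCHASED",
    )

    # Load entities before relations so endpoint nodes exist.
    load_customers >> load_purchased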

What grai.build Does

  1. Schema: Define entities, relations, and properties in YAML
  2. Validation: Ensure schema consistency before deployment
  3. Generation: Create Cypher constraints and indexes
  4. Data Loading: Extract data from BigQuery, Postgres, or Snowflake and load into Neo4j
  5. APOC Optimization: Automatically use APOC procedures for 2-3x faster bulk loading
  6. Documentation: Auto-generate visualizations and lineage

Supported Data Warehouses

  • BigQuery - Google Cloud data warehouse (OAuth or Service Account)
  • PostgreSQL - Open-source relational database (including Amazon RDS, Google Cloud SQL)
  • Snowflake - Cloud data warehouse (Password or SSO authentication)

What grai.build Does NOT Do (in Production)

  1. Workflow Orchestration: Schedule and orchestrate data loading (use Airflow, Prefect)
  2. Real-time Sync: Stream changes to your graph (use Kafka, CDC, application code)
  3. Data Transformations: Complex SQL transformations (use dbt for transformations, then load to graph)

CSV Loading is for Development Only

The --load-csv feature exists only for:

  • ✅ Quick local testing
  • ✅ Demos and tutorials
  • ✅ Validating schema with sample data

In production, use the warehouse loaders (grai load) for efficient, reliable data loading.


🏗️ Schema-Only Mode (Default)

What it does

Creates only the database schema:

  • Unique constraints on entity keys
  • Indexes on entity properties
  • No actual data nodes or relationships

When to use

  • Getting started with a new project
  • Setting up a new database
  • Testing schema definitions
  • CI/CD pipelines (schema validation)

How to use

# Schema only (default)
grai run --uri bolt://localhost:7687 --user neo4j --password secret

# Explicit flag
grai run --schema-only --uri bolt://localhost:7687 --user neo4j --password secret

What gets created

// Constraints for unique keys
CREATE CONSTRAINT constraint_customer_customer_id IF NOT EXISTS
FOR (n:customer) REQUIRE n.customer_id IS UNIQUE;

// Indexes for properties
CREATE INDEX index_customer_name IF NOT EXISTS
FOR (n:customer) ON (n.name);

CREATE INDEX index_customer_email IF NOT EXISTS
FOR (n:customer) ON (n.email);

📦 With Data Mode

What it does

Generates MERGE statements with row.property placeholders designed for use with LOAD CSV or parameterized queries.

When to use

  • You have CSV files prepared
  • You're using custom data loading scripts
  • You need to generate templates for ETL pipelines

How to use

# Generate data loading statements
grai run --with-data --uri bolt://localhost:7687 --user neo4j --password secret

⚠️ Important Note

The --with-data flag generates Cypher like this:

MERGE (n:customer {customer_id: row.customer_id})
SET n.name = row.name,
    n.email = row.email;

This will fail if executed directly because row is undefined. You need to either:

  1. Wrap it in a LOAD CSV statement (see Strategy 2 below)
  2. Use it as a template for parameterized queries
  3. Use Python/application code to supply parameters (sketched below)
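
Here is a minimal sketch of option 3 using the official neo4j Python driver: an UNWIND over a $rows parameter binds row for each record, so the generated template runs unchanged. The connection details and sample rows are placeholders:

from neo4j import GraphDatabase

# Placeholder rows; in practice these come from your extraction step.
rows = [
    {"customer_id": "C001", "name": "Alice Johnson", "email": "alice@example.com"},
    {"customer_id": "C002", "name": "Bob Smith", "email": "bob@example.com"},
]

# The generated MERGE template, wrapped so `row` is bound per record.
query = """
UNWIND $rows AS row
MERGE (n:customer {customer_id: row.customer_id})
SET n.name = row.name,
    n.email = row.email
"""

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "secret"))
with driver.session() as session:
    session.run(query, rows=rows)
driver.close()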

🎁 Quick Start with Sample Data

When you run grai init, sample CSV files and a loading script are automatically created:

your-project/
├── data/
│   ├── customers.csv      # 5 sample customers
│   ├── products.csv       # 6 sample products
│   └── purchased.csv      # 10 sample orders
└── load_data.cypher       # Ready-to-use Cypher script

Option 1: Automatic (Recommended)

To load the sample data immediately:

# Create schema AND load CSV data in one command
grai run --load-csv --password yourpassword

This will:

  1. Build and validate your project
  2. Create the schema (constraints & indexes)
  3. Automatically load CSV data from load_data.cypher

Option 2: Manual (Neo4j Browser)

  1. Create the schema:

grai run --password yourpassword

  2. Load the CSV data:
     • Open Neo4j Browser (http://localhost:7474)
     • Copy and paste the contents of load_data.cypher
     • Run the script

Option 3: Manual (cypher-shell)

# Create schema
grai run --password yourpassword

# Load data
cat load_data.cypher | cypher-shell -u neo4j -p yourpassword

That's it! Your graph is now populated with sample data.


📥 Loading Data from Warehouses

The primary way to load data in production is using the grai load command, which automatically detects your warehouse type from your profile and loads data efficiently.

Quick Start

# Load an entity from your configured warehouse
grai load customer

# Load a relation
grai load PURCHASED

# Load with progress tracking
grai load customer --verbose

# Test with a small sample first
grai load customer --limit 100

How It Works

  1. Profile Detection: Reads ~/.grai/profiles.yml to determine warehouse type
  2. Auto-Import: Dynamically imports the correct loader for BigQuery, Postgres, or Snowflake (sketched after this list)
  3. APOC Detection: Checks if APOC plugin is available in Neo4j
  4. Optimized Loading: Uses APOC bulk operations if available (2-3x faster)
  5. Progress Tracking: Shows real-time progress with batch counts
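
Purely for illustration (this is not grai.build's actual source), steps 1 and 2 amount to resolving the active profile and importing a warehouse client lazily:

import yaml
from pathlib import Path

def warehouse_config(profile: str = "default") -> dict:
    # Step 1: read ~/.grai/profiles.yml and resolve the active target.
    profiles = yaml.safe_load(Path("~/.grai/profiles.yml").expanduser().read_text())
    target = profiles[profile]["target"]
    return profiles[profile]["outputs"][target]["warehouse"]

def warehouse_client(config: dict):
    # Step 2: import lazily so only the driver you actually use must be installed.
    if config["type"] == "bigquery":
        from google.cloud import bigquery
        return bigquery.Client(project=config["project"])
    if config["type"] == "postgres":
        import psycopg2
        return psycopg2.connect(
            host=config["host"], port=config["port"],
            dbname=config["database"], user=config["user"],
            password=config["password"],
        )
    if config["type"] == "snowflake":
        import snowflake.connector
        return snowflake.connector.connect(
            account=config["account"], user=config["user"],
            password=config["password"], warehouse=config["warehouse"],
        )
    raise ValueError(f"Unsupported warehouse type: {config['type']}")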

Warehouse-Specific Examples

BigQuery

# ~/.grai/profiles.yml
default:
  target: dev
  outputs:
    dev:
      warehouse:
        type: bigquery
        method: oauth
        project: my-gcp-project
        dataset: analytics
        location: US

      graph:
        type: neo4j
        uri: bolt://localhost:7687
        user: neo4j
        password: mypassword

# Load customer entity from BigQuery
grai load customer

# Load with custom query
grai load customer --query "SELECT * FROM analytics.customers WHERE region = 'US'"

PostgreSQL

# ~/.grai/profiles.yml
default:
  target: dev
  outputs:
    dev:
      warehouse:
        type: postgres
        host: localhost
        port: 5432
        database: analytics
        user: postgres
        password: "${POSTGRES_PASSWORD}"
        schema: public

      graph:
        type: neo4j
        uri: bolt://localhost:7687
        user: neo4j
        password: mypassword

# Load customer entity from PostgreSQL
grai load customer

# Load with WHERE clause
grai load customer --where "created_at > '2024-01-01'"

Snowflake

# ~/.grai/profiles.yml
default:
  target: dev
  outputs:
    dev:
      warehouse:
        type: snowflake
        account: abc12345.us-east-1
        user: "${SNOWFLAKE_USER}"
        password: "${SNOWFLAKE_PASSWORD}"
        role: ANALYST
        database: ANALYTICS
        warehouse: COMPUTE_WH
        schema: PUBLIC

      graph:
        type: neo4j
        uri: bolt://localhost:7687
        user: neo4j
        password: mypassword

# Load customer entity from Snowflake
grai load customer

# Load with SSO authentication
# (set authenticator: externalbrowser in profile)
grai load customer

APOC Optimization

APOC (Awesome Procedures on Cypher) is a Neo4j plugin that provides advanced procedures. When available, grai.build automatically uses apoc.periodic.iterate for bulk loading (sketched below), which provides:

  • 2-3x faster loading for large datasets
  • Automatic batching with configurable batch size
  • Better memory management for large imports
  • Progress tracking with indeterminate spinner
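
For context, here is roughly what this looks like when driven from Python. It is a hedged sketch of the general apoc.periodic.iterate idiom, not grai.build's exact generated Cypher; the connection details and label are placeholders:

from neo4j import GraphDatabase

# Rows would come from your warehouse extraction step.
rows = [{"customer_id": f"C{i:04d}", "name": f"Customer {i}"} for i in range(10_000)]

# The outer statement streams rows; the inner statement MERGEs them in batches.
query = """
CALL apoc.periodic.iterate(
  'UNWIND $rows AS row RETURN row',
  'MERGE (n:customer {customer_id: row.customer_id}) SET n.name = row.name',
  {batchSize: 1000, parallel: false, params: {rows: $rows}}
)
"""

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "secret"))
with driver.session() as session:
    session.run(query, rows=rows)
driver.close()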

Check APOC Availability

// In Neo4j Browser
RETURN apoc.version();

If APOC is not installed, grai.build will automatically fall back to standard Cypher loading.

APOC Control Options

# Force APOC usage (fails if not available)
grai load customer --use-apoc force

# Never use APOC (always use standard loading)
grai load customer --use-apoc never

# Auto-detect (default)
grai load customer --use-apoc auto

🔄 Data Loading Strategies

Strategy 1: Python Scripts

Use Python to load data directly:

from grai.core.loader.neo4j_loader import connect_neo4j, execute_cypher, close_connection

URI = "bolt://localhost:7687"
USER = "neo4j"
PASSWORD = "graipassword"

DATA = """
CREATE (c1:customer {
    customer_id: 'C001',
    name: 'Alice Johnson',
    email: 'alice@example.com',
    created_at: datetime('2024-01-15')
});

CREATE (c2:customer {
    customer_id: 'C002',
    name: 'Bob Smith',
    email: 'bob@example.com',
    created_at: datetime('2024-02-01')
});
"""

driver = connect_neo4j(uri=URI, user=USER, password=PASSWORD)
result = execute_cypher(driver, DATA)

if result.success:
    print(f"✅ Loaded {result.records_affected} records")

close_connection(driver)

Strategy 2: LOAD CSV

Prepare CSV files and use Neo4j's LOAD CSV:

customers.csv:

customer_id,name,email,created_at
C001,Alice Johnson,alice@example.com,2024-01-15T00:00:00Z
C002,Bob Smith,bob@example.com,2024-02-01T00:00:00Z

Load script:

LOAD CSV WITH HEADERS FROM 'file:///data/customers.csv' AS row
MERGE (n:customer {customer_id: row.customer_id})
SET n.name = row.name,
    n.email = row.email,
    n.created_at = datetime(row.created_at);

Strategy 3: Application Integration

Use the neo4j driver in your application:

from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "bolt://localhost:7687",
    auth=("neo4j", "password")
)

def create_customer(tx, customer_id, name, email):
    query = """
    MERGE (c:customer {customer_id: $customer_id})
    SET c.name = $name,
        c.email = $email
    """
    tx.run(query, customer_id=customer_id, name=name, email=email)

with driver.session() as session:
    # execute_write supersedes the deprecated write_transaction (driver 5.x)
    session.execute_write(
        create_customer,
        "C001",
        "Alice Johnson",
        "alice@example.com"
    )

driver.close()

📋 Recommended Workflow

1. Create Schema

# First, create the schema
grai run --uri bolt://localhost:7687 --user neo4j --password secret

2. Verify Schema

// In Neo4j Browser
SHOW CONSTRAINTS;
SHOW INDEXES;

3. Load Data

Choose your strategy:

  • Small datasets: Python scripts (Strategy 1)
  • Large datasets: LOAD CSV (Strategy 2)
  • Production apps: Application integration (Strategy 3)

4. Verify Data

// Check what was loaded
MATCH (n)
RETURN labels(n) AS type, count(n) AS count;

// View sample data
MATCH (n:customer)
RETURN n
LIMIT 5;

🔧 Troubleshooting

Error: "Variable row not defined"

Cause: Trying to execute data loading Cypher without LOAD CSV context.

Solution: Use --schema-only (default) instead of --with-data:

# This works (default)
grai run

# This will fail without CSV files
grai run --with-data

No data appears after grai run

Expected behavior: By default, grai run only creates the schema, not data.

Solution: Load data using Python scripts (see Strategy 1 above).

CSV loading fails

Common issues:

  1. File path: Make sure CSV is in Neo4j import directory
  2. Headers: CSV must have headers matching property names
  3. Encoding: Use UTF-8 encoding
  4. Line endings: Unix (LF) line endings preferred

📚 Examples

Complete Example: Schema + Data

# Step 1: Create schema
grai run --uri bolt://localhost:7687 --user neo4j --password secret

# Step 2: Load data
cat > load_data.py << 'EOF'
from grai.core.loader.neo4j_loader import connect_neo4j, execute_cypher, close_connection

driver = connect_neo4j(
    uri="bolt://localhost:7687",
    user="neo4j",
    password="secret"
)

data = """
CREATE (c:customer {customer_id: 'C001', name: 'Alice', email: 'alice@example.com'});
CREATE (p:product {product_id: 'P001', name: 'Laptop', price: 999.99});
MATCH (c:customer {customer_id: 'C001'})
MATCH (p:product {product_id: 'P001'})
CREATE (c)-[:PURCHASED {order_id: 'O001', order_date: date()}]->(p);
"""

result = execute_cypher(driver, data)
print(f"✅ Created {result.records_affected} records")
close_connection(driver)
EOF

python load_data.py

# Step 3: Verify
# Open Neo4j Browser and run:
# MATCH (n)-[r]->(m) RETURN n, r, m LIMIT 25;

🎯 Best Practices

  1. Always create schema first before loading data
  2. Use warehouse loaders for production (grai load command)
  3. Test with small datasets using --limit before full loads
  4. Let APOC auto-detect (default) - it will use APOC when available
  5. Use profiles for managing dev/staging/prod environments
  6. Use environment variables for sensitive credentials
  7. Monitor progress with --verbose flag for visibility
  8. Add error handling in orchestration (Airflow/Prefect)
  9. Validate data before loading (check for nulls, duplicates, etc.; see the sketch after this list)
  10. Set appropriate batch sizes based on your data and Neo4j memory
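
For item 9, a minimal pre-load validation sketch (illustrative; the path and columns match the sample project created by grai init):

import pandas as pd

df = pd.read_csv("data/customers.csv")

# Duplicate keys would silently collapse into a single node under MERGE,
# so catch them (and nulls) before touching Neo4j.
assert df["customer_id"].notna().all(), "found null customer_id values"
assert df["customer_id"].is_unique, "found duplicate customer_id values"
assert df["email"].notna().all(), "found null email values"

print(f"✓ {len(df)} rows passed validation")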

📊 Performance Comparison

Standard Loading vs APOC

For a typical dataset of 100,000 customer nodes:

Method               Time          Throughput        Notes
Standard Cypher      ~45s          2,222 nodes/sec   Individual MERGE statements
APOC Bulk Loading    ~18s          5,555 nodes/sec   Batched with apoc.periodic.iterate
Speedup              2.5x faster   2.5x higher       Automatic when APOC available

APOC is especially beneficial for:

  • Large datasets (>10,000 rows)
  • Relations with complex lookups
  • Memory-constrained environments
  • High-throughput requirements

🔧 Warehouse-Specific Tips

BigQuery

  • Use OAuth for development (gcloud auth application-default login)
  • Use Service Account for production
  • Consider table partitioning for large datasets
  • Use --query flag for complex filtering
  • Set appropriate timeout_seconds for large queries

PostgreSQL

  • Use connection pooling for high-frequency loads
  • Use read replicas to avoid impacting production
  • Use --where flag for incremental loads
  • Consider indexes on join columns in Postgres
  • Use sslmode: require for production

Snowflake

  • Use dedicated warehouse for ETL workloads
  • Use SSO (authenticator: externalbrowser) for development
  • Use role-based access control for security
  • Consider clustering keys for large tables
  • Use --where flag for date-based filtering


Questions? Issues?

File an issue on GitHub: https://github.com/asantora05/grai.build/issues