Delta Replicate Stock Data from SAP ERP to Parquet

This guide demonstrates how to use SAP ODP (Operational Data Provisioning) to replicate stock data from SAP ERP to DuckDB and export it to Parquet files. ODP provides a standardized way to extract and replicate data from SAP systems.

What is SAP ODP?

SAP ODP (Operational Data Provisioning) is a framework that provides:

  • Standardized Data Extraction: Consistent interface for data replication
  • Delta Capabilities: Incremental data updates
  • Real-time Processing: Near real-time data availability
  • Multiple Formats: Support for various data formats
  • Error Handling: Built-in error recovery and monitoring

Prerequisites

Before starting ODP replication, ensure you have:

  • ERPL ODP extension installed (subscription required)
  • Access to SAP ERP system with ODP enabled
  • ODP extractors configured for your data sources
  • User account with ODP authorizations
  • Sufficient storage space for Parquet files

Understanding ODP Extractors

Types of ODP Extractors

  1. CDS Views: Core Data Services views as ODP sources
  2. BW Extractors: Traditional BW extractors
  3. Function Modules: Custom function modules
  4. Tables: Direct table access

Stock Data Extractors

Common sources for stock data, named after the underlying SAP tables:

  • MCHB: Batch stocks (by material, plant, storage location, and batch)
  • MSKA: Sales order stock
  • MSLB: Special stocks with vendor (for example, subcontracting stock)
  • MCH1: Batches (cross-plant batch master data)

Setting Up ODP Replication

Step 1: Discover Available Extractors

-- List all available ODP contexts
SELECT * FROM sap_odp_show_contexts();

-- List extractors in BW context
SELECT * FROM sap_odp_show('BW');

-- Search for stock-related extractors
SELECT * FROM sap_odp_show('BW')
WHERE data_source LIKE '%STOCK%';

Step 2: Get Extractor Metadata

-- Get detailed information about a specific extractor
SELECT * FROM sap_odp_describe('BW', 'MCHB$F');

This returns information about:

  • Available fields
  • Key fields
  • Data types
  • Extraction methods

Step 3: Configure the SAP Connection

ERPL uses DuckDB secrets to authenticate. Create one once per system:

CREATE SECRET erp_prod (
TYPE sap_rfc,
ASHOST 'your-sap-server.com',
SYSNR '00',
CLIENT '100',
USER 'your-username',
PASSWD 'your-password',
LANG 'EN'
);

Subsequent ERPL calls pick up this secret automatically. Pass secret => 'erp_prod' if you have several.
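
As a short illustration of the sentence above, the sketch below first lists the secrets DuckDB knows about (duckdb_secrets() is a built-in DuckDB function) and then names the secret explicitly on one call. That sap_odp_show accepts the secret parameter in this position is an assumption based on the statement that ERPL calls take secret => '...'.

```sql
-- Which secrets are registered in this session?
SELECT name, type FROM duckdb_secrets();

-- Name the secret explicitly when more than one SAP secret exists
SELECT * FROM sap_odp_show('BW', secret => 'erp_prod');
```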

Basic ODP Replication

Simple Full Replication

sap_odp_read_full is a one-shot scan: it opens a FULL cursor on SAP, streams the snapshot, and closes the cursor. No subscription survives.

-- Replicate all stock data from MCHB extractor
SELECT * FROM sap_odp_read_full('BW', 'MCHB$F');

-- Project columns and parallelize
SELECT * FROM sap_odp_read_full(
'BW', 'MCHB$F',
columns => ['MATNR', 'WERKS', 'CLABS'],
threads => 4
);

Server-side filters

-- Restrict to a single plant via the ODP filter pushdown
SELECT * FROM sap_odp_read_full(
'BW', 'MCHB$F',
filters => [{
'FIELDNAME': 'WERKS',
'SIGN': 'I',
'OP': 'EQ',
'LOW': '1000',
'HIGH': ''
}]
);
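
The filter struct mirrors one row of an SAP select-option (SIGN, OP, LOW, HIGH). Assuming ERPL passes the standard BT (between) operator through to SAP, which this guide does not confirm, a plant range might be written as follows. The plant numbers are illustrative.

```sql
-- Plants 1000 through 1999, inclusive (assumes 'BT' is accepted as OP)
SELECT * FROM sap_odp_read_full(
    'BW', 'MCHB$F',
    filters => [{
        'FIELDNAME': 'WERKS',
        'SIGN': 'I',
        'OP': 'BT',
        'LOW': '1000',
        'HIGH': '1999'
    }]
);
```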

Delta Replication

Understanding Delta Replication

sap_odp_read_delta is the delta-capable companion to sap_odp_read_full. It takes a third positional argument — subscriber_process — that keys a server-side delta pointer:

  1. First call with a new subscriber_process → SAP performs auto-DELTAINIT, returns the full current snapshot AND registers the pointer.
  2. Subsequent calls with the same subscriber_process → returns only the changes since the previous call.
  3. Recovery → pass recover => true to re-stream the last unconfirmed packet without advancing the pointer.

Pick a stable, descriptive subscriber_process per pipeline (e.g. 'STOCK_DELTA_DAILY'). It is the identity that ties calls together; do not change it between runs.

Initial Delta Setup

-- First call: auto-DELTAINIT.  Returns the full snapshot AND registers
-- the delta pointer under subscriber_process 'STOCK_DELTA_DAILY'.
SELECT * FROM sap_odp_read_delta('BW', 'MCHB$F', 'STOCK_DELTA_DAILY');

-- Confirm the cursor exists
SELECT * FROM sap_odp_show_cursors(subscriber_name => 'ERPL')
WHERE subscriber_proc = 'STOCK_DELTA_DAILY';

Delta Replication Process

-- 1. Cheap probe — has anything changed since last run?
SELECT last_modified FROM sap_odp_get_last_modified('BW', 'MCHB$F');

-- 2. Pull the delta
SELECT * FROM sap_odp_read_delta('BW', 'MCHB$F', 'STOCK_DELTA_DAILY');

-- 3. Inspect cursor state
SELECT * FROM sap_odp_show_cursors(subscriber_name => 'ERPL');

-- 4. Release the cursor when the pipeline shuts down (subscription stays;
-- next run resumes from the same pointer).
PRAGMA sap_odp_close_delta_cursor('BW', 'STOCK_DELTA_DAILY', 'MCHB$F');

Complete Stock Data Replication Example

Step 1: Setup Replication Environment

-- Create the target table by introspecting the source schema
CREATE TABLE stock_data AS
SELECT * FROM sap_odp_read_full('BW', 'MCHB$F') WHERE 1=0;

-- Optional: a run log driven by ERPL's own state
CREATE TABLE replication_run_log (
subscriber_proc VARCHAR,
odp_name VARCHAR,
run_started TIMESTAMP,
rows_inserted BIGINT
);

Step 2: Initial Delta Load (DELTAINIT)

-- First call returns the full snapshot AND registers the delta pointer
INSERT INTO stock_data
SELECT * FROM sap_odp_read_delta('BW', 'MCHB$F', 'STOCK_DELTA_DAILY');

-- Record the run
INSERT INTO replication_run_log
SELECT 'STOCK_DELTA_DAILY', 'MCHB$F', now(), COUNT(*) FROM stock_data;

Step 3: Scheduled Delta Pull

DuckDB has no procedural language such as PL/pgSQL, so schedule the SQL below from your orchestrator (cron, Airflow, dbt, …). Each invocation pulls only the changes since the previous one:

-- 1. Probe: report 'skipped' if the source has not changed since the last logged run
-- (assumes last_modified is comparable to epoch seconds)
WITH probe AS (
SELECT last_modified FROM sap_odp_get_last_modified('BW', 'MCHB$F')
)
SELECT 'skipped' AS status
WHERE (SELECT last_modified FROM probe) <= (
SELECT COALESCE(MAX(EPOCH(run_started)), 0) FROM replication_run_log
WHERE subscriber_proc = 'STOCK_DELTA_DAILY'
);

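-- 2. Pull the delta once into a staging table (each call advances the pointer)
CREATE OR REPLACE TEMP TABLE stock_delta AS
SELECT * FROM sap_odp_read_delta('BW', 'MCHB$F', 'STOCK_DELTA_DAILY');

INSERT INTO stock_data
SELECT * FROM stock_delta;

-- 3. Log the run with the number of rows this run inserted
INSERT INTO replication_run_log
SELECT 'STOCK_DELTA_DAILY', 'MCHB$F', now(),
(SELECT COUNT(*) FROM stock_delta);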
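
Appending each delta means a material whose stock changed ends up with several rows in stock_data. If the target should hold one current row per key, one option is to stage the delta and replace matching keys. The sketch below is an alternative to step 2 and rests on assumptions this guide does not state: that delta rows are complete after-images, that (MATNR, WERKS, LGORT, CHARG) identifies a row, and that deletions need no special handling. The staging table name stock_delta_stage is hypothetical.

```sql
-- Read the delta exactly once; every call moves the delta pointer forward.
CREATE OR REPLACE TEMP TABLE stock_delta_stage AS
SELECT * FROM sap_odp_read_delta('BW', 'MCHB$F', 'STOCK_DELTA_DAILY');

BEGIN TRANSACTION;

-- Remove the old version of every key that arrived in this delta
-- (assumed key: MATNR, WERKS, LGORT, CHARG).
DELETE FROM stock_data
USING stock_delta_stage AS d
WHERE stock_data.MATNR = d.MATNR
  AND stock_data.WERKS = d.WERKS
  AND stock_data.LGORT = d.LGORT
  AND stock_data.CHARG = d.CHARG;

-- Insert the new versions.
INSERT INTO stock_data
SELECT * FROM stock_delta_stage;

COMMIT;
```

The transaction only protects the local tables. If the delete or insert fails, the staged rows are still available in stock_delta_stage for a retry within the same session.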

Step 4: Export to Parquet

-- Export stock data to Parquet
COPY (
SELECT
MATNR as material_number,
WERKS as plant,
LGORT as storage_location,
CHARG as batch,
CLABS as unrestricted_stock,
CUMLM as stock_in_transfer,
CINSM as stock_in_quality_inspection
FROM stock_data
) TO 'stock_data.parquet' (FORMAT parquet);
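
To check the export, the file can be read back with DuckDB's read_parquet. The last statement sketches a partitioned layout, which may suit larger tables; the output directory name stock_data_by_plant is a hypothetical choice.

```sql
-- Row count and column types of the exported file
SELECT COUNT(*) AS exported_rows FROM read_parquet('stock_data.parquet');
DESCRIBE SELECT * FROM read_parquet('stock_data.parquet');

-- One subdirectory per plant, e.g. stock_data_by_plant/WERKS=1000/
COPY stock_data TO 'stock_data_by_plant' (FORMAT parquet, PARTITION_BY (WERKS));
```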

Advanced ODP Features

Error Handling

-- Inspect cursors for this pipeline
SELECT * FROM sap_odp_show_cursors(subscriber_name => 'ERPL')
WHERE subscriber_proc = 'STOCK_DELTA_DAILY';

-- Re-stream the last unconfirmed packet (no pointer advance)
SELECT * FROM sap_odp_read_delta(
'BW', 'MCHB$F', 'STOCK_DELTA_DAILY',
recover => true
);

-- If the cursor is wedged, hard-reset and let DELTAINIT re-snapshot
PRAGMA sap_odp_drop('BW', 'ERPL', 'STOCK_DELTA_DAILY', 'MCHB$F');
SELECT * FROM sap_odp_read_delta('BW', 'MCHB$F', 'STOCK_DELTA_DAILY');

Performance Optimization

-- Project columns and parallelize at the ODP fetch level
SELECT * FROM sap_odp_read_delta(
'BW', 'MCHB$F', 'STOCK_DELTA_DAILY',
columns => ['MATNR', 'WERKS', 'CLABS'],
threads => 4
);

-- Preview data before extraction (no cursor side effects)
SELECT * FROM sap_odp_preview('BW', 'MCHB$F');

Data Transformation

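-- Transform data during extraction.
-- Note: this call consumes the pending delta for STOCK_DELTA_DAILY,
-- so run it in place of the plain pull, not in addition to it.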
SELECT
MATNR as material_number,
WERKS as plant,
LGORT as storage_location,
CLABS as stock_quantity,
CASE
WHEN CLABS > 1000 THEN 'HIGH_STOCK'
WHEN CLABS > 100 THEN 'MEDIUM_STOCK'
ELSE 'LOW_STOCK'
END as stock_level
FROM sap_odp_read_delta('BW', 'MCHB$F', 'STOCK_DELTA_DAILY');

Monitoring and Maintenance

Replication Monitoring

-- Inspect ERPL-owned subscriptions
SELECT queue_name, subscriber_type, subscriber_name, subscriber_proc
FROM sap_odp_show_subscriptions();

-- Cross-team visibility on a source — who else is reading it?
SELECT * FROM sap_odp_get_subscriptions('BW', 'MCHB$F');

-- Monitor extraction cursors (request_date = last cursor activity)
SELECT subscriber_proc, pointer, is_closed, is_delta_extension, request_date
FROM sap_odp_show_cursors(subscriber_name => 'ERPL');

Data Quality Checks

-- Validate data completeness
SELECT
COUNT(*) as total_records,
COUNT(DISTINCT MATNR) as unique_materials,
COUNT(DISTINCT WERKS) as unique_plants
FROM stock_data;

-- Check for data anomalies
SELECT
MATNR,
WERKS,
CLABS
FROM stock_data
WHERE CLABS < 0 OR CLABS > 1000000;
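
Because delta loads append rows, it can also be worth checking for keys that occur more than once. The sketch assumes (MATNR, WERKS, LGORT, CHARG) identifies a stock row; adjust the column list if your source uses a different key.

```sql
-- Keys with more than one row in the target table
SELECT MATNR, WERKS, LGORT, CHARG, COUNT(*) AS versions
FROM stock_data
GROUP BY MATNR, WERKS, LGORT, CHARG
HAVING COUNT(*) > 1
ORDER BY versions DESC;
```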

Cleanup and Archiving

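-- Archive old data. This assumes stock_data has an extraction_date column
-- populated at load time; the table created in Step 1 does not include one.
BEGIN TRANSACTION;

CREATE TABLE stock_data_archive AS
SELECT * FROM stock_data
WHERE extraction_date < CURRENT_DATE - INTERVAL '1 year';

-- Clean up archived data
DELETE FROM stock_data
WHERE extraction_date < CURRENT_DATE - INTERVAL '1 year';

COMMIT;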

Integration with Cloud Platforms

AWS S3 Integration

-- Export to S3 (needs the httpfs extension and S3 credentials)
COPY (
SELECT * FROM stock_data
) TO 's3://your-bucket/stock-data/stock_data.parquet'
(FORMAT parquet);
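
DuckDB needs credentials before it can write to the bucket. One way to supply them is a DuckDB secret of type s3, in the same style as the SAP secret earlier in this guide. The secret name s3_export, the key values, and the region below are placeholders.

```sql
-- Placeholder credentials; replace with your own before running
CREATE SECRET s3_export (
    TYPE s3,
    KEY_ID 'your-access-key-id',
    SECRET 'your-secret-access-key',
    REGION 'eu-central-1'
);
```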

Google Cloud Storage

-- Export to GCS (needs the httpfs extension and GCS credentials)
COPY (
SELECT * FROM stock_data
) TO 'gs://your-bucket/stock-data/stock_data.parquet'
(FORMAT parquet);

Azure Blob Storage

-- Export to Azure Blob (needs the azure extension and Azure credentials)
COPY (
SELECT * FROM stock_data
) TO 'az://your-container/stock-data/stock_data.parquet'
(FORMAT parquet);

Best Practices

1. Delta Replication Strategy

  • Choose the right read mode: sap_odp_read_full for one-shot snapshots, sap_odp_read_delta for incremental loads
  • Monitor delta cursors regularly with sap_odp_show_cursors
  • Implement error recovery mechanisms
  • Schedule regular delta extractions

2. Performance Optimization

  • Use parallel extraction where possible
  • Implement data compression
  • Optimize filter conditions
  • Monitor extraction performance
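
For the compression point, DuckDB's Parquet writer accepts a COMPRESSION option (snappy is the default). The sketch below uses zstd; the output file name is hypothetical, and whether zstd is the better choice depends on your data and on the tools that read the files.

```sql
-- Write the table with zstd-compressed Parquet pages
COPY stock_data TO 'stock_data_zstd.parquet' (FORMAT parquet, COMPRESSION zstd);
```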

3. Data Quality

  • Implement data validation rules
  • Monitor data completeness
  • Handle missing or invalid data
  • Maintain data lineage

4. Security and Compliance

  • Use secure connections
  • Implement proper authorization
  • Log all extractions
  • Protect sensitive data

Troubleshooting

Common Issues

  1. Extraction Failures: Inspect the cursor in sap_odp_show_cursors; try recover => true before resetting
  2. Stuck Cursor: PRAGMA sap_odp_drop wipes the subscription; the next call auto-DELTAINITs
  3. Performance Problems: Project columns, push filters server-side, raise threads
  4. Data Quality Issues: Add post-extraction validation queries against the staging table

Debugging Tips

-- Enable ERPL trace
SET erpl_trace_enabled = TRUE;
SET erpl_trace_level = 'DEBUG';

-- Inspect cursor state for this pipeline
SELECT * FROM sap_odp_show_cursors(subscriber_name => 'ERPL')
WHERE subscriber_proc = 'STOCK_DELTA_DAILY';

-- See every subscriber on the source (not just ERPL's)
SELECT * FROM sap_odp_get_subscriptions('BW', 'MCHB$F');

-- Cheap preview with no cursor side effects
SELECT * FROM sap_odp_preview('BW', 'MCHB$F');
