Database pipeline workshop

Purpose of this workshop

The previous vignettes introduced the two central ideas of featdelta:

  1. feature logic can be written and organized in R;
  2. the computed features can be stored in a database table and refreshed over time.

This workshop focuses on the database side of the package. It shows how featdelta decides which rows need features, how it writes computed features back to the database, and how the full pipeline is orchestrated by fd_run().

The database pipeline is useful when raw observations arrive repeatedly and the feature table should be kept up to date without rebuilding everything by hand. The package is designed around three stages:

  1. fd_fetch() finds raw rows that are missing from the feature table;
  2. fd_compute() computes R feature definitions on those rows;
  3. fd_upsert() inserts or updates the computed features in the database.

Most users will call fd_run(), which combines these stages. The individual functions are still useful when you want to inspect, debug, or test one stage at a time.
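
Conceptually, a single fd_run() call behaves like the three stages chained together. The sketch below shows the shape of that workflow, not the package's internal implementation; con, source_sql, defs, key, and the order_features table are all introduced later in this workshop.

# The three stages, composed by hand (illustrative sketch):
rows     <- fd_fetch(con, sql = source_sql, key = key,
                     feat_table_name = "order_features")
features <- fd_compute(data = rows, defs = defs, key = key)
fd_upsert(con, features_df = features,
          feat_table_name = "order_features", key = key)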

Supported database dialects

featdelta uses DBI connections and selects SQL templates for the database dialect. The supported dialects are:

  1. SQLite;
  2. PostgreSQL;
  3. MySQL and MariaDB.

In this vignette, the runnable examples use an in-memory SQLite database because it does not require credentials or an external server. The same pipeline is intended to work with PostgreSQL and MySQL/MariaDB through DBI-compatible drivers.

# SQLite
con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")

# PostgreSQL
con <- DBI::dbConnect(
  RPostgres::Postgres(),
  host = "localhost",
  dbname = "analytics",
  user = "analyst",
  password = "secret"
)

# MySQL or MariaDB
con <- DBI::dbConnect(
  RMariaDB::MariaDB(),
  host = "localhost",
  dbname = "analytics",
  user = "analyst",
  password = "secret"
)

Usually the dialect is detected from the connection. If detection is not possible, you can pass dialect = "sqlite", dialect = "postgres", or dialect = "mysql" to fd_run() or fd_upsert().

Workshop database

We will use a small e-commerce order dataset. The data is intentionally simple, but it gives us a realistic database workflow:

  1. raw orders arrive in a table;
  2. R definitions calculate transformed features;
  3. features are written to a separate feature table;
  4. new raw orders are processed incrementally.

library(DBI)
library(RSQLite)
library(featdelta)

con <- dbConnect(SQLite(), ":memory:")

orders <- data.frame(
  order_id = 1:7,
  customer_id = c(101, 102, 103, 101, 104, 105, 102),
  gross_amount = c(120, 250, 80, 310, 45, 520, 160),
  discount_amount = c(0, 25, 5, 30, 0, 60, 10),
  shipping_fee = c(8, 0, 6, 0, 5, 0, 7),
  order_to_ship_days = c(1, 3, 2, 5, 1, 4, 2)
)

day_one <- 1:4
day_two <- 5:7

dbWriteTable(
  con,
  "raw_orders",
  orders[day_one, ],
  overwrite = TRUE
)

dbGetQuery(con, "SELECT * FROM raw_orders ORDER BY order_id")
#>   order_id customer_id gross_amount discount_amount shipping_fee
#> 1        1         101          120               0            8
#> 2        2         102          250              25            0
#> 3        3         103           80               5            6
#> 4        4         101          310              30            0
#>   order_to_ship_days
#> 1                  1
#> 2                  3
#> 3                  2
#> 4                  5

The raw table contains day-one observations only. The feature table does not exist yet.

Define the source query

featdelta does not require the source data to come from one table. The source dataset is defined by a SQL query. In real projects, this query may join several tables, filter to a modelling population, or select only the columns you need.

source_sql <- "
  SELECT
    order_id,
    customer_id,
    gross_amount,
    discount_amount,
    shipping_fee,
    order_to_ship_days
  FROM raw_orders
  ORDER BY order_id
"

key <- "order_id"

dbGetQuery(con, source_sql)
#>   order_id customer_id gross_amount discount_amount shipping_fee
#> 1        1         101          120               0            8
#> 2        2         102          250              25            0
#> 3        3         103           80               5            6
#> 4        4         101          310              30            0
#>   order_to_ship_days
#> 1                  1
#> 2                  3
#> 3                  2
#> 4                  5

There is one important requirement: the query must return the key column. The key connects the raw dataset to the feature table. It is how featdelta knows whether a raw row has already been processed.
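
For example, a join-based source query works as long as the key column survives the projection. The customers table and segment column here are hypothetical:

source_sql_joined <- "
  SELECT
    o.order_id,
    o.gross_amount,
    o.discount_amount,
    c.segment
  FROM raw_orders AS o
  JOIN customers AS c ON c.customer_id = o.customer_id
"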

Define features in R

The feature logic is still ordinary R. Here we define a few transformed variables that are more useful for analytics or reporting than the raw columns.

defs <- fd_define(
  net_revenue = gross_amount - discount_amount + shipping_fee,
  discount_rate = discount_amount / gross_amount,
  free_shipping = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3
)

defs
#> <featdelta_defs>
#> Definition steps (4):
#>   - [column] net_revenue -> gross_amount - discount_amount + shipping_fee
#>   - [column] discount_rate -> discount_amount/gross_amount
#>   - [column] free_shipping -> shipping_fee == 0
#>   - [column] slow_fulfillment -> order_to_ship_days > 3

The database pipeline does not require you to write these transformations in SQL. You can keep them in R, test them with fd_compute(), and then use the same definitions in the database pipeline.
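
For example, the definitions can be checked against the in-memory orders data frame before any database is involved:

# Dry run of the feature logic on local data; no database required.
local_features <- fd_compute(data = orders[day_one, ], defs = defs, key = key)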

First run: create and populate the feature table

On the first run, the feature table is missing. fd_run() executes the source query, computes the features, creates the feature table, and inserts the result.

run_day_one <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#>   order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1        1         128    0.00000000             0                0
#> 2        2         225    0.10000000             1                0
#> 3        3          81    0.06250000             0                0
#> 4        4         280    0.09677419             1                1

The feature table is now a database table. Other R scripts, dashboards, model training jobs, or monitoring jobs can read it without rerunning the feature engineering code.
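
For example, a downstream job needs nothing more than a DBI connection to read the persisted features:

# Any DBI client can query the feature table directly.
training_data <- dbGetQuery(
  con,
  "SELECT order_id, net_revenue, discount_rate FROM order_features"
)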

Second run: fetch only rows that are missing features

Now add more raw orders. This simulates new observations arriving after the first feature run.

dbAppendTable(con, "raw_orders", orders[day_two, ])
#> [1] 3

dbGetQuery(con, "SELECT * FROM raw_orders ORDER BY order_id")
#>   order_id customer_id gross_amount discount_amount shipping_fee
#> 1        1         101          120               0            8
#> 2        2         102          250              25            0
#> 3        3         103           80               5            6
#> 4        4         101          310              30            0
#> 5        5         104           45               0            5
#> 6        6         105          520              60            0
#> 7        7         102          160              10            7
#>   order_to_ship_days
#> 1                  1
#> 2                  3
#> 3                  2
#> 4                  5
#> 5                  1
#> 6                  4
#> 7                  2

The raw table contains seven orders, but the feature table still contains features for only the first four.

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#>   order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1        1         128    0.00000000             0                0
#> 2        2         225    0.10000000             1                0
#> 3        3          81    0.06250000             0                0
#> 4        4         280    0.09677419             1                1

When we run the pipeline again with the default fetch_mode = "new_only", featdelta finds the raw keys missing from the feature table and computes only those rows.

run_day_two <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#>   order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1        1         128    0.00000000             0                0
#> 2        2         225    0.10000000             1                0
#> 3        3          81    0.06250000             0                0
#> 4        4         280    0.09677419             1                1
#> 5        5          50    0.00000000             0                0
#> 6        6         460    0.11538462             1                1
#> 7        7         157    0.06250000             0                0

The existing feature rows were not duplicated. Only the missing keys were inserted.
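
A quick count confirms that the table holds exactly one feature row per raw order:

dbGetQuery(con, "SELECT COUNT(*) AS n FROM order_features")
#>   n
#> 1 7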

Inspect fd_fetch() directly

Most of the time, fd_run() is enough. During development, it can be useful to look at the fetch stage directly. fd_fetch() returns raw rows whose key is not present in the feature table.

To demonstrate, first add one more raw order.

new_order <- data.frame(
  order_id = 8,
  customer_id = 106,
  gross_amount = 275,
  discount_amount = 20,
  shipping_fee = 0,
  order_to_ship_days = 6
)

dbAppendTable(con, "raw_orders", new_order)
#> [1] 1

new_rows <- fd_fetch(
  con = con,
  sql = source_sql,
  key = key,
  feat_table_name = "order_features"
)

new_rows
#>   order_id customer_id gross_amount discount_amount shipping_fee
#> 1        8         106          275              20            0
#>   order_to_ship_days
#> 1                  6

This is the row that would be computed on the next incremental run. The fetch result also carries metadata, including the SQL that was executed.

attr(new_rows, "fd_fetch")
#> $key
#> [1] "order_id"
#> 
#> $feat_table_name
#> [1] "order_features"
#> 
#> $use_max_key
#> [1] FALSE
#> 
#> $max_key
#> [1] NA
#> 
#> $sql
#> [1] "SELECT\n    order_id,\n    customer_id,\n    gross_amount,\n    discount_amount,\n    shipping_fee,\n    order_to_ship_days\n  FROM raw_orders\n  ORDER BY order_id"
#> 
#> $executed_sql
#> [1] "SELECT r.* FROM (SELECT\n    order_id,\n    customer_id,\n    gross_amount,\n    discount_amount,\n    shipping_fee,\n    order_to_ship_days\n  FROM raw_orders\n  ORDER BY order_id) AS r LEFT JOIN `order_features` AS f ON r.`order_id` = f.`order_id` WHERE f.`order_id` IS NULL"
#> 
#> $n_rows
#> [1] 1

This can help when you want to verify that the package is selecting the rows you expect.

Inspect fd_upsert() directly

fd_upsert() writes a feature data frame into the database. It is useful when you already have computed features and want to use only the persistence part of the package.

new_features <- fd_compute(
  data = new_rows,
  defs = defs,
  key = key
)

upsert_report <- fd_upsert(
  con = con,
  features_df = new_features,
  feat_table_name = "order_features",
  key = key,
  verbose = FALSE
)

upsert_report
#> $feat_table_name
#> [1] "order_features"
#> 
#> $key
#> [1] "order_id"
#> 
#> $dialect
#> [1] "sqlite"
#> 
#> $n_rows
#> [1] 1
#> 
#> $n_chunks
#> [1] 1
#> 
#> $table_created
#> [1] FALSE
#> 
#> $columns_added
#> character(0)
#> 
#> $extra_columns
#> character(0)
#> 
#> $counts
#> $counts$would_insert
#> [1] 1
#> 
#> $counts$would_update
#> [1] 0
#> 
#> 
#> $chunk_details
#>   chunk n would_insert would_update
#> 1     1 1            1            0
#> 
#> attr(,"class")
#> [1] "fd_upsert_report"

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#>   order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1        1         128    0.00000000             0                0
#> 2        2         225    0.10000000             1                0
#> 3        3          81    0.06250000             0                0
#> 4        4         280    0.09677419             1                1
#> 5        5          50    0.00000000             0                0
#> 6        6         460    0.11538462             1                1
#> 7        7         157    0.06250000             0                0
#> 8        8         255    0.07272727             1                1

The upsert report summarizes what the write attempted to do. The would_insert and would_update counts are existence-based counts computed before the merge. They tell you how many incoming keys were new and how many already existed.
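
Both counts are available programmatically, which makes them easy to log or assert on in scheduled jobs:

upsert_report$counts$would_insert
#> [1] 1

upsert_report$counts$would_update
#> [1] 0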

Add a new feature column

Feature tables often evolve. Suppose we add a new feature, high_value_order. If we ran the pipeline with the default fetch_mode = "new_only", only new rows would be processed, and existing rows would never receive values for the new column.

When you want to backfill or refresh existing rows, use fetch_mode = "all".

defs_v2 <- fd_define(
  net_revenue = gross_amount - discount_amount + shipping_fee,
  discount_rate = discount_amount / gross_amount,
  free_shipping = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3,
  high_value_order = gross_amount >= 250
)

refresh_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "order_features",
  fetch_mode = "all",
  verbose = FALSE
)

dbListFields(con, "order_features")
#> [1] "order_id"         "net_revenue"      "discount_rate"    "free_shipping"   
#> [5] "slow_fulfillment" "high_value_order"

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#>   order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1        1         128    0.00000000             0                0
#> 2        2         225    0.10000000             1                0
#> 3        3          81    0.06250000             0                0
#> 4        4         280    0.09677419             1                1
#> 5        5          50    0.00000000             0                0
#> 6        6         460    0.11538462             1                1
#> 7        7         157    0.06250000             0                0
#> 8        8         255    0.07272727             1                1
#>   high_value_order
#> 1                0
#> 2                1
#> 3                0
#> 4                1
#> 5                0
#> 6                1
#> 7                0
#> 8                1

With the default alter_table = TRUE, fd_upsert() adds missing feature columns to the target table. This schema evolution is intentionally conservative: columns can be added, but existing columns are never dropped, renamed, or retyped automatically.

Understand create_table and alter_table

The default create_table = "auto" creates the feature table when it does not exist and uses the existing table when it does. This is convenient for ordinary incremental pipelines.

If you want to be stricter, use:

# Error if the table does not already exist
fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  create_table = FALSE
)

# Error if new feature columns are missing from the existing table
fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "order_features",
  alter_table = FALSE
)

This is useful in production environments where table creation or schema changes must be handled through a separate approval process.

Choose between new_only and all

The most important database-pipeline decision is the fetch mode.

Use fetch_mode = "new_only" for ordinary incremental processing. This mode uses the feature table to identify keys that have not yet been processed. It is the default.

Use fetch_mode = "all" when existing feature rows should be recomputed. Common reasons include:

  1. you changed a feature definition;
  2. you added a new feature and want old rows to receive values;
  3. you are performing an explicit backfill;
  4. you want to rebuild a feature table from the current source query.

fetch_mode = "new_only" is key-based. It does not know whether your feature logic changed. If the definitions changed and existing rows should be updated, use fetch_mode = "all".

Insert-only mode

By default, fd_upsert() updates existing keys and inserts new keys. This is usually what you want for refreshes and backfills.

If you want a stricter insert-only workflow, set update_table = FALSE. In that mode, any incoming key that already exists in the feature table is treated as a conflict.

fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  fetch_mode = "all",
  update_table = FALSE
)

This can be useful when your pipeline is intended only to append new feature rows and you want accidental refreshes to fail loudly.
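
Because a conflict makes the run fail, a defensive wrapper can be useful in scheduled jobs. This sketch handles both a thrown error and a report-level failure, using the success and error report fields shown later in this workshop:

res <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs,
    key = key,
    feat_table_name = "order_features",
    update_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) e
)

if (inherits(res, "error")) {
  message("Insert-only run failed: ", conditionMessage(res))
} else if (!isTRUE(res$success)) {
  message("Insert-only run failed: ", res$error)
}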

Process large writes in chunks

For larger feature frames, use chunk_size to write the data in batches. This can reduce memory and packet-size pressure on the database connection.

fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  chunk_size = 5000
)

Chunking does not change the feature logic. It only changes how computed rows are staged and merged into the target table.
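
To see chunking on this small table, you can force multiple chunks with a deliberately tiny chunk_size and inspect the run report. This assumes the report's upsert entry mirrors the fd_upsert() report shown earlier; chunk_size = 3 is chosen purely for illustration:

run_chunked <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "order_features",
  fetch_mode = "all",
  chunk_size = 3,
  verbose = FALSE
)

# Eight rows with chunk_size = 3 should be written as three chunks.
run_chunked$upsert$n_chunks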

Dialect notes for PostgreSQL and MySQL

SQLite is convenient for local examples. In production, many users will connect to PostgreSQL, MySQL, or MariaDB.

For PostgreSQL and MySQL, you can use schema-qualified feature table names when the driver supports them through DBI identifiers:

fd_run(
  con = con,
  sql = "SELECT * FROM raw_schema.orders",
  defs = defs,
  key = "order_id",
  feat_table_name = "feature_schema.order_features",
  dialect = "postgres"
)

fd_run(
  con = con,
  sql = "SELECT * FROM raw_schema.orders",
  defs = defs,
  key = "order_id",
  feat_table_name = "feature_schema.order_features",
  dialect = "mysql"
)

Existing target tables used with update_table = TRUE must have a primary key or unique constraint on the key column. Tables created by fd_upsert() include that primary key automatically.
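
If an existing table was created outside featdelta and lacks that constraint, add one before switching on updates. A sketch in PostgreSQL syntax; pg_con and the constraint name are placeholders, and the exact statement varies by database:

dbExecute(pg_con, "
  ALTER TABLE feature_schema.order_features
  ADD CONSTRAINT order_features_pk PRIMARY KEY (order_id)
")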

If the dialect can be detected from your DBI connection, you usually do not need the dialect argument. It is available as an explicit override when automatic detection is not sufficient.

Read the run report

The database table is the main result of the pipeline, but the run report is useful for checking what happened.

names(refresh_report)
#>  [1] "success"         "stage"           "started_at"      "finished_at"    
#>  [5] "time_sec"        "key"             "feat_table_name" "dialect"        
#>  [9] "sql"             "fetch"           "compute"         "upsert"         
#> [13] "preview"         "data"            "error"

refresh_report$fetch
#> $mode
#> [1] "all"
#> 
#> $source
#> [1] "all"
#> 
#> $table_exists
#> [1] TRUE
#> 
#> $use_max_key
#> [1] FALSE
#> 
#> $limit
#> NULL
#> 
#> $limit_applied
#> [1] FALSE
#> 
#> $n_rows_before_limit
#> [1] 8
#> 
#> $n_rows
#> [1] 8
#> 
#> $fd_fetch
#> NULL

refresh_report$compute$feature_names
#> [1] "net_revenue"      "discount_rate"    "free_shipping"    "slow_fulfillment"
#> [5] "high_value_order"

refresh_report$upsert$counts
#> $would_insert
#> [1] 0
#> 
#> $would_update
#> [1] 8

refresh_report$upsert$columns_added
#> [1] "high_value_order"

The report is especially useful in development and monitoring. It shows which stage ran, how many rows were fetched and computed, and what the upsert stage planned to insert or update.
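
For example, a scheduled job can fail fast on an unsuccessful run and log what the write planned to do, using only the fields shown above:

stopifnot(isTRUE(refresh_report$success))

message(sprintf(
  "Fetched %d rows; planned %d inserts and %d updates.",
  refresh_report$fetch$n_rows,
  refresh_report$upsert$counts$would_insert,
  refresh_report$upsert$counts$would_update
))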

What to remember

The database pipeline exists to keep a feature table current while letting the feature logic stay in R.

Use fd_run() for the regular end-to-end workflow. Use fd_fetch() when you want to inspect which raw rows still need features. Use fd_upsert() when you already have a feature data frame and want to persist it safely.

For daily or repeated processing, start with the default fetch_mode = "new_only". When feature definitions change and old rows need to be refreshed, switch to fetch_mode = "all". For production databases, make sure the feature table has a stable key and choose the dialect that matches the DBI connection: SQLite, PostgreSQL, or MySQL/MariaDB.
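
When you are finished with the examples, close the connection:

dbDisconnect(con)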