ETL with Arrow and DuckDB

SoCal RUG x UCI Hackathon - 2024.04.27

Javier Orraca-Deatcu
rrr.is/ArrowHackathonTalk

1. ETL: Basics Recap

Basic ETL with R & Python

R’s dplyr

  • dplyr is a data manipulation package for R that offers SQL-like functions such as select(), filter(), summarize(), and more to simplify data tasks

Python’s pandas

  • pandas is a data analysis library in Python that supports the same kind of selecting, filtering, and manipulation tasks

Basic ETL with R & Python

  • Both libraries are essential for data manipulation and are designed to be intuitive and performant
  • but first…

we need

data!

Palmer Penguins


Artwork by Allison Horst | https://allisonhorst.github.io/palmerpenguins

Palmer Penguins

  • Install palmerpenguins and load the curated data set
install.packages("palmerpenguins")

library(palmerpenguins)

penguins_df <- palmerpenguins::penguins
pip install palmerpenguins

from palmerpenguins import load_penguins

penguins_df = load_penguins()

Take a glimpse() at the Data

library(dplyr)

glimpse(penguins_df)
Rows: 344
Columns: 8
$ species           <fct> Adelie, Adelie, Adelie, Adelie, Adelie, Adelie, Adel…
$ island            <fct> Torgersen, Torgersen, Torgersen, Torgersen, Torgerse…
$ bill_length_mm    <dbl> 39.1, 39.5, 40.3, NA, 36.7, 39.3, 38.9, 39.2, 34.1, …
$ bill_depth_mm     <dbl> 18.7, 17.4, 18.0, NA, 19.3, 20.6, 17.8, 19.6, 18.1, …
$ flipper_length_mm <int> 181, 186, 195, NA, 193, 190, 181, 195, 193, 190, 186…
$ body_mass_g       <int> 3750, 3800, 3250, NA, 3450, 3650, 3625, 4675, 3475, …
$ sex               <fct> male, female, female, NA, female, male, female, male…
$ year              <int> 2007, 2007, 2007, 2007, 2007, 2007, 2007, 2007, 2007…
print(penguins_df.head())

Functions We’ll Explore

  • select
  • filter
  • arrange
  • mutate
  • group_by
  • summarize

select

  • Subset columns with the select() function
penguins_mod <- penguins_df |> 
  select(species, sex)

glimpse(penguins_mod)
Rows: 344
Columns: 2
$ species <fct> Adelie, Adelie, Adelie, Adelie, Adelie, Adelie, Adelie, Adelie…
$ sex     <fct> male, female, female, NA, female, male, female, male, NA, NA, …
import pandas as pd

penguins_mod = penguins_df[['species', 'sex']]

select

  • Using the column (or “variable”) names, select() lets us easily subset the data

Image credit: Duke University https://intro2r.library.duke.edu/wrangle.html

filter

  • Subset rows with the filter() function
penguins_mod <- penguins_df |> 
  filter(sex == "female",
         bill_length_mm <= 40)

glimpse(penguins_mod)
Rows: 66
Columns: 8
$ species           <fct> Adelie, Adelie, Adelie, Adelie, Adelie, Adelie, Adel…
$ island            <fct> Torgersen, Torgersen, Torgersen, Torgersen, Torgerse…
$ bill_length_mm    <dbl> 39.5, 36.7, 38.9, 36.6, 38.7, 34.4, 37.8, 35.9, 35.3…
$ bill_depth_mm     <dbl> 17.4, 19.3, 17.8, 17.8, 19.0, 18.4, 18.3, 19.2, 18.9…
$ flipper_length_mm <int> 186, 193, 181, 185, 195, 184, 174, 189, 187, 172, 17…
$ body_mass_g       <int> 3800, 3450, 3625, 3700, 3450, 3325, 3400, 3800, 3800…
$ sex               <fct> female, female, female, female, female, female, fema…
$ year              <int> 2007, 2007, 2007, 2007, 2007, 2007, 2007, 2007, 2007…
penguins_mod = penguins_df[
    (penguins_df['sex'] == 'female') &
    (penguins_df['bill_length_mm'] <= 40)
]

filter

  • The filter() function lets us keep rows where the provided expression(s) are TRUE

Image credit: Duke University https://intro2r.library.duke.edu/wrangle.html

arrange

  • Sorting rows by column names can be accomplished with the arrange() function
  • In the example on the next slide, we’ll sort the data by bill_length_mm ascending and by year descending

arrange

penguins_mod <- penguins_df |> 
  arrange(bill_length_mm, desc(year))

glimpse(penguins_mod)
Rows: 344
Columns: 8
$ species           <fct> Adelie, Adelie, Adelie, Adelie, Adelie, Adelie, Adel…
$ island            <fct> Dream, Dream, Torgersen, Dream, Torgersen, Torgersen…
$ bill_length_mm    <dbl> 32.1, 33.1, 33.5, 34.0, 34.1, 34.4, 34.5, 34.6, 34.6…
$ bill_depth_mm     <dbl> 15.5, 16.1, 19.0, 17.1, 18.1, 18.4, 18.1, 17.2, 21.1…
$ flipper_length_mm <int> 188, 178, 190, 185, 193, 184, 187, 189, 198, 192, 19…
$ body_mass_g       <int> 3050, 2900, 3600, 3400, 3475, 3325, 2900, 3200, 4400…
$ sex               <fct> female, female, female, female, NA, female, female, …
$ year              <int> 2009, 2008, 2008, 2008, 2007, 2007, 2008, 2008, 2007…
penguins_mod = penguins_df.sort_values(
    by=['bill_length_mm', 'year'], 
    ascending=[True, False]
)

print(penguins_mod.head())

arrange

  • The arrange() function sorts rows in order of one or more columns
  • You can arrange in ascending order (the default) or descending order by wrapping the column name in desc()

Image credit: Duke University https://intro2r.library.duke.edu/wrangle.html

mutate

  • Create or modify columns with mutate()
penguins_mod <- penguins_df |> 
  mutate(body_mass_kg = body_mass_g / 1000)

glimpse(penguins_mod)
Rows: 344
Columns: 9
$ species           <fct> Adelie, Adelie, Adelie, Adelie, Adelie, Adelie, Adel…
$ island            <fct> Torgersen, Torgersen, Torgersen, Torgersen, Torgerse…
$ bill_length_mm    <dbl> 39.1, 39.5, 40.3, NA, 36.7, 39.3, 38.9, 39.2, 34.1, …
$ bill_depth_mm     <dbl> 18.7, 17.4, 18.0, NA, 19.3, 20.6, 17.8, 19.6, 18.1, …
$ flipper_length_mm <int> 181, 186, 195, NA, 193, 190, 181, 195, 193, 190, 186…
$ body_mass_g       <int> 3750, 3800, 3250, NA, 3450, 3650, 3625, 4675, 3475, …
$ sex               <fct> male, female, female, NA, female, male, female, male…
$ year              <int> 2007, 2007, 2007, 2007, 2007, 2007, 2007, 2007, 2007…
$ body_mass_kg      <dbl> 3.750, 3.800, 3.250, NA, 3.450, 3.650, 3.625, 4.675,…
penguins_mod = penguins_df.assign(
    body_mass_kg = penguins_df['body_mass_g'] / 1000
)

print(penguins_mod.head())

mutate

  • Add new columns to your dataframe or tibble with mutate()
  • dplyr’s mutate() is similar to base R’s transform() with the added benefit of being able to reference columns you’ve just created
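  • For example, a quick sketch: the second expression below references the body_mass_kg column created on the line just above it (body_mass_lbs is an illustrative name, not a column in the original data)
penguins_mod <- penguins_df |> 
  mutate(
    body_mass_kg  = body_mass_g / 1000,
    body_mass_lbs = body_mass_kg * 2.20462  # references the column created above
  )

glimpse(penguins_mod)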

Image credit: Duke University https://intro2r.library.duke.edu/wrangle.html

group_by and summarize

  • group_by() variables and summarize() results
penguins_mod <- penguins_df |> 
  group_by(sex) |> 
  summarize(mean_mass_g = mean(body_mass_g, na.rm = TRUE))

glimpse(penguins_mod)
Rows: 3
Columns: 2
$ sex         <fct> female, male, NA
$ mean_mass_g <dbl> 3862.273, 4545.685, 4005.556
penguins_mod = (
    penguins_df
    .groupby('sex', dropna=False)  # keep the NA group to match the R output
    .agg(mean_mass_g=('body_mass_g', 'mean'))
    .reset_index()
)

print(penguins_mod)

group_by and summarize

  • summarize() creates a new data frame
  • it returns one row for each combination of grouping variables
  • if there are no grouping variables, the output will have a single row summarizing all observations in the input
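  • For example, dropping group_by() from the earlier example collapses the whole data set to a single summary row
penguins_df |> 
  summarize(mean_mass_g = mean(body_mass_g, na.rm = TRUE))
# returns a 1 x 1 tibble: the overall mean body mass across all penguins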

Chaining Methods

  • In R, the native pipe operator (|>) or magrittr pipe (%>%) can be used to chain or pipe dplyr functions together
penguins_mod <- penguins_df |> 
  select(sex, bill_length_mm, body_mass_g) |> 
  filter(sex == "female",
         bill_length_mm > 30) |> 
  group_by(sex) |> 
  summarize(mean_mass_g = mean(body_mass_g, na.rm = TRUE))

glimpse(penguins_mod)
Rows: 1
Columns: 2
$ sex         <fct> female
$ mean_mass_g <dbl> 3862.273

Chaining Methods

  • In Python, the period (.) is used to chain methods together
penguins_mod = (
    penguins_df[['sex', 'bill_length_mm', 'body_mass_g']]
    .query("sex == 'female' and bill_length_mm > 30")
    .groupby('sex')
    .agg(mean_mass_g=('body_mass_g', 'mean'))
    .reset_index()
)

2. Arrow

Larger-than-Memory Data


Arrow and Parquet

  • Columnar memory format for flat data
  • Ultra-fast read times from Parquet files
  • Easily convert data.frames and tibbles to Arrow tables in-line (see the sketch after this list)
  • Plays well with Pandas (Python) and dplyr (R)
  • Arrow will soon be able to process nested list data
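  • As a quick sketch of that in-line conversion (assuming the arrow package is installed), arrow_table() turns a data.frame or tibble into an Arrow Table, and collect() brings results back into R
library(arrow)
library(dplyr)

penguins_arrow <- arrow_table(palmerpenguins::penguins)  # tibble -> Arrow Table

penguins_arrow |> 
  filter(!is.na(sex)) |> 
  collect()  # back to a tibble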

How fast is fast?

  • My personal laptop has 24 GB RAM
  • To test Arrow’s capabilities, I read a 40 GB dataset with over 1.1 billion rows and 24 columns
  • The .parquet dataset was partitioned by Year and Month (120 files)
  • It’s important to note that my laptop could not load this dataset entirely into memory as a data.frame or tibble, given its limited RAM

Benchmarking Read Times

# Install full Arrow with S3 and GCS support:
# Sys.setenv("LIBARROW_MINIMAL" = FALSE)
# Sys.setenv("ARROW_S3" = "ON")
# Sys.setenv("ARROW_GCS" = "ON")
# install.packages("arrow")

library(here)
library(arrow)
library(dplyr)
library(ggplot2)
library(bench)

# NYC Taxi Data download (40 GB)
data_path <- here::here("data/nyc-taxi")

open_dataset("s3://voltrondata-labs-datasets/nyc-taxi") |>
  filter(year %in% 2012:2021) |> 
  write_dataset(data_path, partitioning = c("year", "month"))

Benchmarking Read Times

# 2. Benchmark Read Times
bnch <- bench::mark(
  min_iterations = 1000,
  arrow = open_dataset(here::here("data/nyc-taxi"))
)

autoplot(bnch)

Benchmarking Read Times

  • Results show read times for the 40 GB, 1.1-billion-row Parquet dataset (benchmarked over 1,000 iterations)

arrow + dplyr Benchmark

# 1. Open Arrow connection to dataset (40 GB)
nyc_taxi <- open_dataset(here::here("data/nyc-taxi"))

# 2. Benchmark dplyr pipeline
bnch <- bench::mark(
  min_iterations = 10,
  arrow = nyc_taxi |> 
    dplyr::group_by(year) |> 
    dplyr::summarise(all_trips = n(),
                     shared_trips = sum(passenger_count > 1, na.rm = T)) |>
    dplyr::mutate(pct_shared = shared_trips / all_trips * 100) |> 
    dplyr::collect()
)

autoplot(bnch)

arrow + dplyr Benchmark

  • Arrow + dplyr summarized 1.1 billion rows in less than 5s (benchmarked over 10 iterations) to a 10 x 4 tibble

Tidyverse Compatibility

  • Many functions from the tidyverse collection of packages have 1:1 compatibility with Arrow tables
  • However, sometimes you’ll encounter a breaking point
  • Take this stringr::str_replace_na() example:
nyc_taxi |> 
  mutate(vendor_name = str_replace_na(vendor_name, "No vendor"))
#> Error: Expression str_replace_na(vendor_name, "No vendor") 
#> not supported in Arrow
  • This stringr function is not supported by Arrow

but wait!

problem solved

User Defined Functions

  • Lucky for us, Arrow allows us to create and register User Defined Functions (“UDFs”) with the Arrow engine
  • Almost any function can be made compatible with Arrow by registering custom UDFs
  • Let’s learn how to register str_replace_na() with the Arrow kernel

Registering UDFs

  • First, run arrow::schema() on your Arrow table to review the field name / data type pairs
  • Since I want to mutate the vendor_name field, I know I’ll be working with an Arrow string() data type
arrow::schema(nyc_taxi)
#> Schema
#> vendor_name: string
#> pickup_datetime: timestamp[ms]
#> dropoff_datetime: timestamp[ms]
#> passenger_count: int64
#> trip_distance: double
#> pickup_longitude: double
#> pickup_latitude: double
#> ...

Registering UDFs

  • Next, use register_scalar_function()
  • Name your UDF “replace_arrow_nas” and remember to set auto_convert = TRUE
arrow::register_scalar_function(
  name = "replace_arrow_nas",
  # Note: The first argument must always be context
  function(context, x, replacement) {
    stringr::str_replace_na(x, replacement)
  },
  in_type = schema(
    x = string(),
    replacement = string()
  ),
  out_type = string(),
  auto_convert = TRUE
)

Registering UDFs

  • Try your new registered function
nyc_taxi |> 
  mutate(vendor_name = replace_arrow_nas(vendor_name, "No vendor")) |> 
  distinct(vendor_name) |> 
  arrange(vendor_name) |> 
  collect()
#> # A tibble: 3 × 1
#>   vendor_name
#>   <chr>      
#> 1 CMT
#> 2 No vendor
#> 3 VTS

What’s next for Arrow?

ADBC: Arrow Database Connectivity

  • A competitor to JDBC and ODBC that lets applications code against a single API standard while fetching results in the Arrow columnar format
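  • A minimal sketch of ADBC from R, assuming the adbcdrivermanager and adbcsqlite packages are installed; results stream back in Arrow format and can be collected into a data frame
library(adbcdrivermanager)

db  <- adbc_database_init(adbcsqlite::adbcsqlite(), uri = ":memory:")
con <- adbc_connection_init(db)

mtcars |> write_adbc(con, "mtcars")           # write an R data frame to the database
con |> read_adbc("SELECT * FROM mtcars") |>   # results come back as an Arrow stream
  as.data.frame()

adbc_connection_release(con)
adbc_database_release(db)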

What’s next for Arrow?

Arrow Flight SQL

  • A protocol for interacting with SQL databases using the Arrow in-memory format and the Flight RPC framework
  • Its natural mode is to stream sequences of Arrow “record batches” to reduce or remove the serialization cost associated with data transport
  • The design goal for Flight is to create a new protocol for data services that uses the Arrow columnar format as both the over-the-wire data representation as well as the public API presented to developers
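  • As a hedged sketch, the arrow R package ships a demo Flight client (it wraps pyarrow via reticulate, so both must be installed); the host, port, and flight path below are placeholders, and this uses plain Flight RPC rather than Flight SQL
library(arrow)

# Connect to a running Flight server (host and port are placeholders)
client <- flight_connect(host = "localhost", port = 8089)

# Fetch the named flight; data arrives as streamed record batches and is returned as an Arrow Table
flight_get(client, "nyc-taxi-sample")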

3. DuckDB

DuckDB

  • DuckDB Labs created an in-process database management system, similar in spirit to SQLite, but optimized for analytical (OLAP) queries and larger-than-memory analysis
  • The duckdb package for Python offers a state-of-the-art optimizer that pushes down filters and projections directly into Arrow scans
  • As a result, only relevant columns and partitions are read, which significantly accelerates query execution

DuckDB setup



# Connect to dataset and write data
# to a partitioned Parquet file:
open_dataset("s3://voltrondata-labs-datasets/nyc-taxi") |>
  filter(year %in% 2012:2021) |> 
  write_dataset("<your local write path>", 
                partitioning = c("year", "month"))
import os

# Set environment variables
os.environ["LIBARROW_MINIMAL"] = "FALSE"
os.environ["ARROW_S3"] = "ON"
os.environ["ARROW_GCS"] = "ON"

!pip install pyarrow
!pip install duckdb

import pyarrow as pa
import pyarrow.dataset as ds
import duckdb

dataset = ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi", format="parquet")
filtered_data = dataset.to_table(filter=ds.field('year').isin(range(2012, 2022)))
ds.write_dataset(filtered_data, "<your local write path>",
                 format="parquet", partitioning=["year", "month"])

DuckDB Basics

library(duckdb)
library(arrow)
library(dplyr)

ds <- arrow::open_dataset("nyc-taxi", partitioning = c("year", "month"))

ds |>
  filter(year > 2014 & passenger_count > 0 & 
           trip_distance > 0.25 & fare_amount > 0) |>
  # Pass off to DuckDB
  to_duckdb() |>
  group_by(passenger_count) |>
  mutate(tip_pct = tip_amount / fare_amount) |>
  summarise(fare_amount = mean(fare_amount, na.rm = TRUE),
            tip_amount = mean(tip_amount, na.rm = TRUE),
            tip_pct = mean(tip_pct, na.rm = TRUE)) |>
  arrange(passenger_count) |>
  collect()
import duckdb
import pyarrow as pa
import pyarrow.dataset as ds

# Open dataset using year,month folder partition
nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# We transform the nyc dataset into a DuckDB relation
nyc = duckdb.arrow(nyc)

# Run the same query: filter, then aggregate per passenger_count group
nyc.filter("year > 2014 AND passenger_count > 0 AND trip_distance > 0.25 AND fare_amount > 0") \
   .aggregate("AVG(fare_amount), AVG(tip_amount), AVG(tip_amount / fare_amount) AS tip_pct",
              "passenger_count").arrow()

DuckDB Streaming

# Reads dataset partitioning it in year/month folder
nyc_dataset <- open_dataset("nyc-taxi/", partitioning = c("year", "month"))

# Gets Database Connection
con <- dbConnect(duckdb::duckdb())

# We can use the same function as before to register our arrow dataset
duckdb::duckdb_register_arrow(con, "nyc", nyc_dataset)

res <- dbSendQuery(con, "SELECT * FROM nyc", arrow = TRUE)

# DuckDB's queries can now produce a Record Batch Reader
record_batch_reader <- duckdb::duckdb_fetch_record_batch(res)

# Which means we can stream the whole query per batch.
# This retrieves the first batch
cur_batch <- record_batch_reader$read_next_batch()
# Reads dataset partitioning it in year/month folder
nyc_dataset = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# Gets Database Connection
con = duckdb.connect()

query = con.execute("SELECT * FROM nyc_dataset")

# DuckDB's queries can now produce a Record Batch Reader
record_batch_reader = query.fetch_record_batch()

# Which means we can stream the whole query per batch.
# This retrieves the first batch
chunk = record_batch_reader.read_next_batch()

DuckDB Streaming Speed

# DuckDB via Python
# Open dataset using year,month folder partition
nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# Get database connection
con = duckdb.connect()

# Run query that selects part of the data
query = con.execute("SELECT total_amount, passenger_count, year FROM nyc WHERE total_amount > 100 AND year > 2014")

# Create Record Batch Reader from Query Result.
# "fetch_record_batch()" also accepts an extra parameter related to the desired produced chunk size.
record_batch_reader = query.fetch_record_batch()

# Stream the result batch by batch until the reader is exhausted
for chunk in record_batch_reader:
    pass  # process each pyarrow RecordBatch here
# We must exclude one of the columns of the NYC dataset due to an unimplemented cast in Arrow
working_columns = ["vendor_id","pickup_at","dropoff_at","passenger_count","trip_distance","pickup_longitude",
    "pickup_latitude","store_and_fwd_flag","dropoff_longitude","dropoff_latitude","payment_type",
    "fare_amount","extra","mta_tax","tip_amount","tolls_amount","total_amount","year", "month"]

# Open dataset using year,month folder partition
nyc_dataset = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
# Generate a scanner to skip problematic column
dataset_scanner = nyc_dataset.scanner(columns=working_columns)

# Materialize dataset to an Arrow Table
nyc_table = dataset_scanner.to_table()

# Generate DataFrame from Arrow Table
nyc_df = nyc_table.to_pandas()

# Apply Filter
filtered_df = nyc_df[
    (nyc_df.total_amount > 100) &
    (nyc_df.year > 2014)]

# Apply Projection
res = filtered_df[["total_amount", "passenger_count","year"]]

# Transform Result back to an Arrow Table
new_table = pa.Table.from_pandas(res)

DuckDB Streaming Speed

  • Pandas runtime was 146.91 seconds
  • DuckDB runtime was 0.05 seconds

duckplyr

  • duckplyr, from DuckDB Labs, offers 1:1 compatibility with dplyr functions but there are some caveats:
    • factor columns, nested lists, and nested tibbles are not yet supported
    • you have to use the .by argument in dplyr::summarize(), as the developers do not plan to support dplyr::group_by() (see the sketch below)
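  • A minimal sketch of the .by style, assuming duckplyr is installed and using its as_duckplyr_df() entry point; note the character grouping column, since factor columns aren’t supported
library(duckplyr)

trips <- data.frame(
  vendor = c("CMT", "CMT", "VTS"),
  fare   = c(10.5, 12.0, 8.25)
)

trips |> 
  as_duckplyr_df() |> 
  summarise(mean_fare = mean(fare), .by = vendor)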

Benchmarking Analysis

  • Arrow and DuckDB really stood out for fast manipulation of data using dplyr syntax
  • The code below shows the basic transformation done to the NYC Taxi dataset via dplyr, arrow, duckdb, and duckplyr
nyc_taxi |> 
    dplyr::filter(passenger_count > 1) |> 
    dplyr::group_by(year) |> 
    dplyr::summarise(all_trips = n(),
                     shared_trips = sum(passenger_count, na.rm = T)) |>
    dplyr::mutate(pct_shared = shared_trips / all_trips * 100) |> 
    dplyr::collect()

Benchmark: 1 million rows

Benchmark: 10 million rows

Benchmark: 100 million rows

Benchmark: 500 million rows

questions?

4. Bonus!

Spatial Analysis

  • In Sep 2023, I attended a Big Data in R with Arrow workshop hosted by the amazing software devs and educators Steph Hazlitt & Nic Crane
  • One of the neater parts of this workshop was the geospatial mapping capability that combines Arrow, ggplot2, and sf

Spatial Analysis

  • The link below contains a script from the Big Data in R with Arrow workshop that explores these packages and introduces an ultra-fast method for working with geospatial data and rendering maps
  • Source: Arrow_for_Spatial_Data.R
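  • As a hedged sketch of that Arrow + dplyr + sf + ggplot2 pattern (the dataset path, year filter, and sample size below are placeholders rather than the workshop’s actual script), pickup coordinates can be filtered in Arrow, collected, converted with sf, and plotted with geom_sf()
library(arrow)
library(dplyr)
library(sf)
library(ggplot2)

pickups <- open_dataset("data/nyc-taxi") |> 
  filter(year == 2019,
         !is.na(pickup_longitude),
         !is.na(pickup_latitude)) |> 
  select(pickup_longitude, pickup_latitude) |> 
  head(10000) |>   # keep the sketch small
  collect() |> 
  st_as_sf(coords = c("pickup_longitude", "pickup_latitude"), crs = 4326)

ggplot(pickups) +
  geom_sf(size = 0.1, alpha = 0.2) +
  theme_minimal()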

Spatial Analysis

  • Example ggplot2 output, below: