library(duckdb)
Warning: package 'duckdb' was built under R version 4.5.2
Loading required package: DBI
library(dplyr, warn.conflicts = FALSE)
library(tidyr, warn.conflicts = FALSE)
Use a familiar R frontend
For the d(b)plyr workflow, the connection step is very similar to the pure SQL approach. The only difference is that, after instantiating the database connection, we need to register our parquet dataset as a table in our connection via the dplyr::tbl() function. Note that we also assign it to an object (here: nyc) that can be referenced from R.
## Instantiate the in-memory DuckDB connection
con = dbConnect(duckdb(), shutdown = TRUE)
## Register our parquet dataset as a table in our connection (and then assign it
## to an object that R understands)
# nyc = tbl(con, "nyc-taxi/**/*.parquet") # works, but safer to use the read_parquet function
nyc = tbl(con, "read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)")
This next command runs instantly because all computation is deferred (i.e., lazy eval). In other words, it is just a query object.
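The chunk that builds the query object is not shown here. As a minimal sketch of what a deferred query looks like, the example below uses a small toy table in place of the taxi data; the names `trips` and `nyc_toy` and all of the values are hypothetical stand-ins.

```r
library(duckdb)
library(dplyr, warn.conflicts = FALSE)

## Toy stand-in for the taxi data (hypothetical values, not the real dataset)
trips = data.frame(
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)

con = dbConnect(duckdb())
dbWriteTable(con, "trips", trips)
nyc_toy = tbl(con, "trips")

## This only builds a query object; no aggregation is run yet
q1 = nyc_toy |>
  summarize(mean_tip = mean(tip_amount, na.rm = TRUE), .by = passenger_count)

q1_class = class(q1)  # a lazy table class, not a materialised data frame
q1_class

dbDisconnect(con, shutdown = TRUE)
```

Nothing is computed until the object is printed or collected, which is why building it is effectively free regardless of table size.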
.by versus group_by
In case you weren’t aware: summarize(..., .by = x) is a shorthand (and non-persistent) version of group_by(x) |> summarize(...). More details here.
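To make the "non-persistent" part concrete, here is a small sketch on an in-memory tibble (the data and object names are made up for illustration). With two grouping variables, `group_by()` leaves the result grouped by the first variable, whereas `.by` returns an ungrouped result.

```r
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy data
trips = tibble(
  passenger_count = c(1L, 1L, 2L, 2L),
  month           = c("01", "02", "01", "02"),
  tip_amount      = c(1, 3, 2, 4)
)

## Per-operation grouping: the result carries no groups
by_version = trips |>
  summarize(mean_tip = mean(tip_amount), .by = c(passenger_count, month))

## Persistent grouping: summarize() peels off only the last grouping variable
gb_version = trips |>
  group_by(passenger_count, month) |>
  summarize(mean_tip = mean(tip_amount))

group_vars(by_version)
group_vars(gb_version)
```

This is also where the `summarise()` has grouped output by ... message comes from when `group_by()` is used with more than one variable.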
We can see what DuckDB’s query tree looks like by asking it to explain the plan
Warning: Missing values are always removed in SQL aggregation functions.
Use `na.rm = TRUE` to silence this warning
This warning is displayed once every 8 hours.
<SQL>
SELECT passenger_count, AVG(tip_amount) AS mean_tip
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
GROUP BY passenger_count
<PLAN>
physical_plan
┌───────────────────────────┐
│ PERFECT_HASH_GROUP_BY │
│ ──────────────────── │
│ Groups: #0 │
│ Aggregates: avg(#1) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ PROJECTION │
│ ──────────────────── │
│ passenger_count │
│ CAST(tip_amount AS DOUBLE)│
│ │
│ ~563,696,520 rows │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ READ_PARQUET │
│ ──────────────────── │
│ Function: │
│ READ_PARQUET │
│ │
│ Projections: │
│ passenger_count │
│ tip_amount │
│ │
│ ~563,696,520 rows │
└───────────────────────────┘
Similarly, we can show the SQL translation that will be run on the backend by using show_query().
<SQL>
SELECT passenger_count, AVG(tip_amount) AS mean_tip
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
GROUP BY passenger_count
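The calls that produced the two outputs above are not shown. A hedged sketch of both inspection steps follows, again on a hypothetical toy table (`trips`, `nyc_toy`) instead of the real parquet files. `dbplyr::sql_render()` is used only to capture the SQL as a string for reuse.

```r
library(duckdb)
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in for the taxi data
trips = data.frame(
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)

con = dbConnect(duckdb())
dbWriteTable(con, "trips", trips)
nyc_toy = tbl(con, "trips")

q1 = nyc_toy |>
  summarize(mean_tip = mean(tip_amount, na.rm = TRUE), .by = passenger_count)

show_query(q1)  # prints the SQL translation only
explain(q1)     # prints the SQL plus DuckDB's physical plan

## Capture the generated SQL as a plain character string
sql_text = as.character(dbplyr::sql_render(q1))

dbDisconnect(con, shutdown = TRUE)
```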
Note that printing the query object actually does force some computation. On the other hand, it’s still just a preview of the data (we haven’t pulled everything into R’s memory).
# Source: SQL [?? x 2]
# Database: DuckDB 1.4.2 [floswald@Darwin 24.6.0:R 4.5.1/:memory:]
passenger_count mean_tip
<int> <dbl>
1 -127 0.5
2 -123 0
3 -122 5
4 -119 9
5 -115 2
6 -101 46.6
7 -98 2
8 -96 2
9 -93 1
10 -92 8
# ℹ more rows
To actually pull all of the result data into R, we must call collect() on the query object.
# A tibble: 53 × 2
passenger_count mean_tip
<int> <dbl>
1 -127 0.5
2 -123 0
3 -122 5
4 -119 9
5 -115 2
6 -101 46.6
7 -98 2
8 -96 2
9 -93 1
10 -92 8
# ℹ 43 more rows
Time difference of 0.7157481 secs
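A timed collect() step along these lines might look as follows. This is a sketch on a hypothetical toy table, so the timing is not comparable to the figure reported above.

```r
library(duckdb)
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in for the taxi data
trips = data.frame(
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)

con = dbConnect(duckdb())
dbWriteTable(con, "trips", trips)

q1 = tbl(con, "trips") |>
  summarize(mean_tip = mean(tip_amount, na.rm = TRUE), .by = passenger_count)

tic = Sys.time()
dat1 = collect(q1)   # runs the query and returns a regular tibble
toc = Sys.time()

dat1
toc - tic            # elapsed time as a difftime object

dbDisconnect(con, shutdown = TRUE)
```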
Oh. Well that was fast. But also, what are those negative passenger counts? Must be corrupt data. Let’s filter those out.
# A tibble: 33 × 2
passenger_count mean_tip
<int> <dbl>
1 1 0.838
2 2 0.760
3 3 0.672
4 4 0.560
5 5 0.821
6 6 0.923
7 7 0.531
8 8 0.15
9 9 0.484
10 10 3.22
# ℹ 23 more rows
OK, much better.
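The filtering step is not shown. A minimal sketch, assuming the filter is simply `passenger_count > 0` (consistent with the output starting at 1), on a hypothetical toy table:

```r
library(duckdb)
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in, including one "corrupt" negative count
trips = data.frame(
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)

con = dbConnect(duckdb())
dbWriteTable(con, "trips", trips)

## The filter is translated to a SQL WHERE clause and runs inside DuckDB
res = tbl(con, "trips") |>
  filter(passenger_count > 0) |>
  summarize(mean_tip = mean(tip_amount, na.rm = TRUE), .by = passenger_count) |>
  collect()

res

dbDisconnect(con, shutdown = TRUE)
```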
Let’s figure out what column names we have stored in this data set:
[1] "vendor_id" "pickup_at" "dropoff_at"
[4] "passenger_count" "trip_distance" "pickup_longitude"
[7] "pickup_latitude" "rate_code_id" "store_and_fwd_flag"
[10] "dropoff_longitude" "dropoff_latitude" "payment_type"
[13] "fare_amount" "extra" "mta_tax"
[16] "tip_amount" "tolls_amount" "total_amount"
[19] "month"
Notice that, on the R side, we cannot do things that require scanning the entire database table:
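The original chunk is not shown, so the following is only one plausible illustration, on a hypothetical toy table. Column names are known from the schema, but `nrow()` on a lazy table returns `NA` because the row count requires running a query; counting has to be pushed to the database instead.

```r
library(duckdb)
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in for the taxi data
trips = data.frame(
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)

con = dbConnect(duckdb())
dbWriteTable(con, "trips", trips)
nyc_toy = tbl(con, "trips")

cols = colnames(nyc_toy)   # cheap: only needs the table schema
cols

n_lazy = nrow(nyc_toy)     # NA: the row count is unknown without a full scan
n_lazy

## Ask the database to do the counting instead
n_rows = nyc_toy |> summarize(n = n()) |> pull(n)
n_rows

dbDisconnect(con, shutdown = TRUE)
```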
Here’s our earlier filtering example with multiple grouping + aggregation variables…
# Source: SQL [?? x 4]
# Database: DuckDB 1.4.2 [floswald@Darwin 24.6.0:R 4.5.1/:memory:]
month passenger_count tip_amount fare_amount
<chr> <int> <dbl> <dbl>
1 01 1 0.776 9.56
2 02 -31 3 12.5
3 03 2 0.798 10.4
4 03 -48 0.496 6.75
5 01 8 0 21.3
6 02 97 2 6.90
7 01 4 0.539 9.79
8 03 37 0 13.7
9 02 0 0.838 8.67
10 01 7 0.383 5.97
# ℹ more rows
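A query of this shape can be sketched as below. The `as.numeric(month) <= 3` filter is inferred from the `CAST("month" AS NUMERIC) <= 3.0` clause in the SQL translation shown in this section; the toy table and its values are hypothetical.

```r
library(duckdb)
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in; month is a string, as in the partitioned data
trips = data.frame(
  month           = c("01", "02", "03", "04", "04"),
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5),
  fare_amount     = c(8, 10, 12, 14, 6)
)

con = dbConnect(duckdb())
dbWriteTable(con, "trips", trips)

## Filter to the first quarter, then average by month and passenger count
q2 = tbl(con, "trips") |>
  filter(as.numeric(month) <= 3) |>
  summarize(
    tip_amount  = mean(tip_amount, na.rm = TRUE),
    fare_amount = mean(fare_amount, na.rm = TRUE),
    .by = c(month, passenger_count)
  )

res2 = collect(q2)
res2

dbDisconnect(con, shutdown = TRUE)
```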
Aside: note that the optimised query includes hash groupings and projection (basically: fancy column subsetting, which is a surprisingly effective strategy in query optimization).
<SQL>
SELECT
"month",
passenger_count,
AVG(tip_amount) AS tip_amount,
AVG(fare_amount) AS fare_amount
FROM (
SELECT q01.*
FROM (FROM read_parquet('nyc-taxi/**/*.parquet', hive_partitioning = true)) q01
WHERE (CAST("month" AS NUMERIC) <= 3.0)
) q01
GROUP BY "month", passenger_count
<PLAN>
physical_plan
┌───────────────────────────┐
│ HASH_GROUP_BY │
│ ──────────────────── │
│ Groups: │
│ #0 │
│ #1 │
│ │
│ Aggregates: │
│ avg(#2) │
│ avg(#3) │
│ │
│ ~563,696,507 rows │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ PROJECTION │
│ ──────────────────── │
│ month │
│ passenger_count │
│ CAST(tip_amount AS DOUBLE)│
│CAST(fare_amount AS DOUBLE)│
│ │
│ ~563,696,520 rows │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ FILTER │
│ ──────────────────── │
│ (CAST(month AS DECIMAL(18 │
│ ,3)) <= 3.000) │
│ │
│ ~563,696,520 rows │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ READ_PARQUET │
│ ──────────────────── │
│ Function: │
│ READ_PARQUET │
│ │
│ Projections: │
│ passenger_count │
│ fare_amount │
│ tip_amount │
│ month │
│ │
│ ~563,696,520 rows │
└───────────────────────────┘
And our high-dimensional aggregation example. We’ll create a query for this first, since I’ll reuse it again shortly.
`summarise()` has grouped output by "passenger_count". You can override using
the `.groups` argument.
# A tibble: 34,490 × 4
# Groups: passenger_count [53]
passenger_count trip_distance tip_amount fare_amount
<int> <dbl> <dbl> <dbl>
1 1 2 0.661 8.41
2 1 0.800 0.364 5.12
3 1 9.60 2.68 24.5
4 2 2.90 0.725 10.7
5 1 3.90 1.09 13.0
6 1 4.30 1.16 13.8
7 2 9.70 2.17 25.0
8 1 8.80 2.40 22.7
9 2 1.30 0.413 6.63
10 1 18.1 4.37 45.1
# ℹ 34,480 more rows
`summarise()` has grouped output by "passenger_count". You can override using
the `.groups` argument.
`summarise()` has grouped output by "passenger_count". You can override using
the `.groups` argument.
# A tibble: 68,980 × 4
# Groups: passenger_count [53]
passenger_count trip_distance name value
<int> <dbl> <chr> <dbl>
1 1 0.200 tip_amount 0.271
2 1 0.5 tip_amount 0.268
3 1 5.60 tip_amount 1.36
4 2 0.900 tip_amount 0.330
5 1 7.80 tip_amount 1.89
6 1 5.90 tip_amount 1.41
7 2 1.60 tip_amount 0.472
8 1 8.30 tip_amount 2.15
9 2 0.600 tip_amount 0.260
10 1 5.10 tip_amount 1.28
# ℹ 68,970 more rows
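The two outputs above (wide, then long) are consistent with a grouped aggregation that is collected once as-is and once after `tidyr::pivot_longer()`. The sketch below reproduces that pattern on a hypothetical toy table; the query name `q3` is an assumption.

```r
library(duckdb)
library(dplyr, warn.conflicts = FALSE)
library(tidyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in for the taxi data
trips = data.frame(
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  trip_distance   = c(1.0, 1.0, 2.5, 2.5, 0.5),
  tip_amount      = c(1, 3, 2, 4, 5),
  fare_amount     = c(8, 10, 12, 14, 6)
)

con = dbConnect(duckdb())
dbWriteTable(con, "trips", trips)

## Store the lazy query so that it can be reused
q3 = tbl(con, "trips") |>
  group_by(passenger_count, trip_distance) |>
  summarize(
    tip_amount  = mean(tip_amount, na.rm = TRUE),
    fare_amount = mean(fare_amount, na.rm = TRUE)
  )

wide = collect(q3)

## Reshape inside the database, then collect the long result
long = q3 |>
  pivot_longer(tip_amount:fare_amount) |>
  collect()

dbDisconnect(con, shutdown = TRUE)
```

Each wide row becomes two long rows (one per measure), which matches the doubling of the row count between the two outputs above.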
Again, these commands complete instantly because all computation has been deferred until absolutely necessary (i.e., lazy eval).
Joining with `by = join_by(month)`
# A tibble: 12 × 3
month mean_fares mean_tips
<chr> <dbl> <dbl>
1 12 10.2 0.819
2 02 9.80 0.829
3 07 10.1 0.778
4 09 10.3 0.824
5 11 10.2 0.834
6 04 10.1 0.815
7 10 10.2 0.821
8 08 10.1 0.784
9 05 10.3 0.841
10 03 10.0 0.853
11 06 10.2 0.786
12 01 9.62 0.753
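A join like the one above can be sketched by building two lazy summaries and joining them before collecting, so that the join itself runs in DuckDB. The toy table is hypothetical; the object names `mean_tips` and `mean_fares` are guesses based on the output's column names.

```r
library(duckdb)
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in for the taxi data
trips = data.frame(
  month       = c("01", "02", "03", "04", "04"),
  tip_amount  = c(1, 3, 2, 4, 5),
  fare_amount = c(8, 10, 12, 14, 6)
)

con = dbConnect(duckdb())
dbWriteTable(con, "trips", trips)
nyc_toy = tbl(con, "trips")

## Two lazy queries; nothing has been computed yet
mean_tips  = nyc_toy |> summarize(mean_tips = mean(tip_amount, na.rm = TRUE), .by = month)
mean_fares = nyc_toy |> summarize(mean_fares = mean(fare_amount, na.rm = TRUE), .by = month)

## The common column (month) is used as the join key
joined = left_join(mean_fares, mean_tips) |>
  collect()

joined

dbDisconnect(con, shutdown = TRUE)
```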
If you recall from the native SQL API, we sampled 1 percent of the data before creating decile bins to reduce the computation burden of sorting the entire table. Unfortunately, this approach doesn’t work as well for the dplyr frontend because the underlying SQL translation uses a generic sampling approach (rather than DuckDB’s optimised USING SAMPLE statement).
When going through the arrow intermediary, we don’t need to establish a database with DBI::dbConnect like we did above. Instead, we can create a link (pointers) to the dataset on disk directly via the arrow::open_dataset() convenience function. Here I’ll assign it to a new R object called nyc2.
(For individual parquet files, we could just read them via arrow::read_parquet(), perhaps efficiently subsetting columns at the same time. But I find that open_dataset() is generally what I’m looking for.)
Note that printing our nyc2 dataset to the R console will just display the data schema. This is a cheap and convenient way to quickly interrogate the basic structure of your data, including column types, etc.
FileSystemDataset with 40 Parquet files
19 columns
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: null
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
month: int32
See $metadata for additional Schema metadata
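To try this step without the taxi files, the sketch below first writes a tiny hive-partitioned parquet dataset to a temporary directory and then opens it. The directory name, `nyc2_toy`, and the data are hypothetical.

```r
library(arrow, warn.conflicts = FALSE)

## Hypothetical toy stand-in for the taxi data
trips = data.frame(
  month           = c("01", "02", "03", "04", "04"),
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)

## Write one folder per month (month=01/, month=02/, ...)
toy_dir = file.path(tempdir(), "nyc-taxi-toy")
write_dataset(trips, toy_dir, format = "parquet", partitioning = "month")

## Opening the dataset only records file locations and the schema
nyc2_toy = open_dataset(toy_dir)
nyc2_toy

ds_cols = names(nyc2_toy)
```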
The key step for this “arrow + duckdb” dplyr workflow is to pass our arrow dataset to DuckDB via the to_duckdb() function.
# Source: table<arrow_001> [?? x 19]
# Database: DuckDB 1.4.2 [floswald@Darwin 24.6.0:R 4.5.1/:memory:]
vendor_id pickup_at dropoff_at passenger_count
<chr> <dttm> <dttm> <int>
1 VTS 2009-01-04 02:52:00 2009-01-04 03:02:00 1
2 VTS 2009-01-04 03:31:00 2009-01-04 03:38:00 3
3 VTS 2009-01-03 15:43:00 2009-01-03 15:57:00 5
4 DDS 2009-01-01 20:52:58 2009-01-01 21:14:00 1
5 DDS 2009-01-24 16:18:23 2009-01-24 16:24:56 1
6 DDS 2009-01-16 22:35:59 2009-01-16 22:43:35 2
7 DDS 2009-01-21 08:55:57 2009-01-21 09:05:42 1
8 VTS 2009-01-04 04:31:00 2009-01-04 04:36:00 1
9 CMT 2009-01-05 16:29:02 2009-01-05 16:40:21 1
10 CMT 2009-01-05 18:53:13 2009-01-05 18:57:45 1
# ℹ more rows
# ℹ 15 more variables: trip_distance <dbl>, pickup_longitude <dbl>,
# pickup_latitude <dbl>, rate_code_id <int>, store_and_fwd_flag <chr>,
# dropoff_longitude <dbl>, dropoff_latitude <dbl>, payment_type <chr>,
# fare_amount <dbl>, extra <dbl>, mta_tax <dbl>, tip_amount <dbl>,
# tolls_amount <dbl>, total_amount <dbl>, month <int>
Note that this transfer from Arrow to DuckDB is very quick (and memory cheap) because it is a zero copy. We are just passing around pointers instead of actually moving any data. See this blog post for more details, but the high-level take away is that we are benefitting from the tightly integrated architectures of these two libraries.1
At this point, all of the regular dplyr workflow logic from above should carry over. Just remember to first pass the arrow dataset via the to_duckdb() function. For example, here’s our initial aggregation query again:
# A tibble: 53 × 2
passenger_count mean_tip
<int> <dbl>
1 -127 0.5
2 -123 0
3 -122 5
4 -119 9
5 -115 2
6 -101 46.6
7 -98 2
8 -96 2
9 -93 1
10 -92 8
# ℹ 43 more rows
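Put together, the "arrow + duckdb" pipeline can be sketched as follows, again on a hypothetical toy dataset written to a temporary directory.

```r
library(arrow, warn.conflicts = FALSE)
library(duckdb)
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in for the taxi data
trips = data.frame(
  month           = c("01", "02", "03", "04", "04"),
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)
toy_dir = file.path(tempdir(), "nyc-taxi-toy-duck")
write_dataset(trips, toy_dir, format = "parquet", partitioning = "month")

## Arrow locates the files; DuckDB runs the aggregation
res_duck = open_dataset(toy_dir) |>
  to_duckdb() |>
  summarize(mean_tip = mean(tip_amount, na.rm = TRUE), .by = passenger_count) |>
  collect()

res_duck
```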
Some of you may be used to performing computation with the arrow package without going through DuckDB. What’s happening here is that arrow provides its own computation engine called “acero”. This Arrow-native engine is actually pretty performant… albeit not as fast as DuckDB, nor as feature rich. So I personally recommend always passing to DuckDB if you can. Still, if you’re curious then you can test this yourself by re-running the code chunk with the to_duckdb() line commented out. For more details, see here.
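For comparison, an Arrow-only version of the same aggregation might look like this. It is a sketch on a hypothetical toy dataset, and it uses `group_by()` since that is the most widely supported grouping syntax in arrow's dplyr bindings.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

## Hypothetical toy stand-in for the taxi data
trips = data.frame(
  month           = c("01", "02", "03", "04", "04"),
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)
toy_dir = file.path(tempdir(), "nyc-taxi-toy-acero")
write_dataset(trips, toy_dir, format = "parquet", partitioning = "month")

## No to_duckdb() here, so arrow's own engine evaluates the query
res_acero = open_dataset(toy_dir) |>
  group_by(passenger_count) |>
  summarize(mean_tip = mean(tip_amount, na.rm = TRUE)) |>
  collect()

res_acero
```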
The new kid on the block is duckplyr (announcement / homepage). Without going into too much depth, the promise of duckplyr is that it can provide a “fully native” dplyr experience that is directly coupled to DuckDB’s query engine. So, for example, it won’t have to rely on DBI’s “generic” SQL translations. Instead, the relevant dplyr “verbs” are being directly translated to DuckDB’s relational API to construct logical query plans. If that’s too much jargon, just know that it should involve less overhead, fewer translation errors, and better optimization. Moreover, a goal of duckplyr is for it to be a drop-in replacement for dplyr in general. In other words, you could just swap out library(dplyr) for library(duckplyr) and all of your data wrangling operations will come backed by the power of DuckDB. This includes working on “regular” R data frames in memory.
All of this is exciting and I would urge you to stay tuned. Right now, duckplyr is still marked as experimental and has a few rough edges. But the basics are there. For example:
The duckplyr package is configured to fall back to dplyr when it encounters an
incompatibility. Fallback events can be collected and uploaded for analysis to
guide future development. By default, data will be collected but no data will
be uploaded.
ℹ Automatic fallback uploading is not controlled and therefore disabled, see
`?duckplyr::fallback()`.
✔ Number of reports ready for upload: 2.
→ Review with `duckplyr::fallback_review()`, upload with
`duckplyr::fallback_upload()`.
ℹ Configure automatic uploading with `duckplyr::fallback_config()`.
✔ Overwriting dplyr methods with duckplyr methods.
ℹ Turn off with `duckplyr::methods_restore()`.
Warning: `duckplyr_df_from_parquet()` was deprecated in duckplyr 1.0.0.
ℹ Please use `read_parquet_duckdb()` instead.
# A duckplyr data frame: 2 variables
passenger_count mean_tip
<int> <dbl>
1 -127 0.5
2 -123 0
3 -122 5
4 -119 9
5 -115 2
6 -101 46.6
7 -98 2
8 -96 2
9 -93 1
10 -92 8
# ℹ more rows
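The deprecation warning above points to `read_parquet_duckdb()` as the replacement reader. A hedged sketch of the same aggregation using that function follows, assuming duckplyr 1.0.0 or later; the toy parquet file and its values are hypothetical, and arrow is used only to write the file.

```r
library(dplyr, warn.conflicts = FALSE)
library(duckplyr)

## Hypothetical toy stand-in for the taxi data, written to a temporary file
trips = data.frame(
  passenger_count = c(1L, 1L, 2L, 2L, -1L),
  tip_amount      = c(1, 3, 2, 4, 5)
)
toy_file = tempfile(fileext = ".parquet")
arrow::write_parquet(trips, toy_file)

## Regular dplyr syntax; duckplyr hands the work to DuckDB where it can
res_dp = read_parquet_duckdb(toy_file) |>
  summarize(mean_tip = mean(tip_amount), .by = passenger_count)

res_df = collect(res_dp)
res_df
```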
“Similar” might be a better description than “integrated”, since DuckDB does not use the Arrow memory model. But they are both columnar-orientated (among other things) and so the end result is pretty seamless integration.