Using the DataFrame API
The Users Guide introduces the DataFrame API and this section describes that API in more depth.
What is a DataFrame?¶
As described in the Users Guide, DataFusion DataFrames are modeled after the Pandas DataFrame interface, and are implemented as a thin wrapper over a LogicalPlan that adds functionality for building and executing those plans.
The simplest possible DataFrame is one that scans a table, and that table can be backed by a file or by memory.
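For example, a minimal sketch of such a scan, assuming a CSV file exists at tests/data/example.csv (the path used by later examples in this section):

use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // the simplest DataFrame: a scan of a file-backed table
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // show executes the plan and prints the results
    df.show().await?;
    Ok(())
}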
How to generate a DataFrame¶
You can construct DataFrames programmatically using the API, similar to other DataFrame APIs. For example, you can read an in-memory RecordBatch into a DataFrame:
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Create an in-memory table containing the following data
    // id | bank_account
    // ---|-------------
    //  1 | 9000
    //  2 | 8000
    //  3 | 7000
    let data = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
    ])?;
    // Create a DataFrame that scans the user table, finds
    // all users with a bank account of at least 8000,
    // and sorts the results by bank account in descending order
    let dataframe = ctx
        .read_batch(data)?
        .filter(col("bank_account").gt_eq(lit(8000)))?       // bank_account >= 8000
        .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC

    Ok(())
}
You can also generate a DataFrame from a SQL query and use the DataFrame's APIs to manipulate the output of the query.
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::assert_batches_eq;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the same in-memory table as in the previous example
    let data = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
    ])?;
    ctx.register_batch("users", data)?;
    // Create a DataFrame using SQL
    let dataframe = ctx.sql("SELECT * FROM users;")
        .await?
        // Note we can filter the output of the query using the DataFrame API
        .filter(col("bank_account").gt_eq(lit(8000)))?; // bank_account >= 8000

    let results = dataframe.collect().await?;

    // use the `assert_batches_eq` macro to show the output
    assert_batches_eq!(
        vec![
            "+----+--------------+",
            "| id | bank_account |",
            "+----+--------------+",
            "| 1  | 9000         |",
            "| 2  | 8000         |",
            "+----+--------------+",
        ],
        &results
    );
    Ok(())
}
Collect / Streaming Exec¶
DataFusion DataFrames are “lazy”, meaning they do no processing until they are executed, which allows for additional optimizations.
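Because a DataFrame is only a plan until you run it, you can inspect what would execute before any data is produced. Here is a minimal sketch using explain, which returns a new DataFrame describing the plan rather than computing the data (the filter is illustrative only):

use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // build a plan; the query has not run yet
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
        .filter(col("a").gt(lit(1)))?;
    // explain(verbose, analyze) wraps the plan in an EXPLAIN node;
    // showing it prints the plans, not the data
    df.explain(false, false)?.show().await?;
    Ok(())
}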
You can run a DataFrame in one of three ways:
1. collect: executes the query and buffers all the output into a Vec<RecordBatch>
2. execute_stream: begins execution and returns a SendableRecordBatchStream, which incrementally computes output on each call to next()
3. cache: executes the query and buffers the output into a new in-memory DataFrame (see the sketch after this list)
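For example, a minimal sketch of cache, useful when one intermediate result feeds several downstream queries (the filter and count are illustrative only):

use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // cache executes the plan once and returns a new DataFrame
    // backed by the buffered RecordBatches
    let cached = df.cache().await?;
    // both queries below read from memory instead of re-scanning the CSV
    let filtered = cached.clone().filter(col("a").gt(lit(1)))?.collect().await?;
    let row_count = cached.count().await?;
    println!("{row_count} rows total, filtered: {filtered:?}");
    Ok(())
}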
To collect all outputs into a memory buffer, use the collect method:
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read the contents of a CSV file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // execute the query and collect the results as a Vec<RecordBatch>
    let batches = df.collect().await?;
    for record_batch in batches {
        println!("{record_batch:?}");
    }
    Ok(())
}
Use execute_stream to incrementally generate output one RecordBatch at a time:
use datafusion::prelude::*;
use datafusion::error::Result;
use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // begin execution (returns quickly, does not compute results)
    let mut stream = df.execute_stream().await?;
    // results are returned incrementally as they are computed
    while let Some(record_batch) = stream.next().await {
        println!("{record_batch:?}");
    }
    Ok(())
}
Write DataFrame to Files¶
You can also write the contents of a DataFrame to a file. When writing a file, DataFusion executes the DataFrame and streams the results to the output. DataFusion comes with support for writing csv, json, arrow, avro, and parquet files, and supports writing custom file formats via its API (see custom_file_format.rs for an example).
For example, to read a CSV file and write it to a parquet file, use the DataFrame::write_parquet method:
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // stream the contents of the DataFrame to the example.parquet file
    let target_path = tempfile::tempdir()?.path().join("example.parquet");
    df.write_parquet(
        target_path.to_str().unwrap(),
        DataFrameWriteOptions::new(),
        None, // writer_options
    ).await?;
    Ok(())
}
The output file will look like this (example output):
select * from '../datafusion/core/example.parquet';
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
+---+---+---+
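Writing to the other supported formats follows the same pattern. As a minimal sketch, here is the equivalent using write_csv, assuming its signature mirrors write_parquet (a path, write options, and optional format-specific writer options):

use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    let target_path = tempfile::tempdir()?.path().join("example_out.csv");
    // execute the DataFrame and stream the results to a CSV file
    df.write_csv(
        target_path.to_str().unwrap(),
        DataFrameWriteOptions::new(),
        None, // writer_options: use the defaults
    ).await?;
    Ok(())
}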
Relationship between LogicalPlans and DataFrames¶
The DataFrame struct is defined like this:
use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::LogicalPlan;

pub struct DataFrame {
    // state required to execute a LogicalPlan
    session_state: Box<SessionState>,
    // LogicalPlan that describes the computation to perform
    plan: LogicalPlan,
}
As shown above, DataFrame is a thin wrapper around a LogicalPlan, so you can easily go back and forth between them.
use datafusion::prelude::*;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // You can easily get the LogicalPlan from the DataFrame
    let (_state, plan) = df.into_parts();
    // Combining a LogicalPlan with a SessionState gives you a DataFrame again
    let new_df = DataFrame::new(ctx.state(), plan);
    Ok(())
}
In fact, using the DataFrame's methods you can create the same LogicalPlans as when using the LogicalPlanBuilder:
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // read example.csv file into a DataFrame
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // Create a new DataFrame that selects a and b, sorted by a
    let new_df = df.select(vec![col("a"), col("b")])?
        .sort_by(vec![col("a")])?;
    // Build the same plan using the LogicalPlanBuilder
    // Similar to SELECT a, b FROM example.csv ORDER BY a
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan
    let plan = LogicalPlanBuilder::from(plan)
        .project(vec![col("a"), col("b")])?
        .sort_by(vec![col("a")])?
        .build()?;
    // prove they are the same
    assert_eq!(new_df.logical_plan(), &plan);
    Ok(())
}