From 09bae6ea388ea26e40a623aee04e2f3d82ffc78d Mon Sep 17 00:00:00 2001 From: Christian McArthur Date: Sat, 4 Apr 2026 13:47:07 -0400 Subject: [PATCH 1/3] feat: add read_parquet, read_csv, read_json, read_avro SQL table functions Built-in table functions that read files directly from SQL without prior registration. Each wraps ListingTable with the appropriate FileFormat, inheriting full optimizer support (filter pushdown, projection pushdown, partition pruning, limit pushdown) automatically. Parquet and Avro are feature-gated; CSV and JSON are always available. Schema inference uses block_in_place to bridge the sync TableFunctionImpl trait with async ListingOptions::infer_schema. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 7 + datafusion/core/Cargo.toml | 4 +- datafusion/core/tests/sql/mod.rs | 1 + .../core/tests/sql/read_table_functions.rs | 420 ++++++++++++++++++ datafusion/functions-table/Cargo.toml | 12 + datafusion/functions-table/src/lib.rs | 48 +- datafusion/functions-table/src/read_avro.rs | 73 +++ datafusion/functions-table/src/read_csv.rs | 76 ++++ datafusion/functions-table/src/read_json.rs | 76 ++++ .../functions-table/src/read_parquet.rs | 74 +++ 10 files changed, 788 insertions(+), 3 deletions(-) create mode 100644 datafusion/core/tests/sql/read_table_functions.rs create mode 100644 datafusion/functions-table/src/read_avro.rs create mode 100644 datafusion/functions-table/src/read_csv.rs create mode 100644 datafusion/functions-table/src/read_json.rs create mode 100644 datafusion/functions-table/src/read_parquet.rs diff --git a/Cargo.lock b/Cargo.lock index 895b3059f50c1..8cbec2ad0e35d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2328,10 +2328,17 @@ dependencies = [ "arrow", "async-trait", "datafusion-catalog", + "datafusion-catalog-listing", "datafusion-common", + "datafusion-datasource", + "datafusion-datasource-avro", + "datafusion-datasource-csv", + "datafusion-datasource-json", + "datafusion-datasource-parquet", "datafusion-expr", 
"datafusion-physical-plan", "parking_lot", + "tokio", ] [[package]] diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index c1230f7d5daa6..419802e1cc7b0 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -43,7 +43,7 @@ nested_expressions = ["datafusion-functions-nested"] # This feature is deprecated. Use the `nested_expressions` feature instead. array_expressions = ["nested_expressions"] # Used to enable the avro format -avro = ["datafusion-datasource-avro"] +avro = ["datafusion-datasource-avro", "datafusion-functions-table/avro"] backtrace = ["datafusion-common/backtrace"] compression = [ "liblzma", @@ -72,7 +72,7 @@ encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] math_expressions = ["datafusion-functions/math_expressions"] -parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet"] +parquet = ["datafusion-common/parquet", "dep:parquet", "datafusion-datasource-parquet", "datafusion-functions-table/parquet"] parquet_encryption = [ "parquet", "parquet/encryption", diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 9a1dc5502ee60..dbdbb765ed658 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -67,6 +67,7 @@ pub mod create_drop; pub mod explain_analyze; pub mod joins; mod path_partition; +mod read_table_functions; mod runtime_config; pub mod select; mod sql_api; diff --git a/datafusion/core/tests/sql/read_table_functions.rs b/datafusion/core/tests/sql/read_table_functions.rs new file mode 100644 index 0000000000000..2b578c2315c8b --- /dev/null +++ b/datafusion/core/tests/sql/read_table_functions.rs @@ -0,0 +1,420 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for built-in `read_parquet`, `read_csv`, and `read_json` table functions. + +use std::sync::Arc; + +use arrow::array::{Float64Array, Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; + +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::error::Result; +use datafusion::prelude::*; +use datafusion_common::{assert_batches_eq, assert_contains}; + +use tempfile::TempDir; + +// --------------------------------------------------------------------------- +// read_parquet tests +// --------------------------------------------------------------------------- + +/// Create a temporary parquet file with known data and return the path. 
+async fn write_test_parquet(dir: &TempDir) -> Result<String> { + let ctx = SessionContext::new(); + let path = dir.path().join("test.parquet").to_str().unwrap().to_string(); + + let batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ])), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])), + Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])), + Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0])), + ], + )?; + + ctx.read_batch(batch)? + .write_parquet( + &path, + DataFrameWriteOptions::new().with_single_file_output(true), + None, + ) + .await?; + + Ok(path) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_basic() -> Result<()> { + let tmp = TempDir::new()?; + let path = write_test_parquet(&tmp).await?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!("SELECT * FROM read_parquet('{path}') ORDER BY id")) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+----+------+-------+", + "| id | name | value |", + "+----+------+-------+", + "| 1 | a | 1.0 |", + "| 2 | b | 2.0 |", + "| 3 | c | 3.0 |", + "| 4 | d | 4.0 |", + "| 5 | e | 5.0 |", + "+----+------+-------+", + ], &results); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_with_filter() -> Result<()> { + let tmp = TempDir::new()?; + let path = write_test_parquet(&tmp).await?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!( + "SELECT id, name FROM read_parquet('{path}') WHERE id > 3 ORDER BY id" + )) + .await?
+ .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+----+------+", + "| id | name |", + "+----+------+", + "| 4 | d |", + "| 5 | e |", + "+----+------+", + ], &results); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_projection() -> Result<()> { + let tmp = TempDir::new()?; + let path = write_test_parquet(&tmp).await?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!( + "SELECT name FROM read_parquet('{path}') ORDER BY name" + )) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+------+", + "| name |", + "+------+", + "| a |", + "| b |", + "| c |", + "| d |", + "| e |", + "+------+", + ], &results); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_count() -> Result<()> { + let tmp = TempDir::new()?; + let path = write_test_parquet(&tmp).await?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!("SELECT count(*) AS cnt FROM read_parquet('{path}')")) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+-----+", + "| cnt |", + "+-----+", + "| 5 |", + "+-----+", + ], &results); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_glob() -> Result<()> { + let tmp = TempDir::new()?; + let ctx = SessionContext::new(); + + let batch1 = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(vec![1, 2]))], + )?; + let batch2 = RecordBatch::try_new( + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])), + vec![Arc::new(Int32Array::from(vec![3, 4]))], + )?; + + let path1 = tmp.path().join("part1.parquet").to_str().unwrap().to_string(); + let path2 = tmp.path().join("part2.parquet").to_str().unwrap().to_string(); + + ctx.read_batch(batch1)? 
+ .write_parquet( + &path1, + DataFrameWriteOptions::new().with_single_file_output(true), + None, + ) + .await?; + ctx.read_batch(batch2)? + .write_parquet( + &path2, + DataFrameWriteOptions::new().with_single_file_output(true), + None, + ) + .await?; + + let glob_path = tmp.path().join("*.parquet").to_str().unwrap().to_string(); + let results = ctx + .sql(&format!( + "SELECT count(*) AS cnt FROM read_parquet('{glob_path}')" + )) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+-----+", + "| cnt |", + "+-----+", + "| 4 |", + "+-----+", + ], &results); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_no_args_error() -> Result<()> { + let ctx = SessionContext::new(); + let result = ctx.sql("SELECT * FROM read_parquet()").await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert_contains!(err, "read_parquet requires exactly 1 argument"); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_filter_pushdown() -> Result<()> { + let tmp = TempDir::new()?; + let path = write_test_parquet(&tmp).await?; + + let ctx = SessionContext::new(); + let df = ctx + .sql(&format!( + "EXPLAIN SELECT * FROM read_parquet('{path}') WHERE id > 3" + )) + .await?; + let results = df.collect().await?; + + // The filter should be pushed down — look for it in the physical plan + let plan_str = arrow::util::pretty::pretty_format_batches(&results)?.to_string(); + // Filter pushdown means the predicate appears inside the DataSourceExec scan + assert_contains!(plan_str, "predicate=id@0 > 3"); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_projection_pushdown() -> Result<()> { + let tmp = TempDir::new()?; + let path = write_test_parquet(&tmp).await?; + + let ctx = SessionContext::new(); + let df = ctx + .sql(&format!( + "EXPLAIN SELECT name FROM read_parquet('{path}')" + )) 
+ .await?; + let results = df.collect().await?; + + let plan_str = arrow::util::pretty::pretty_format_batches(&results)?.to_string(); + // Projection pushdown means only 'name' column is read — the projection + // should appear in the scan node + assert_contains!(plan_str, "projection=[name]"); + Ok(()) +} + +// --------------------------------------------------------------------------- +// read_csv tests +// --------------------------------------------------------------------------- + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_csv_basic() -> Result<()> { + let tmp = TempDir::new()?; + let path = tmp.path().join("test.csv").to_str().unwrap().to_string(); + + std::fs::write( + &path, + "id,name,value\n1,a,1.0\n2,b,2.0\n3,c,3.0\n", + )?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!( + "SELECT * FROM read_csv('{path}') ORDER BY id" + )) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+----+------+-------+", + "| id | name | value |", + "+----+------+-------+", + "| 1 | a | 1.0 |", + "| 2 | b | 2.0 |", + "| 3 | c | 3.0 |", + "+----+------+-------+", + ], &results); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_csv_with_filter() -> Result<()> { + let tmp = TempDir::new()?; + let path = tmp.path().join("test.csv").to_str().unwrap().to_string(); + + std::fs::write( + &path, + "id,name,value\n1,a,1.0\n2,b,2.0\n3,c,3.0\n", + )?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!( + "SELECT id, name FROM read_csv('{path}') WHERE id > 1 ORDER BY id" + )) + .await? 
+ .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+----+------+", + "| id | name |", + "+----+------+", + "| 2 | b |", + "| 3 | c |", + "+----+------+", + ], &results); + + Ok(()) +} + +// --------------------------------------------------------------------------- +// read_json tests +// --------------------------------------------------------------------------- + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_json_basic() -> Result<()> { + let tmp = TempDir::new()?; + let path = tmp.path().join("test.json").to_str().unwrap().to_string(); + + std::fs::write( + &path, + r#"{"id":1,"name":"a"} +{"id":2,"name":"b"} +{"id":3,"name":"c"} +"#, + )?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!( + "SELECT * FROM read_json('{path}') ORDER BY id" + )) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+----+------+", + "| id | name |", + "+----+------+", + "| 1 | a |", + "| 2 | b |", + "| 3 | c |", + "+----+------+", + ], &results); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_json_with_filter() -> Result<()> { + let tmp = TempDir::new()?; + let path = tmp.path().join("test.json").to_str().unwrap().to_string(); + + std::fs::write( + &path, + r#"{"id":1,"name":"a"} +{"id":2,"name":"b"} +{"id":3,"name":"c"} +"#, + )?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!( + "SELECT name FROM read_json('{path}') WHERE id >= 2 ORDER BY name" + )) + .await? 
+ .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+------+", + "| name |", + "+------+", + "| b |", + "| c |", + "+------+", + ], &results); + + Ok(()) +} diff --git a/datafusion/functions-table/Cargo.toml b/datafusion/functions-table/Cargo.toml index 4edb640cb2cf2..d0aac7e09b43d 100644 --- a/datafusion/functions-table/Cargo.toml +++ b/datafusion/functions-table/Cargo.toml @@ -40,14 +40,26 @@ workspace = true [lib] name = "datafusion_functions_table" +[features] +default = ["parquet"] +parquet = ["datafusion-datasource-parquet"] +avro = ["datafusion-datasource-avro"] + [dependencies] arrow = { workspace = true } async-trait = { workspace = true } datafusion-catalog = { workspace = true } +datafusion-catalog-listing = { workspace = true } datafusion-common = { workspace = true } +datafusion-datasource = { workspace = true } +datafusion-datasource-avro = { workspace = true, optional = true } +datafusion-datasource-csv = { workspace = true } +datafusion-datasource-json = { workspace = true } +datafusion-datasource-parquet = { workspace = true, optional = true } datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } parking_lot = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread"] } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } diff --git a/datafusion/functions-table/src/lib.rs b/datafusion/functions-table/src/lib.rs index 668e964901c04..42d29c99029bf 100644 --- a/datafusion/functions-table/src/lib.rs +++ b/datafusion/functions-table/src/lib.rs @@ -26,13 +26,47 @@ #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] pub mod generate_series; +#[cfg(feature = "avro")] +pub mod read_avro; +pub mod read_csv; +pub mod read_json; +#[cfg(feature = "parquet")] +pub mod read_parquet; use datafusion_catalog::TableFunction; +use datafusion_common::{plan_err, Result}; +use datafusion_expr::Expr; use std::sync::Arc; +/// Extract a string path from a 
literal expression. +pub(crate) fn extract_path(expr: &Expr, func_name: &str) -> Result<String> { + match expr { + Expr::Literal(scalar, _) => match scalar.try_as_str() { + Some(Some(s)) => Ok(s.to_string()), + _ => plan_err!( + "{func_name} requires a string literal path argument, got {scalar:?}" + ), + }, + _ => plan_err!( + "{func_name} requires a string literal path argument, got {expr:?}" + ), + } +} + /// Returns all default table functions pub fn all_default_table_functions() -> Vec<Arc<TableFunction>> { - vec![generate_series(), range()] + #[cfg(any(feature = "parquet", feature = "avro"))] + let mut funcs = vec![generate_series(), range(), read_csv(), read_json()]; + #[cfg(not(any(feature = "parquet", feature = "avro")))] + let funcs = vec![generate_series(), range(), read_csv(), read_json()]; + + #[cfg(feature = "parquet")] + funcs.push(read_parquet()); + + #[cfg(feature = "avro")] + funcs.push(read_avro()); + + funcs } /// Creates a singleton instance of a table function @@ -62,3 +96,15 @@ create_udtf_function!( "generate_series" ); create_udtf_function!(generate_series::RangeFunc {}, range, "range"); +create_udtf_function!(read_csv::ReadCsvFunc {}, read_csv, "read_csv"); +create_udtf_function!(read_json::ReadJsonFunc {}, read_json, "read_json"); + +#[cfg(feature = "parquet")] +create_udtf_function!( + read_parquet::ReadParquetFunc {}, + read_parquet, + "read_parquet" +); + +#[cfg(feature = "avro")] +create_udtf_function!(read_avro::ReadAvroFunc {}, read_avro, "read_avro"); diff --git a/datafusion/functions-table/src/read_avro.rs b/datafusion/functions-table/src/read_avro.rs new file mode 100644 index 0000000000000..1a7cf4486c780 --- /dev/null +++ b/datafusion/functions-table/src/read_avro.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `read_avro` table function: reads Avro files as a table source. +//! +//! ```sql +//! SELECT * FROM read_avro('/path/to/file.avro') +//! SELECT * FROM read_avro('/path/to/dir/*.avro') +//! ``` + +use std::sync::Arc; + +use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; +use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion_common::Result; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource_avro::file_format::AvroFormat; + +use tokio::runtime::Handle; +use tokio::task::block_in_place; + +use crate::extract_path; + +/// Table function that reads Avro files. 
+#[derive(Debug, Default)] +pub struct ReadAvroFunc; + +impl TableFunctionImpl for ReadAvroFunc { + fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> { + let exprs = args.exprs(); + if exprs.len() != 1 { + return datafusion_common::plan_err!( + "read_avro requires exactly 1 argument (path), got {}", + exprs.len() + ); + } + + let path = extract_path(&exprs[0], "read_avro")?; + let session = args.session(); + + let table_path = ListingTableUrl::parse(&path)?; + + let listing_options = ListingOptions::new(Arc::new(AvroFormat)) + .with_file_extension(".avro") + .with_session_config_options(session.config()); + + let schema = block_in_place(|| { + Handle::current() + .block_on(listing_options.infer_schema(session, &table_path)) + })?; + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(schema); + + let table = ListingTable::try_new(config)?; + Ok(Arc::new(table)) + } +} diff --git a/datafusion/functions-table/src/read_csv.rs b/datafusion/functions-table/src/read_csv.rs new file mode 100644 index 0000000000000..3a7b954d34bd2 --- /dev/null +++ b/datafusion/functions-table/src/read_csv.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +//! `read_csv` table function: reads CSV files as a table source. +//! +//! ```sql +//! SELECT * FROM read_csv('/path/to/file.csv') +//! SELECT * FROM read_csv('/path/to/dir/*.csv') +//! ``` + +use std::sync::Arc; + +use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; +use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion_common::Result; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource_csv::file_format::CsvFormat; + +use tokio::runtime::Handle; +use tokio::task::block_in_place; + +use crate::extract_path; + +/// Table function that reads CSV files. +#[derive(Debug, Default)] +pub struct ReadCsvFunc; + +impl TableFunctionImpl for ReadCsvFunc { + fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> { + let exprs = args.exprs(); + if exprs.len() != 1 { + return datafusion_common::plan_err!( + "read_csv requires exactly 1 argument (path), got {}", + exprs.len() + ); + } + + let path = extract_path(&exprs[0], "read_csv")?; + let session = args.session(); + + let table_path = ListingTableUrl::parse(&path)?; + + let csv_format = CsvFormat::default() + .with_options(session.default_table_options().csv); + + let listing_options = ListingOptions::new(Arc::new(csv_format)) + .with_file_extension(".csv") + .with_session_config_options(session.config()); + + let schema = block_in_place(|| { + Handle::current() + .block_on(listing_options.infer_schema(session, &table_path)) + })?; + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(schema); + + let table = ListingTable::try_new(config)?; + Ok(Arc::new(table)) + } +} diff --git a/datafusion/functions-table/src/read_json.rs b/datafusion/functions-table/src/read_json.rs new file mode 100644 index 0000000000000..5515ba9c2f9ad --- /dev/null +++ b/datafusion/functions-table/src/read_json.rs @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +//
or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `read_json` table function: reads JSON files as a table source. +//! +//! ```sql +//! SELECT * FROM read_json('/path/to/file.json') +//! SELECT * FROM read_json('/path/to/dir/*.json') +//! ``` + +use std::sync::Arc; + +use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; +use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion_common::Result; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource_json::file_format::JsonFormat; + +use tokio::runtime::Handle; +use tokio::task::block_in_place; + +use crate::extract_path; + +/// Table function that reads JSON files. 
+#[derive(Debug, Default)] +pub struct ReadJsonFunc; + +impl TableFunctionImpl for ReadJsonFunc { + fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> { + let exprs = args.exprs(); + if exprs.len() != 1 { + return datafusion_common::plan_err!( + "read_json requires exactly 1 argument (path), got {}", + exprs.len() + ); + } + + let path = extract_path(&exprs[0], "read_json")?; + let session = args.session(); + + let table_path = ListingTableUrl::parse(&path)?; + + let json_format = JsonFormat::default() + .with_options(session.default_table_options().json); + + let listing_options = ListingOptions::new(Arc::new(json_format)) + .with_file_extension(".json") + .with_session_config_options(session.config()); + + let schema = block_in_place(|| { + Handle::current() + .block_on(listing_options.infer_schema(session, &table_path)) + })?; + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(schema); + + let table = ListingTable::try_new(config)?; + Ok(Arc::new(table)) + } +} diff --git a/datafusion/functions-table/src/read_parquet.rs b/datafusion/functions-table/src/read_parquet.rs new file mode 100644 index 0000000000000..a97cfbf79dff5 --- /dev/null +++ b/datafusion/functions-table/src/read_parquet.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `read_parquet` table function: reads Parquet files as a table source. +//! +//! ```sql +//! SELECT * FROM read_parquet('/path/to/file.parquet') +//! SELECT * FROM read_parquet('/path/to/dir/*.parquet') +//! SELECT count(*) FROM read_parquet('s3://bucket/prefix/*.parquet') +//! ``` + +use std::sync::Arc; + +use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; +use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion_common::{plan_err, Result}; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource_parquet::file_format::ParquetFormat; + +use crate::extract_path; + +use tokio::runtime::Handle; +use tokio::task::block_in_place; + +/// Table function that reads Parquet files. +#[derive(Debug, Default)] +pub struct ReadParquetFunc; + +impl TableFunctionImpl for ReadParquetFunc { + fn call_with_args(&self, args: TableFunctionArgs) -> Result<Arc<dyn TableProvider>> { + let exprs = args.exprs(); + if exprs.len() != 1 { + return plan_err!( + "read_parquet requires exactly 1 argument (path), got {}", + exprs.len() + ); + } + + let path = extract_path(&exprs[0], "read_parquet")?; + let session = args.session(); + + let table_path = ListingTableUrl::parse(&path)?; + + let listing_options = ListingOptions::new(Arc::new(ParquetFormat::new())) + .with_file_extension(".parquet") + .with_session_config_options(session.config()); + + let schema = block_in_place(|| { + Handle::current() + .block_on(listing_options.infer_schema(session, &table_path)) + })?; + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(schema); + + let table = ListingTable::try_new(config)?; + Ok(Arc::new(table)) + } +} From e409f11d4b5d7bfdd40a5ef08d810e25ce10df48 Mon Sep 17 00:00:00 2001 From: Christian McArthur Date: Sat, 4 Apr 2026 14:01:05 -0400 Subject: [PATCH 2/3] feat: add
read_parquet, read_csv, read_json, read_avro SQL table functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds four inline SQL table functions that create ListingTable-backed scans, enabling DuckDB-style ad-hoc file querying: SELECT * FROM read_parquet('/path/to/*.parquet') SELECT * FROM read_csv('/data/file.csv') SELECT * FROM read_json('/data/file.json') SELECT * FROM read_avro('/data/file.avro') Each function is a thin wrapper (~60 lines) over ListingTable: 1. Extracts path from SQL literal argument 2. Constructs ListingOptions with the format's FileFormat 3. Infers schema via blocking bridge (std::thread::scope) 4. Returns ListingTable as TableProvider Since the SQL planner wraps UDTF output as LogicalPlan::TableScan, all optimizer rules apply automatically — filter pushdown, projection pushdown, and partition pruning work out of the box. Feature gating: - read_parquet: requires `parquet` feature (default on) - read_avro: requires `avro` feature (default off) - read_csv/read_json: always available (no heavy deps) Async bridge: uses std::thread::scope + Handle::block_on instead of block_in_place, so it works on both multi-thread and current-thread Tokio runtimes. Tested with single-threaded runtime. 16 tests covering: basic read, filtered read, projection, aggregation, glob multi-file, error paths, filter/projection pushdown verification, and single-threaded runtime safety. 
Closes #3773 --- .../core/tests/sql/read_table_functions.rs | 62 +++++++++++++++++++ datafusion/functions-table/Cargo.toml | 3 +- datafusion/functions-table/src/lib.rs | 30 ++++++++- datafusion/functions-table/src/read_avro.rs | 14 ++--- datafusion/functions-table/src/read_csv.rs | 14 ++--- datafusion/functions-table/src/read_json.rs | 14 ++--- .../functions-table/src/read_parquet.rs | 10 +-- 7 files changed, 107 insertions(+), 40 deletions(-) diff --git a/datafusion/core/tests/sql/read_table_functions.rs b/datafusion/core/tests/sql/read_table_functions.rs index 2b578c2315c8b..c8f75c44eb992 100644 --- a/datafusion/core/tests/sql/read_table_functions.rs +++ b/datafusion/core/tests/sql/read_table_functions.rs @@ -418,3 +418,65 @@ async fn read_json_with_filter() -> Result<()> { Ok(()) } + +// --------------------------------------------------------------------------- +// Error-path tests +// --------------------------------------------------------------------------- + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_csv_no_args_error() -> Result<()> { + let ctx = SessionContext::new(); + let result = ctx.sql("SELECT * FROM read_csv()").await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert_contains!(err, "read_csv requires exactly 1 argument"); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_json_no_args_error() -> Result<()> { + let ctx = SessionContext::new(); + let result = ctx.sql("SELECT * FROM read_json()").await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + assert_contains!(err, "read_json requires exactly 1 argument"); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_parquet_wrong_arg_type() -> Result<()> { + let ctx = SessionContext::new(); + let result = ctx.sql("SELECT * FROM read_parquet(42)").await; + assert!(result.is_err()); + let err = result.unwrap_err().to_string(); + 
assert_contains!(err, "read_parquet requires a string literal path argument"); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Single-threaded runtime tests — verify no panic on current_thread runtime +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn read_parquet_single_threaded_runtime() -> Result<()> { + let tmp = TempDir::new()?; + let path = write_test_parquet(&tmp).await?; + + let ctx = SessionContext::new(); + let results = ctx + .sql(&format!("SELECT count(*) AS cnt FROM read_parquet('{path}')")) + .await? + .collect() + .await?; + + #[cfg_attr(any(), rustfmt::skip)] + assert_batches_eq!(&[ + "+-----+", + "| cnt |", + "+-----+", + "| 5 |", + "+-----+", + ], &results); + + Ok(()) +} diff --git a/datafusion/functions-table/Cargo.toml b/datafusion/functions-table/Cargo.toml index d0aac7e09b43d..dc5979a159ab2 100644 --- a/datafusion/functions-table/Cargo.toml +++ b/datafusion/functions-table/Cargo.toml @@ -53,13 +53,14 @@ datafusion-catalog-listing = { workspace = true } datafusion-common = { workspace = true } datafusion-datasource = { workspace = true } datafusion-datasource-avro = { workspace = true, optional = true } +# CSV and JSON are always available — no heavy optional dependencies datafusion-datasource-csv = { workspace = true } datafusion-datasource-json = { workspace = true } datafusion-datasource-parquet = { workspace = true, optional = true } datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } parking_lot = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio = { workspace = true, features = ["rt"] } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } diff --git a/datafusion/functions-table/src/lib.rs b/datafusion/functions-table/src/lib.rs index 42d29c99029bf..33571042273ce 100644 --- a/datafusion/functions-table/src/lib.rs +++ 
b/datafusion/functions-table/src/lib.rs @@ -28,13 +28,17 @@ pub mod generate_series; #[cfg(feature = "avro")] pub mod read_avro; +// CSV and JSON are always available — no heavy optional dependencies pub mod read_csv; pub mod read_json; #[cfg(feature = "parquet")] pub mod read_parquet; -use datafusion_catalog::TableFunction; +use arrow::datatypes::SchemaRef; +use datafusion_catalog::{Session, TableFunction}; +use datafusion_catalog_listing::ListingOptions; use datafusion_common::{plan_err, Result}; +use datafusion_datasource::ListingTableUrl; use datafusion_expr::Expr; use std::sync::Arc; @@ -53,6 +57,30 @@ pub(crate) fn extract_path(expr: &Expr, func_name: &str) -> Result { } } +/// Bridge async `infer_schema` into the sync `call_with_args` context. +/// +/// Spawns a scoped OS thread so the Tokio executor thread is never blocked. +/// Inside the spawned thread, [`tokio::runtime::Handle::block_on`] drives +/// the future to completion. This works on **both** multi-thread and +/// current-thread Tokio runtimes (unlike `block_in_place`, which panics on +/// single-threaded runtimes). 
+pub(crate) fn infer_schema_blocking( + listing_options: &ListingOptions, + session: &dyn Session, + table_path: &ListingTableUrl, +) -> Result { + let handle = tokio::runtime::Handle::current(); + std::thread::scope(|scope| { + scope + .spawn(|| { + handle + .block_on(listing_options.infer_schema(session, table_path)) + }) + .join() + .expect("infer_schema thread panicked") + }) +} + /// Returns all default table functions pub fn all_default_table_functions() -> Vec> { #[cfg(any(feature = "parquet", feature = "avro"))] diff --git a/datafusion/functions-table/src/read_avro.rs b/datafusion/functions-table/src/read_avro.rs index 1a7cf4486c780..1d5def7e51df0 100644 --- a/datafusion/functions-table/src/read_avro.rs +++ b/datafusion/functions-table/src/read_avro.rs @@ -26,14 +26,11 @@ use std::sync::Arc; use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; -use datafusion_common::Result; +use datafusion_common::{plan_err, Result}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource_avro::file_format::AvroFormat; -use tokio::runtime::Handle; -use tokio::task::block_in_place; - -use crate::extract_path; +use crate::{extract_path, infer_schema_blocking}; /// Table function that reads Avro files. 
#[derive(Debug, Default)] @@ -43,7 +40,7 @@ impl TableFunctionImpl for ReadAvroFunc { fn call_with_args(&self, args: TableFunctionArgs) -> Result> { let exprs = args.exprs(); if exprs.len() != 1 { - return datafusion_common::plan_err!( + return plan_err!( "read_avro requires exactly 1 argument (path), got {}", exprs.len() ); @@ -58,10 +55,7 @@ impl TableFunctionImpl for ReadAvroFunc { .with_file_extension(".avro") .with_session_config_options(session.config()); - let schema = block_in_place(|| { - Handle::current() - .block_on(listing_options.infer_schema(session, &table_path)) - })?; + let schema = infer_schema_blocking(&listing_options, session, &table_path)?; let config = ListingTableConfig::new(table_path) .with_listing_options(listing_options) diff --git a/datafusion/functions-table/src/read_csv.rs b/datafusion/functions-table/src/read_csv.rs index 3a7b954d34bd2..994d7c1648023 100644 --- a/datafusion/functions-table/src/read_csv.rs +++ b/datafusion/functions-table/src/read_csv.rs @@ -26,14 +26,11 @@ use std::sync::Arc; use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; -use datafusion_common::Result; +use datafusion_common::{plan_err, Result}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource_csv::file_format::CsvFormat; -use tokio::runtime::Handle; -use tokio::task::block_in_place; - -use crate::extract_path; +use crate::{extract_path, infer_schema_blocking}; /// Table function that reads CSV files. 
#[derive(Debug, Default)] @@ -43,7 +40,7 @@ impl TableFunctionImpl for ReadCsvFunc { fn call_with_args(&self, args: TableFunctionArgs) -> Result> { let exprs = args.exprs(); if exprs.len() != 1 { - return datafusion_common::plan_err!( + return plan_err!( "read_csv requires exactly 1 argument (path), got {}", exprs.len() ); @@ -61,10 +58,7 @@ impl TableFunctionImpl for ReadCsvFunc { .with_file_extension(".csv") .with_session_config_options(session.config()); - let schema = block_in_place(|| { - Handle::current() - .block_on(listing_options.infer_schema(session, &table_path)) - })?; + let schema = infer_schema_blocking(&listing_options, session, &table_path)?; let config = ListingTableConfig::new(table_path) .with_listing_options(listing_options) diff --git a/datafusion/functions-table/src/read_json.rs b/datafusion/functions-table/src/read_json.rs index 5515ba9c2f9ad..7d60af80c3789 100644 --- a/datafusion/functions-table/src/read_json.rs +++ b/datafusion/functions-table/src/read_json.rs @@ -26,14 +26,11 @@ use std::sync::Arc; use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; -use datafusion_common::Result; +use datafusion_common::{plan_err, Result}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource_json::file_format::JsonFormat; -use tokio::runtime::Handle; -use tokio::task::block_in_place; - -use crate::extract_path; +use crate::{extract_path, infer_schema_blocking}; /// Table function that reads JSON files. 
#[derive(Debug, Default)] @@ -43,7 +40,7 @@ impl TableFunctionImpl for ReadJsonFunc { fn call_with_args(&self, args: TableFunctionArgs) -> Result> { let exprs = args.exprs(); if exprs.len() != 1 { - return datafusion_common::plan_err!( + return plan_err!( "read_json requires exactly 1 argument (path), got {}", exprs.len() ); @@ -61,10 +58,7 @@ impl TableFunctionImpl for ReadJsonFunc { .with_file_extension(".json") .with_session_config_options(session.config()); - let schema = block_in_place(|| { - Handle::current() - .block_on(listing_options.infer_schema(session, &table_path)) - })?; + let schema = infer_schema_blocking(&listing_options, session, &table_path)?; let config = ListingTableConfig::new(table_path) .with_listing_options(listing_options) diff --git a/datafusion/functions-table/src/read_parquet.rs b/datafusion/functions-table/src/read_parquet.rs index a97cfbf79dff5..ef9cfd32f1e18 100644 --- a/datafusion/functions-table/src/read_parquet.rs +++ b/datafusion/functions-table/src/read_parquet.rs @@ -31,10 +31,7 @@ use datafusion_common::{plan_err, Result}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource_parquet::file_format::ParquetFormat; -use crate::extract_path; - -use tokio::runtime::Handle; -use tokio::task::block_in_place; +use crate::{extract_path, infer_schema_blocking}; /// Table function that reads Parquet files. 
#[derive(Debug, Default)] @@ -59,10 +56,7 @@ impl TableFunctionImpl for ReadParquetFunc { .with_file_extension(".parquet") .with_session_config_options(session.config()); - let schema = block_in_place(|| { - Handle::current() - .block_on(listing_options.infer_schema(session, &table_path)) - })?; + let schema = infer_schema_blocking(&listing_options, session, &table_path)?; let config = ListingTableConfig::new(table_path) .with_listing_options(listing_options) From 28742766bb4a09e6c8e20ea566ddafab0a0069f1 Mon Sep 17 00:00:00 2001 From: Christian McArthur Date: Wed, 8 Apr 2026 20:30:34 -0400 Subject: [PATCH 3/3] fix: resolve CI failures on #21367 - cargo fmt: reformat read_table_functions.rs, read_parquet.rs, read_csv.rs, read_json.rs, read_avro.rs, and functions-table/lib.rs per current rustfmt rules (line wrapping on long paths and format! calls). - taplo: reformat datafusion/core/Cargo.toml per the repo's taplo.toml config. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/core/Cargo.toml | 7 ++- .../core/tests/sql/read_table_functions.rs | 51 +++++++++++-------- datafusion/functions-table/src/lib.rs | 13 ++--- datafusion/functions-table/src/read_avro.rs | 2 +- datafusion/functions-table/src/read_csv.rs | 6 +-- datafusion/functions-table/src/read_json.rs | 6 +-- .../functions-table/src/read_parquet.rs | 2 +- 7 files changed, 48 insertions(+), 39 deletions(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 419802e1cc7b0..5364dd873ecb7 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -72,7 +72,12 @@ encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] math_expressions = ["datafusion-functions/math_expressions"] -parquet = ["datafusion-common/parquet", "dep:parquet", 
"datafusion-datasource-parquet", "datafusion-functions-table/parquet"] +parquet = [ + "datafusion-common/parquet", + "dep:parquet", + "datafusion-datasource-parquet", + "datafusion-functions-table/parquet", +] parquet_encryption = [ "parquet", "parquet/encryption", diff --git a/datafusion/core/tests/sql/read_table_functions.rs b/datafusion/core/tests/sql/read_table_functions.rs index c8f75c44eb992..59812542ed5d1 100644 --- a/datafusion/core/tests/sql/read_table_functions.rs +++ b/datafusion/core/tests/sql/read_table_functions.rs @@ -37,7 +37,12 @@ use tempfile::TempDir; /// Create a temporary parquet file with known data and return the path. async fn write_test_parquet(dir: &TempDir) -> Result { let ctx = SessionContext::new(); - let path = dir.path().join("test.parquet").to_str().unwrap().to_string(); + let path = dir + .path() + .join("test.parquet") + .to_str() + .unwrap() + .to_string(); let batch = RecordBatch::try_new( Arc::new(Schema::new(vec![ @@ -155,7 +160,9 @@ async fn read_parquet_count() -> Result<()> { let ctx = SessionContext::new(); let results = ctx - .sql(&format!("SELECT count(*) AS cnt FROM read_parquet('{path}')")) + .sql(&format!( + "SELECT count(*) AS cnt FROM read_parquet('{path}')" + )) .await? .collect() .await?; @@ -186,8 +193,18 @@ async fn read_parquet_glob() -> Result<()> { vec![Arc::new(Int32Array::from(vec![3, 4]))], )?; - let path1 = tmp.path().join("part1.parquet").to_str().unwrap().to_string(); - let path2 = tmp.path().join("part2.parquet").to_str().unwrap().to_string(); + let path1 = tmp + .path() + .join("part1.parquet") + .to_str() + .unwrap() + .to_string(); + let path2 = tmp + .path() + .join("part2.parquet") + .to_str() + .unwrap() + .to_string(); ctx.read_batch(batch1)? 
.write_parquet( @@ -262,9 +279,7 @@ async fn read_parquet_projection_pushdown() -> Result<()> { let ctx = SessionContext::new(); let df = ctx - .sql(&format!( - "EXPLAIN SELECT name FROM read_parquet('{path}')" - )) + .sql(&format!("EXPLAIN SELECT name FROM read_parquet('{path}')")) .await?; let results = df.collect().await?; @@ -284,16 +299,11 @@ async fn read_csv_basic() -> Result<()> { let tmp = TempDir::new()?; let path = tmp.path().join("test.csv").to_str().unwrap().to_string(); - std::fs::write( - &path, - "id,name,value\n1,a,1.0\n2,b,2.0\n3,c,3.0\n", - )?; + std::fs::write(&path, "id,name,value\n1,a,1.0\n2,b,2.0\n3,c,3.0\n")?; let ctx = SessionContext::new(); let results = ctx - .sql(&format!( - "SELECT * FROM read_csv('{path}') ORDER BY id" - )) + .sql(&format!("SELECT * FROM read_csv('{path}') ORDER BY id")) .await? .collect() .await?; @@ -317,10 +327,7 @@ async fn read_csv_with_filter() -> Result<()> { let tmp = TempDir::new()?; let path = tmp.path().join("test.csv").to_str().unwrap().to_string(); - std::fs::write( - &path, - "id,name,value\n1,a,1.0\n2,b,2.0\n3,c,3.0\n", - )?; + std::fs::write(&path, "id,name,value\n1,a,1.0\n2,b,2.0\n3,c,3.0\n")?; let ctx = SessionContext::new(); let results = ctx @@ -363,9 +370,7 @@ async fn read_json_basic() -> Result<()> { let ctx = SessionContext::new(); let results = ctx - .sql(&format!( - "SELECT * FROM read_json('{path}') ORDER BY id" - )) + .sql(&format!("SELECT * FROM read_json('{path}') ORDER BY id")) .await? .collect() .await?; @@ -464,7 +469,9 @@ async fn read_parquet_single_threaded_runtime() -> Result<()> { let ctx = SessionContext::new(); let results = ctx - .sql(&format!("SELECT count(*) AS cnt FROM read_parquet('{path}')")) + .sql(&format!( + "SELECT count(*) AS cnt FROM read_parquet('{path}')" + )) .await? 
.collect() .await?; diff --git a/datafusion/functions-table/src/lib.rs b/datafusion/functions-table/src/lib.rs index 33571042273ce..6c93f8e28023c 100644 --- a/datafusion/functions-table/src/lib.rs +++ b/datafusion/functions-table/src/lib.rs @@ -37,7 +37,7 @@ pub mod read_parquet; use arrow::datatypes::SchemaRef; use datafusion_catalog::{Session, TableFunction}; use datafusion_catalog_listing::ListingOptions; -use datafusion_common::{plan_err, Result}; +use datafusion_common::{Result, plan_err}; use datafusion_datasource::ListingTableUrl; use datafusion_expr::Expr; use std::sync::Arc; @@ -51,9 +51,9 @@ pub(crate) fn extract_path(expr: &Expr, func_name: &str) -> Result { "{func_name} requires a string literal path argument, got {scalar:?}" ), }, - _ => plan_err!( - "{func_name} requires a string literal path argument, got {expr:?}" - ), + _ => { + plan_err!("{func_name} requires a string literal path argument, got {expr:?}") + } } } @@ -72,10 +72,7 @@ pub(crate) fn infer_schema_blocking( let handle = tokio::runtime::Handle::current(); std::thread::scope(|scope| { scope - .spawn(|| { - handle - .block_on(listing_options.infer_schema(session, table_path)) - }) + .spawn(|| handle.block_on(listing_options.infer_schema(session, table_path))) .join() .expect("infer_schema thread panicked") }) diff --git a/datafusion/functions-table/src/read_avro.rs b/datafusion/functions-table/src/read_avro.rs index 1d5def7e51df0..61dabc7ed1971 100644 --- a/datafusion/functions-table/src/read_avro.rs +++ b/datafusion/functions-table/src/read_avro.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; -use datafusion_common::{plan_err, Result}; +use datafusion_common::{Result, plan_err}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource_avro::file_format::AvroFormat; diff --git a/datafusion/functions-table/src/read_csv.rs 
b/datafusion/functions-table/src/read_csv.rs index 994d7c1648023..9733965aa25bc 100644 --- a/datafusion/functions-table/src/read_csv.rs +++ b/datafusion/functions-table/src/read_csv.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; -use datafusion_common::{plan_err, Result}; +use datafusion_common::{Result, plan_err}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource_csv::file_format::CsvFormat; @@ -51,8 +51,8 @@ impl TableFunctionImpl for ReadCsvFunc { let table_path = ListingTableUrl::parse(&path)?; - let csv_format = CsvFormat::default() - .with_options(session.default_table_options().csv); + let csv_format = + CsvFormat::default().with_options(session.default_table_options().csv); let listing_options = ListingOptions::new(Arc::new(csv_format)) .with_file_extension(".csv") diff --git a/datafusion/functions-table/src/read_json.rs b/datafusion/functions-table/src/read_json.rs index 7d60af80c3789..3fc10984cd047 100644 --- a/datafusion/functions-table/src/read_json.rs +++ b/datafusion/functions-table/src/read_json.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; -use datafusion_common::{plan_err, Result}; +use datafusion_common::{Result, plan_err}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource_json::file_format::JsonFormat; @@ -51,8 +51,8 @@ impl TableFunctionImpl for ReadJsonFunc { let table_path = ListingTableUrl::parse(&path)?; - let json_format = JsonFormat::default() - .with_options(session.default_table_options().json); + let json_format = + JsonFormat::default().with_options(session.default_table_options().json); let listing_options = ListingOptions::new(Arc::new(json_format)) .with_file_extension(".json") diff 
--git a/datafusion/functions-table/src/read_parquet.rs b/datafusion/functions-table/src/read_parquet.rs index ef9cfd32f1e18..435c54652ea71 100644 --- a/datafusion/functions-table/src/read_parquet.rs +++ b/datafusion/functions-table/src/read_parquet.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use datafusion_catalog::{TableFunctionArgs, TableFunctionImpl, TableProvider}; use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig}; -use datafusion_common::{plan_err, Result}; +use datafusion_common::{Result, plan_err}; use datafusion_datasource::ListingTableUrl; use datafusion_datasource_parquet::file_format::ParquetFormat;