From 00a2af67787efbe0fdf36dc3fbe78c988109b06d Mon Sep 17 00:00:00 2001
From: Whis Liao
Date: Wed, 4 Dec 2024 15:56:35 +0800
Subject: [PATCH] add ODBC usage to README & simplify dependency management
 (#189)

* docs: add ODBC usage to README

* chore: simplify dependency management
---
 Cargo.toml                                 | 138 ++++++++----
 README.md                                  |  14 +-
 examples/duckdb.rs                         |   9 +-
 examples/mysql.rs                          |   8 +-
 examples/odbc_sqlite.rs                    |   3 +-
 examples/postgres.rs                       |  16 +-
 examples/sqlite.rs                         |   2 +-
 src/duckdb.rs                              |   2 +-
 src/duckdb/creator.rs                      |   6 +-
 src/duckdb/federation.rs                   |   2 +-
 src/duckdb/write.rs                        |   2 +-
 src/flight.rs                              |   4 +-
 src/flight/codec.rs                        |   4 +-
 src/flight/exec.rs                         |  26 ++--
 src/lib.rs                                 |   2 +-
 src/mysql.rs                               |   4 +-
 src/mysql/write.rs                         |   2 +-
 src/odbc.rs                                |   2 +-
 src/postgres.rs                            |   8 +-
 src/postgres/write.rs                      |   2 +-
 src/sql/arrow_sql_gen/arrow.rs             |   2 +-
 src/sql/arrow_sql_gen/mod.rs               |   4 +-
 src/sql/arrow_sql_gen/mysql.rs             |  12 +-
 src/sql/arrow_sql_gen/postgres.rs          |  24 +--
 src/sql/arrow_sql_gen/postgres/builder.rs  |   4 +-
 src/sql/arrow_sql_gen/postgres/schema.rs   |   4 +-
 src/sql/arrow_sql_gen/sqlite.rs            |  30 ++--
 src/sql/arrow_sql_gen/statement.rs         |  10 +-
 src/sql/db_connection_pool/dbconnection.rs |   2 +-
 .../dbconnection/duckdbconn.rs             |   2 +-
 .../dbconnection/mysqlconn.rs              |   2 +-
 .../dbconnection/odbcconn.rs               |  10 +-
 .../dbconnection/postgresconn.rs           |   6 +-
 .../dbconnection/sqliteconn.rs             |   2 +-
 src/sql/db_connection_pool/odbcpool.rs     |   2 +-
 src/sql/sql_provider_datafusion/mod.rs     |   2 +-
 src/sqlite.rs                              |  12 +-
 src/sqlite/federation.rs                   |   2 +-
 src/sqlite/write.rs                        |   8 +-
 src/util/constraints.rs                    |   7 +-
 src/util/on_conflict.rs                    |   4 +-
 src/util/test.rs                           |   2 +-
 tests/arrow_record_batch_gen/mod.rs        |   6 +-
 tests/duckdb/mod.rs                        |   4 +-
 tests/flight/mod.rs                        |   4 +-
 tests/mysql/mod.rs                         |   6 +-
 tests/postgres/mod.rs                      |  12 +-
 tests/postgres/schema.rs                   |   2 +-
 tests/sqlite/mod.rs                        |   4 +-
 49 files changed, 224 insertions(+), 223 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 7e025e1d..6366f6f4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,29 +8,24 @@ license = "Apache-2.0"
 description = "Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait."
 
 [dependencies]
-arrow = "53"
-arrow-array = { version = "53", optional = true }
-arrow-cast = { version = "53", optional = true }
 arrow-flight = { version = "53", optional = true, features = [
     "flight-sql-experimental",
     "tls",
 ] }
-arrow-schema = { version = "53", optional = true, features = ["serde"] }
-arrow-json = "53"
-async-stream = { version = "0.3.5", optional = true }
-async-trait = "0.1.80"
-num-bigint = "0.4.4"
-bigdecimal = "0.4.5"
+arrow-odbc = { version = "14.0", optional = true }
+async-stream = { version = "0.3.6", optional = true }
+async-trait = "0.1"
+bb8 = { version = "0.8", optional = true }
+bb8-postgres = { version = "0.8", optional = true }
+bigdecimal = "0.4.6"
 byteorder = "1.5.0"
 chrono = "0.4.38"
-datafusion = "43.0.0"
-datafusion-expr = { version = "43.0.0", optional = true }
-datafusion-physical-expr = { version = "43.0.0", optional = true }
-datafusion-physical-plan = { version = "43.0.0", optional = true }
-datafusion-proto = { version = "43.0.0", optional = true }
+dashmap = "6.1.0"
+datafusion = { version = "43", default-features = false }
 datafusion-federation = { version = "0.3.1", features = [
     "sql",
 ], optional = true }
+datafusion-proto = { version = "43", optional = true }
 duckdb = { version = "1.1.1", features = [
     "bundled",
     "r2d2",
@@ -38,16 +33,25 @@ duckdb = { version = "1.1.1", features = [
     "vtab-arrow",
     "appender-arrow",
 ], optional = true }
+dyn-clone = { version = "1.0", optional = true }
 fallible-iterator = "0.3.0"
-futures = "0.3.30"
-mysql_async = { version = "0.34.1", features = [
+fundu = "2.0.1"
+futures = "0.3"
+geo-types = "0.7"
+itertools = "0.13.0"
+mysql_async = { version = "0.34", features = [
     "native-tls-tls",
     "chrono",
 ], optional = true }
-prost = { version = "0.13.2", optional = true }
+native-tls = { version = "0.2.12", optional = true }
+num-bigint = "0.4"
+odbc-api = { version = "10.0.0", optional = true }
+pem = { version = "3.0.4", optional = true }
+postgres-native-tls = { version = "0.5.0", optional = true }
+prost = { version = "0.13", optional = true }
 r2d2 = { version = "0.8.10", optional = true }
-rusqlite = { version = "0.31.0", optional = true }
-sea-query = { version = "0.32.0-rc.1", features = [
+rusqlite = { version = "0.32.1", optional = true }
+sea-query = { version = "0.32.0", features = [
     "backend-sqlite",
     "backend-postgres",
     "postgres-array",
@@ -57,69 +61,42 @@ sea-query = { version = "0.32.0-rc.1", features = [
     "with-chrono",
 ] }
 secrecy = "0.8.0"
-serde = { version = "1.0.209", optional = true }
-serde_json = "1.0.124"
-snafu = "0.8.3"
+serde = { version = "1.0", optional = true }
+serde_json = "1.0"
+sha2 = "0.10.8"
+snafu = "0.8.5"
 time = "0.3.36"
-tokio = { version = "1.38.0", features = ["macros", "fs"] }
-tokio-util = "0.7.12"
-tokio-postgres = { version = "0.7.10", features = [
+tokio = { version = "1.41", features = ["macros", "fs"] }
+tokio-postgres = { version = "0.7.12", features = [
     "with-chrono-0_4",
     "with-uuid-1",
     "with-serde_json-1",
     "with-geo-types-0_7",
 ], optional = true }
-tracing = "0.1.40"
-uuid = { version = "1.9.1", optional = true }
-postgres-native-tls = { version = "0.5.0", optional = true }
-bb8 = { version = "0.8", optional = true }
-bb8-postgres = { version = "0.8", optional = true }
-native-tls = { version = "0.2.11", optional = true }
-trust-dns-resolver = "0.23.2"
-url = "2.5.1"
-pem = { version = "3.0.4", optional = true }
-tokio-rusqlite = { version = "0.5.1", optional = true }
+tokio-rusqlite = { version = "0.6.0", optional = true }
 tonic = { version = "0.12", optional = true, features = [
     "tls-native-roots",
     "tls-webpki-roots",
 ] }
-itertools = "0.13.0"
-dyn-clone = { version = "1.0.17", optional = true }
-geo-types = "0.7.13"
-fundu = "2.0.1"
-dashmap = "6.1.0"
-odbc-api = { version = "9.0.0", optional = true }
-arrow-odbc = { version = "13.0.0", optional = true }
-sha2 = "0.10.8"
+tracing = "0.1.40"
+trust-dns-resolver = "0.23.2"
+url = "2.5.4"
+uuid = { version = "1.11.0", optional = true }
 
 [dev-dependencies]
-anyhow = "1.0.86"
-bollard = "0.17.1"
-rand = "0.8.5"
-reqwest = "0.12.5"
-secrecy = "0.8.0"
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
-test-log = { version = "0.2.16", features = ["trace"] }
-rstest = "0.23.0"
+anyhow = "1.0"
+bollard = "0.18.1"
 geozero = { version = "0.14.0", features = ["with-wkb"] }
-tokio-stream = { version = "0.1.15", features = ["net"] }
-arrow-schema = "53.1.0"
+insta = { version = "1.41.1", features = ["filters"] }
 prost = { version = "0.13" }
-insta = { version = "1.40.0", features = ["filters"] }
+rand = "0.8.5"
+reqwest = "0.12.9"
+rstest = "0.23.0"
+test-log = { version = "0.2.16", features = ["trace"] }
+tokio-stream = { version = "0.1.16", features = ["net"] }
+tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
 
 [features]
-mysql = ["dep:mysql_async", "dep:async-stream"]
-postgres = [
-    "dep:tokio-postgres",
-    "dep:uuid",
-    "dep:postgres-native-tls",
-    "dep:bb8",
-    "dep:bb8-postgres",
-    "dep:native-tls",
-    "dep:pem",
-    "dep:async-stream",
-]
-sqlite = ["dep:rusqlite", "dep:tokio-rusqlite"]
 duckdb = [
     "dep:duckdb",
     "dep:r2d2",
@@ -127,22 +104,29 @@ duckdb = [
     "dep:dyn-clone",
     "dep:async-stream",
 ]
+duckdb-federation = ["duckdb", "federation"]
+federation = ["dep:datafusion-federation"]
 flight = [
-    "dep:arrow-array",
-    "dep:arrow-cast",
     "dep:arrow-flight",
-    "dep:arrow-schema",
-    "dep:datafusion-expr",
-    "dep:datafusion-physical-expr",
-    "dep:datafusion-physical-plan",
+    "datafusion/serde",
     "dep:datafusion-proto",
     "dep:serde",
     "dep:tonic",
 ]
-odbc = ["dep:odbc-api", "dep:arrow-odbc", "dep:async-stream", "dep:dyn-clone"]
-federation = ["dep:datafusion-federation"]
-duckdb-federation = ["duckdb", "federation"]
-sqlite-federation = ["sqlite", "federation"]
-postgres-federation = ["postgres", "federation"]
+mysql = ["dep:mysql_async", "dep:async-stream"]
 mysql-federation = ["mysql", "federation"]
+odbc = ["dep:odbc-api", "dep:arrow-odbc", "dep:async-stream", "dep:dyn-clone"]
 odbc-federation = ["odbc", "federation"]
+postgres = [
+    "dep:tokio-postgres",
+    "dep:uuid",
+    "dep:postgres-native-tls",
+    "dep:bb8",
+    "dep:bb8-postgres",
+    "dep:native-tls",
+    "dep:pem",
+    "dep:async-stream",
+]
+postgres-federation = ["postgres", "federation"]
+sqlite = ["dep:rusqlite", "dep:tokio-rusqlite"]
+sqlite-federation = ["sqlite", "federation"]
diff --git a/README.md b/README.md
index 8309d932..81e31d34 100644
--- a/README.md
+++ b/README.md
@@ -25,6 +25,7 @@ let ctx = SessionContext::with_state(state);
 - SQLite
 - DuckDB
 - Flight SQL
+- ODBC
 
 ## Examples
 
@@ -100,8 +101,19 @@ cargo run --example mysql --features mysql
 ```bash
 brew install roapi
 # or
-#cargo install --locked --git https://github.com/roapi/roapi --branch main --bins roapi
+# cargo install --locked --git https://github.com/roapi/roapi --branch main --bins roapi
 
 roapi -t taxi=https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet &
 cargo run --example flight-sql --features flight
 ```
+
+### ODBC
+
+```bash
+apt-get install unixodbc-dev libsqliteodbc
+# or
+# brew install unixodbc && brew install sqliteodbc
+# If you're on an ARM Mac, see https://github.com/pacman82/odbc-api#os-x-arm--mac-m1
+
+cargo run --example odbc_sqlite --features odbc
+```
diff --git a/examples/duckdb.rs b/examples/duckdb.rs
index af821171..dce0b6f3 100644
--- a/examples/duckdb.rs
+++ b/examples/duckdb.rs
@@ -1,12 +1,11 @@
 use std::sync::Arc;
 
 use datafusion::prelude::SessionContext;
+use datafusion::sql::TableReference;
 use datafusion_table_providers::{
-    common::DatabaseCatalogProvider,
+    common::DatabaseCatalogProvider, duckdb::DuckDBTableFactory,
     sql::db_connection_pool::duckdbpool::DuckDbConnectionPool,
-    duckdb::DuckDBTableFactory,
 };
-use datafusion::sql::TableReference;
 use duckdb::AccessMode;
 
 /// This example demonstrates how to:
@@ -17,7 +16,7 @@ use duckdb::AccessMode;
 #[tokio::main]
 async fn main() {
     // Create DuckDB connection pool
-    // Opening in ReadOnly mode allows multiple reader processes to access 
+    // Opening in ReadOnly mode allows multiple reader processes to access
     // the database at the same time
     let duckdb_pool = Arc::new(
         DuckDbConnectionPool::new_file("examples/duckdb_example.db", &AccessMode::ReadOnly)
@@ -27,7 +26,7 @@ async fn main() {
     // Create DuckDB table provider factory
     // Used to generate TableProvider instances that can read DuckDB table data
     let table_factory = DuckDBTableFactory::new(duckdb_pool.clone());
-    
+
     // Create database catalog provider
     // This allows us to access tables through catalog structure (catalog.schema.table)
     let catalog = DatabaseCatalogProvider::try_new(duckdb_pool).await.unwrap();
diff --git a/examples/mysql.rs b/examples/mysql.rs
index b407a837..4f562481 100644
--- a/examples/mysql.rs
+++ b/examples/mysql.rs
@@ -3,10 +3,8 @@ use std::{collections::HashMap, sync::Arc};
 use datafusion::prelude::SessionContext;
 use datafusion::sql::TableReference;
 use datafusion_table_providers::{
-    common::DatabaseCatalogProvider,
-    sql::db_connection_pool::mysqlpool::MySQLConnectionPool,
-    mysql::MySQLTableFactory,
-    util::secrets::to_secret_map,
+    common::DatabaseCatalogProvider, mysql::MySQLTableFactory,
+    sql::db_connection_pool::mysqlpool::MySQLConnectionPool, util::secrets::to_secret_map,
 };
 
 /// This example demonstrates how to:
@@ -54,7 +52,7 @@ async fn main() {
     // Create MySQL table provider factory
     // Used to generate TableProvider instances that can read MySQL table data
     let table_factory = MySQLTableFactory::new(mysql_pool.clone());
-    
+
     // Create database catalog provider
     // This allows us to access tables through catalog structure (catalog.schema.table)
     let catalog = DatabaseCatalogProvider::try_new(mysql_pool).await.unwrap();
diff --git a/examples/odbc_sqlite.rs b/examples/odbc_sqlite.rs
index 031f9a92..991293d1 100644
--- a/examples/odbc_sqlite.rs
+++ b/examples/odbc_sqlite.rs
@@ -17,8 +17,7 @@ async fn main() {
     // Create SQLite ODBC connection pool
     let params = to_secret_map(HashMap::from([(
         "connection_string".to_owned(),
-        "driver=SQLite3;database=examples/sqlite_example.db;"
-            .to_owned(),
+        "driver=SQLite3;database=examples/sqlite_example.db;".to_owned(),
     )]));
     let odbc_pool =
         Arc::new(ODBCPool::new(params).expect("unable to create SQLite ODBC connection pool"));
diff --git a/examples/postgres.rs b/examples/postgres.rs
index 644fbd04..d919d5b6 100644
--- a/examples/postgres.rs
+++ b/examples/postgres.rs
@@ -1,13 +1,11 @@
-use std::sync::Arc;
-use std::collections::HashMap;
 use datafusion::prelude::SessionContext;
 use datafusion::sql::TableReference;
 use datafusion_table_providers::{
-    common::DatabaseCatalogProvider,
-    sql::db_connection_pool::postgrespool::PostgresConnectionPool,
-    postgres::PostgresTableFactory,
-    util::secrets::to_secret_map,
+    common::DatabaseCatalogProvider, postgres::PostgresTableFactory,
+    sql::db_connection_pool::postgrespool::PostgresConnectionPool, util::secrets::to_secret_map,
 };
+use std::collections::HashMap;
+use std::sync::Arc;
 
 /// This example demonstrates how to:
 /// 1. Create a PostgreSQL connection pool
@@ -54,10 +52,12 @@ async fn main() {
     // Create PostgreSQL table provider factory
     // Used to generate TableProvider instances that can read PostgreSQL table data
     let table_factory = PostgresTableFactory::new(postgres_pool.clone());
-    
+
     // Create database catalog provider
     // This allows us to access tables through catalog structure (catalog.schema.table)
-    let catalog = DatabaseCatalogProvider::try_new(postgres_pool).await.unwrap();
+    let catalog = DatabaseCatalogProvider::try_new(postgres_pool)
+        .await
+        .unwrap();
 
     // Create DataFusion session context
     let ctx = SessionContext::new();
diff --git a/examples/sqlite.rs b/examples/sqlite.rs
index 4c52db8b..83d9071b 100644
--- a/examples/sqlite.rs
+++ b/examples/sqlite.rs
@@ -32,7 +32,7 @@ async fn main() {
     // Create SQLite table provider factory
     // Used to generate TableProvider instances that can read SQLite table data
     let table_factory = SqliteTableFactory::new(sqlite_pool.clone());
-    
+
     // Create database catalog provider
     // This allows us to access tables through catalog structure (catalog.schema.table)
     let catalog_provider = DatabaseCatalogProvider::try_new(sqlite_pool).await.unwrap();
diff --git a/src/duckdb.rs b/src/duckdb.rs
index eea69dc0..52cb9139 100644
--- a/src/duckdb.rs
+++ b/src/duckdb.rs
@@ -17,8 +17,8 @@ use crate::util::{
     indexes::IndexType,
     on_conflict::{self, OnConflict},
 };
-use arrow::{array::RecordBatch, datatypes::SchemaRef};
 use async_trait::async_trait;
+use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
 use datafusion::{
     catalog::{Session, TableProviderFactory},
     common::Constraints,
diff --git a/src/duckdb/creator.rs b/src/duckdb/creator.rs
index 58c133c6..387f2869 100644
--- a/src/duckdb/creator.rs
+++ b/src/duckdb/creator.rs
@@ -1,6 +1,6 @@
 use crate::sql::arrow_sql_gen::statement::IndexBuilder;
 use crate::sql::db_connection_pool::duckdbpool::DuckDbConnectionPool;
-use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
 use datafusion::common::Constraints;
 use duckdb::{vtab::arrow_recordbatch_to_query_params, ToSql, Transaction};
 use snafu::prelude::*;
@@ -322,7 +322,7 @@ pub(crate) mod tests {
     use crate::sql::db_connection_pool::{
         dbconnection::duckdbconn::DuckDbConnection, duckdbpool::DuckDbConnectionPool,
     };
-    use arrow::array::RecordBatch;
+    use datafusion::arrow::array::RecordBatch;
     use datafusion::{
         execution::{SendableRecordBatchStream, TaskContext},
         parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
@@ -358,7 +358,7 @@ pub(crate) mod tests {
             .expect("to build parquet reader");
 
         parquet_reader
-            .collect::<Result<Vec<_>, arrow::error::ArrowError>>()
+            .collect::<Result<Vec<_>, datafusion::arrow::error::ArrowError>>()
             .expect("to get records")
     }
diff --git a/src/duckdb/federation.rs b/src/duckdb/federation.rs
index edbc1fd6..ce86092a 100644
--- a/src/duckdb/federation.rs
+++ b/src/duckdb/federation.rs
@@ -1,6 +1,6 @@
 use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError};
 use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
-use arrow::datatypes::SchemaRef;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::sql::unparser::dialect::Dialect;
 use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLTableSource};
 use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
diff --git a/src/duckdb/write.rs b/src/duckdb/write.rs
index b46758cb..ae90f598 100644
--- a/src/duckdb/write.rs
+++ b/src/duckdb/write.rs
@@ -6,8 +6,8 @@ use crate::util::{
     on_conflict::OnConflict,
     retriable_error::{check_and_mark_retriable_error, to_retriable_data_write_error},
 };
-use arrow::{array::RecordBatch, datatypes::SchemaRef};
 use async_trait::async_trait;
+use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
 use datafusion::catalog::Session;
 use datafusion::common::Constraints;
 use datafusion::logical_expr::dml::InsertOp;
diff --git a/src/flight.rs b/src/flight.rs
index 9564612c..f0b240c2 100644
--- a/src/flight.rs
+++ b/src/flight.rs
@@ -27,14 +27,14 @@ use std::sync::Arc;
 use crate::flight::exec::FlightExec;
 use arrow_flight::error::FlightError;
 use arrow_flight::FlightInfo;
-use arrow_schema::SchemaRef;
 use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::common::stats::Precision;
 use datafusion::common::{DataFusionError, Statistics};
 use datafusion::datasource::TableProvider;
+use datafusion::logical_expr::{CreateExternalTable, Expr, TableType};
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion_expr::{CreateExternalTable, Expr, TableType};
 use serde::{Deserialize, Serialize};
 use tonic::transport::{Channel, ClientTlsConfig};
diff --git a/src/flight/codec.rs b/src/flight/codec.rs
index 47ac0960..11b43d91 100644
--- a/src/flight/codec.rs
+++ b/src/flight/codec.rs
@@ -22,8 +22,8 @@ use std::sync::Arc;
 use crate::flight::exec::{FlightConfig, FlightExec};
 use crate::flight::to_df_err;
 use datafusion::common::DataFusionError;
-use datafusion_expr::registry::FunctionRegistry;
-use datafusion_physical_plan::ExecutionPlan;
+use datafusion::logical_expr::registry::FunctionRegistry;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 
 /// Physical extension codec for FlightExec
diff --git a/src/flight/exec.rs b/src/flight/exec.rs
index 11fb7900..391a6ca9 100644
--- a/src/flight/exec.rs
+++ b/src/flight/exec.rs
@@ -24,19 +24,20 @@ use std::str::FromStr;
 use std::sync::Arc;
 
 use crate::flight::{flight_channel, to_df_err, FlightMetadata, FlightProperties, SizeLimits};
-use arrow_array::{new_null_array, ArrayRef, RecordBatch};
-use arrow_cast::cast;
 use arrow_flight::error::FlightError;
 use arrow_flight::flight_service_client::FlightServiceClient;
 use arrow_flight::{FlightClient, FlightEndpoint, Ticket};
-use arrow_schema::{ArrowError, Field, SchemaRef};
+use datafusion::arrow::array::{new_null_array, ArrayRef, RecordBatch};
+use datafusion::arrow::compute::cast;
 use datafusion::arrow::datatypes::ToByteSlice;
+use datafusion::arrow::datatypes::{Field, SchemaRef};
+use datafusion::arrow::error::ArrowError;
 use datafusion::common::Result;
 use datafusion::common::{project_schema, DataFusionError};
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
-use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion_physical_plan::{
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
 };
 use futures::{StreamExt, TryStreamExt};
@@ -222,7 +223,7 @@ async fn try_fetch_stream(
 pub fn enforce_schema(
     batch: RecordBatch,
     target_schema: &SchemaRef,
-) -> arrow::error::Result<RecordBatch> {
+) -> datafusion::arrow::error::Result<RecordBatch> {
     if target_schema.fields.is_empty() || batch.schema() == *target_schema {
         Ok(batch)
     } else if target_schema.contains(batch.schema_ref()) {
@@ -232,7 +233,7 @@ pub fn enforce_schema(
             .fields
             .iter()
             .map(|field| find_matching_column(&batch, field.as_ref()))
-            .collect::<arrow::error::Result<Vec<_>>>()?;
+            .collect::<datafusion::arrow::error::Result<Vec<_>>>()?;
         RecordBatch::try_new(target_schema.to_owned(), columns)
     }
 }
@@ -240,7 +241,10 @@ pub fn enforce_schema(
 /// For a target schema field, extract the column with the same name if present in the
 /// record batch and cast it to the desired data type if needed. If the column is missing
 /// but the target schema field is nullable, generates a null-array column.
-fn find_matching_column(batch: &RecordBatch, field: &Field) -> arrow::error::Result<ArrayRef> {
+fn find_matching_column(
+    batch: &RecordBatch,
+    field: &Field,
+) -> datafusion::arrow::error::Result<ArrayRef> {
     if let Some(column) = batch.column_by_name(field.name()) {
         if column.data_type() == field.data_type() {
             Ok(column.to_owned())
@@ -322,10 +326,10 @@ impl ExecutionPlan for FlightExec {
 mod tests {
     use crate::flight::exec::{enforce_schema, FlightConfig, FlightPartition, FlightTicket};
     use crate::flight::{FlightProperties, SizeLimits};
-    use arrow_array::{
+    use datafusion::arrow::array::{
         BooleanArray, Float32Array, Int32Array, RecordBatch, StringArray, StructArray,
     };
-    use arrow_schema::{DataType, Field, Fields, Schema};
+    use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema};
     use std::collections::HashMap;
     use std::sync::Arc;
diff --git a/src/lib.rs b/src/lib.rs
index 1f068068..1bf20ffc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,6 @@
+pub mod common;
 pub mod sql;
 pub mod util;
-pub mod common;
 
 #[cfg(feature = "duckdb")]
 pub mod duckdb;
diff --git a/src/mysql.rs b/src/mysql.rs
index b3dc6ab6..02d7b78b 100644
--- a/src/mysql.rs
+++ b/src/mysql.rs
@@ -30,9 +30,9 @@ use crate::util::indexes::IndexType;
 use crate::util::on_conflict::OnConflict;
 use crate::util::secrets::to_secret_map;
 use crate::util::{column_reference, constraints, on_conflict, to_datafusion_error};
-use arrow::array::RecordBatch;
-use arrow::datatypes::{Schema, SchemaRef};
 use async_trait::async_trait;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::datatypes::{Schema, SchemaRef};
 use datafusion::catalog::Session;
 use datafusion::{
     catalog::TableProviderFactory, common::Constraints, datasource::TableProvider,
diff --git a/src/mysql/write.rs b/src/mysql/write.rs
index 1db4f365..6be1a51a 100644
--- a/src/mysql/write.rs
+++ b/src/mysql/write.rs
@@ -2,8 +2,8 @@ use crate::mysql::MySQL;
 use crate::util::on_conflict::OnConflict;
 use crate::util::retriable_error::check_and_mark_retriable_error;
 use crate::util::{constraints, to_datafusion_error};
-use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::{
     catalog::Session,
     datasource::{TableProvider, TableType},
diff --git a/src/odbc.rs b/src/odbc.rs
index 2891ec9d..29cf81e4 100644
--- a/src/odbc.rs
+++ b/src/odbc.rs
@@ -19,7 +19,7 @@ use crate::sql::{
     db_connection_pool as db_connection_pool_datafusion,
     sql_provider_datafusion::{Engine, SqlTable},
 };
-use arrow::datatypes::SchemaRef;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::error::DataFusionError;
 use datafusion::{datasource::TableProvider, sql::TableReference};
 use snafu::prelude::*;
diff --git a/src/postgres.rs b/src/postgres.rs
index ff96f275..b7d83b1c 100644
--- a/src/postgres.rs
+++ b/src/postgres.rs
@@ -10,15 +10,15 @@ use crate::sql::db_connection_pool::{
 
 use crate::sql::sql_provider_datafusion::{Engine, SqlTable};
 
-use arrow::{
-    array::RecordBatch,
-    datatypes::{Schema, SchemaRef},
-};
 use async_trait::async_trait;
 use bb8_postgres::{
     tokio_postgres::{types::ToSql, Transaction},
     PostgresConnectionManager,
 };
+use datafusion::arrow::{
+    array::RecordBatch,
+    datatypes::{Schema, SchemaRef},
+};
 use datafusion::catalog::Session;
 use datafusion::{
     catalog::TableProviderFactory,
diff --git a/src/postgres/write.rs b/src/postgres/write.rs
index 3d6eefcb..a032024c 100644
--- a/src/postgres/write.rs
+++ b/src/postgres/write.rs
@@ -1,7 +1,7 @@
 use std::{any::Any, fmt, sync::Arc};
 
-use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::{
     catalog::Session,
     common::Constraints,
diff --git a/src/sql/arrow_sql_gen/arrow.rs b/src/sql/arrow_sql_gen/arrow.rs
index f60da64e..7149428b 100644
--- a/src/sql/arrow_sql_gen/arrow.rs
+++ b/src/sql/arrow_sql_gen/arrow.rs
@@ -1,4 +1,4 @@
-use arrow::{
+use datafusion::arrow::{
     array::{
         types::Int8Type, ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Date64Builder,
         Decimal128Builder, Decimal256Builder, FixedSizeBinaryBuilder, FixedSizeListBuilder,
diff --git a/src/sql/arrow_sql_gen/mod.rs b/src/sql/arrow_sql_gen/mod.rs
index a315abf2..606b831a 100644
--- a/src/sql/arrow_sql_gen/mod.rs
+++ b/src/sql/arrow_sql_gen/mod.rs
@@ -9,7 +9,7 @@
 //! ### `CREATE TABLE` statement
 //! ```rust
 //! use std::sync::Arc;
-//! use arrow::datatypes::{DataType, Field, Schema};
+//! use datafusion::arrow::datatypes::{DataType, Field, Schema};
 //! use datafusion_table_providers::sql::arrow_sql_gen::statement::CreateTableBuilder;
 //!
 //! let schema = Arc::new(Schema::new(vec![
@@ -26,7 +26,7 @@
 //! With primary key constraints:
 //! ```rust
 //! use std::sync::Arc;
-//! use arrow::datatypes::{DataType, Field, Schema};
+//! use datafusion::arrow::datatypes::{DataType, Field, Schema};
 //! use datafusion_table_providers::sql::arrow_sql_gen::statement::CreateTableBuilder;
 //!
 //! let schema = Arc::new(Schema::new(vec![
diff --git a/src/sql/arrow_sql_gen/mysql.rs b/src/sql/arrow_sql_gen/mysql.rs
index b3460150..c2b56aea 100644
--- a/src/sql/arrow_sql_gen/mysql.rs
+++ b/src/sql/arrow_sql_gen/mysql.rs
@@ -1,5 +1,8 @@
 use crate::sql::arrow_sql_gen::arrow::map_data_type_to_array_builder_optional;
-use arrow::{
+use bigdecimal::BigDecimal;
+use bigdecimal::ToPrimitive;
+use chrono::{NaiveDate, NaiveTime, Timelike};
+use datafusion::arrow::{
     array::{
         ArrayBuilder, ArrayRef, BinaryBuilder, Date32Builder, Decimal128Builder, Decimal256Builder,
         Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder,
@@ -9,9 +12,6 @@ use arrow::{
     },
     datatypes::{i256, DataType, Date32Type, Field, Schema, SchemaRef, TimeUnit, UInt16Type},
 };
-use bigdecimal::BigDecimal;
-use bigdecimal::ToPrimitive;
-use chrono::{NaiveDate, NaiveTime, Timelike};
 use mysql_async::{consts::ColumnFlags, consts::ColumnType, FromValueError, Row, Value};
 use snafu::{ResultExt, Snafu};
 use std::{convert, sync::Arc};
@@ -20,7 +20,9 @@ use time::PrimitiveDateTime;
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Failed to build record batch: {source}"))]
-    FailedToBuildRecordBatch { source: arrow::error::ArrowError },
+    FailedToBuildRecordBatch {
+        source: datafusion::arrow::error::ArrowError,
+    },
 
     #[snafu(display("No builder found for index {index}"))]
     NoBuilderForIndex { index: usize },
diff --git a/src/sql/arrow_sql_gen/postgres.rs b/src/sql/arrow_sql_gen/postgres.rs
index 7eae46ee..4434154e 100644
--- a/src/sql/arrow_sql_gen/postgres.rs
+++ b/src/sql/arrow_sql_gen/postgres.rs
@@ -4,24 +4,24 @@ use std::sync::Arc;
 
 use crate::sql::arrow_sql_gen::arrow::map_data_type_to_array_builder_optional;
 use crate::sql::arrow_sql_gen::statement::map_data_type_to_column_type;
-use arrow::array::{
+use bigdecimal::num_bigint::BigInt;
+use bigdecimal::num_bigint::Sign;
+use bigdecimal::BigDecimal;
+use bigdecimal::ToPrimitive;
+use byteorder::{BigEndian, ReadBytesExt};
+use chrono::{DateTime, Offset, Timelike, Utc};
+use composite::CompositeType;
+use datafusion::arrow::array::{
     ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
     FixedSizeListBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder,
     Int8Builder, IntervalMonthDayNanoBuilder, LargeBinaryBuilder, LargeStringBuilder, ListBuilder,
     RecordBatch, RecordBatchOptions, StringBuilder, StringDictionaryBuilder, StructBuilder,
     Time64NanosecondBuilder, TimestampNanosecondBuilder, UInt32Builder,
 };
-use arrow::datatypes::{
+use datafusion::arrow::datatypes::{
     DataType, Date32Type, Field, Int8Type, IntervalMonthDayNanoType, IntervalUnit, Schema,
     SchemaRef, TimeUnit,
 };
-use bigdecimal::num_bigint::BigInt;
-use bigdecimal::num_bigint::Sign;
-use bigdecimal::BigDecimal;
-use bigdecimal::ToPrimitive;
-use byteorder::{BigEndian, ReadBytesExt};
-use chrono::{DateTime, Offset, Timelike, Utc};
-use composite::CompositeType;
 use geo_types::geometry::Point;
 use sea_query::{Alias, ColumnType, SeaRc};
 use serde_json::Value;
@@ -38,7 +38,9 @@ pub mod schema;
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Failed to build record batch: {source}"))]
-    FailedToBuildRecordBatch { source: arrow::error::ArrowError },
+    FailedToBuildRecordBatch {
+        source: datafusion::arrow::error::ArrowError,
+    },
 
     #[snafu(display("No builder found for index {index}"))]
     NoBuilderForIndex { index: usize },
@@ -1131,8 +1133,8 @@ fn get_decimal_column_precision_and_scale(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::array::{Time64NanosecondArray, Time64NanosecondBuilder};
     use chrono::NaiveTime;
+    use datafusion::arrow::array::{Time64NanosecondArray, Time64NanosecondBuilder};
     use geo_types::{point, polygon, Geometry};
     use geozero::{CoordDimensions, ToWkb};
     use std::str::FromStr;
diff --git a/src/sql/arrow_sql_gen/postgres/builder.rs b/src/sql/arrow_sql_gen/postgres/builder.rs
index 2ff35495..6b9e6e84 100644
--- a/src/sql/arrow_sql_gen/postgres/builder.rs
+++ b/src/sql/arrow_sql_gen/postgres/builder.rs
@@ -1,4 +1,4 @@
-use arrow::datatypes::Fields;
+use datafusion::arrow::datatypes::Fields;
 use sea_query::{Alias, ColumnDef, PostgresQueryBuilder, TableBuilder};
 
 use crate::sql::arrow_sql_gen::statement::map_data_type_to_column_type;
@@ -80,7 +80,7 @@ fn fields_to_simple_column_defs(fields: &Fields) -> Vec<ColumnDef> {
 
 #[cfg(test)]
 mod tests {
-    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
 
     use super::*;
diff --git a/src/sql/arrow_sql_gen/postgres/schema.rs b/src/sql/arrow_sql_gen/postgres/schema.rs
index fdfa384a..bf8e3af1 100644
--- a/src/sql/arrow_sql_gen/postgres/schema.rs
+++ b/src/sql/arrow_sql_gen/postgres/schema.rs
@@ -1,5 +1,5 @@
-use arrow::datatypes::{DataType, Field, Fields, IntervalUnit, TimeUnit};
-use arrow::error::ArrowError;
+use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, TimeUnit};
+use datafusion::arrow::error::ArrowError;
 use serde_json::json;
 use serde_json::Value;
 use std::sync::Arc;
diff --git a/src/sql/arrow_sql_gen/sqlite.rs b/src/sql/arrow_sql_gen/sqlite.rs
index 64663058..915f04e2 100644
--- a/src/sql/arrow_sql_gen/sqlite.rs
+++ b/src/sql/arrow_sql_gen/sqlite.rs
@@ -17,19 +17,19 @@ limitations under the License.
 use std::sync::Arc;
 
 use crate::sql::arrow_sql_gen::arrow::map_data_type_to_array_builder;
-use arrow::array::ArrayBuilder;
-use arrow::array::ArrayRef;
-use arrow::array::BinaryBuilder;
-use arrow::array::Float64Builder;
-use arrow::array::Int64Builder;
-use arrow::array::NullBuilder;
-use arrow::array::RecordBatch;
-use arrow::array::RecordBatchOptions;
-use arrow::array::StringBuilder;
-use arrow::datatypes::DataType;
-use arrow::datatypes::Field;
-use arrow::datatypes::Schema;
-use arrow::datatypes::SchemaRef;
+use datafusion::arrow::array::ArrayBuilder;
+use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::array::BinaryBuilder;
+use datafusion::arrow::array::Float64Builder;
+use datafusion::arrow::array::Int64Builder;
+use datafusion::arrow::array::NullBuilder;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::array::RecordBatchOptions;
+use datafusion::arrow::array::StringBuilder;
+use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::datatypes::Field;
+use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::SchemaRef;
 use rusqlite::types::Type;
 use rusqlite::Row;
 use rusqlite::Rows;
 use snafu::prelude::*;
@@ -38,7 +38,9 @@
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Failed to build record batch: {source}"))]
-    FailedToBuildRecordBatch { source: arrow::error::ArrowError },
+    FailedToBuildRecordBatch {
+        source: datafusion::arrow::error::ArrowError,
+    },
 
     #[snafu(display("No builder found for index {index}"))]
     NoBuilderForIndex { index: usize },
diff --git a/src/sql/arrow_sql_gen/statement.rs b/src/sql/arrow_sql_gen/statement.rs
index a6edf76a..0dadfa53 100644
--- a/src/sql/arrow_sql_gen/statement.rs
+++ b/src/sql/arrow_sql_gen/statement.rs
@@ -1,4 +1,6 @@
-use arrow::{
+use bigdecimal::BigDecimal;
+use chrono::{DateTime, FixedOffset}; +use datafusion::arrow::{ array::{ array, Array, ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, RecordBatch, StringArray, StructArray, @@ -7,8 +9,6 @@ use arrow::{ datatypes::{DataType, Field, Fields, IntervalUnit, Schema, SchemaRef, TimeUnit}, util::display::array_value_to_string, }; -use bigdecimal::BigDecimal; -use chrono::{DateTime, FixedOffset}; use num_bigint::BigInt; use sea_query::{ Alias, ColumnDef, ColumnType, Expr, GenericBuilder, Index, InsertStatement, IntoIden, @@ -1358,7 +1358,7 @@ fn insert_struct_into_row_values_json( source: Box::new(e), })?; - let mut writer = arrow_json::LineDelimitedWriter::new(Vec::new()); + let mut writer = datafusion::arrow::json::LineDelimitedWriter::new(Vec::new()); writer .write(&batch) .map_err(|e| Error::FailedToCreateInsertStatement { @@ -1386,7 +1386,7 @@ mod tests { use std::sync::Arc; use super::*; - use arrow::datatypes::{DataType, Field, Int32Type, Schema}; + use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema}; #[test] fn test_basic_table_creation() { diff --git a/src/sql/db_connection_pool/dbconnection.rs b/src/sql/db_connection_pool/dbconnection.rs index e97cf2c4..58a07e46 100644 --- a/src/sql/db_connection_pool/dbconnection.rs +++ b/src/sql/db_connection_pool/dbconnection.rs @@ -193,7 +193,7 @@ pub async fn get_schemas(conn: Box>) -> Result( conn: Box>, table_reference: &datafusion::sql::TableReference, -) -> Result, Error> { +) -> Result, Error> { let schema = if let Some(conn) = conn.as_sync() { conn.get_schema(table_reference)? } else if let Some(conn) = conn.as_async() { diff --git a/src/sql/db_connection_pool/dbconnection/duckdbconn.rs b/src/sql/db_connection_pool/dbconnection/duckdbconn.rs index 511f2263..bf53fcc3 100644 --- a/src/sql/db_connection_pool/dbconnection/duckdbconn.rs +++ b/src/sql/db_connection_pool/dbconnection/duckdbconn.rs @@ -1,8 +1,8 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::RecordBatch; use async_stream::stream; +use datafusion::arrow::array::RecordBatch; use datafusion::arrow::datatypes::SchemaRef; use datafusion::error::DataFusionError; use datafusion::execution::SendableRecordBatchStream; diff --git a/src/sql/db_connection_pool/dbconnection/mysqlconn.rs b/src/sql/db_connection_pool/dbconnection/mysqlconn.rs index 2bc5b2e2..65828cde 100644 --- a/src/sql/db_connection_pool/dbconnection/mysqlconn.rs +++ b/src/sql/db_connection_pool/dbconnection/mysqlconn.rs @@ -2,8 +2,8 @@ use std::{any::Any, sync::Arc}; use crate::sql::arrow_sql_gen::mysql::map_column_to_data_type; use crate::sql::arrow_sql_gen::{self, mysql::rows_to_arrow}; -use arrow::datatypes::{Field, Schema, SchemaRef}; use async_stream::stream; +use datafusion::arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion::error::DataFusionError; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; diff --git a/src/sql/db_connection_pool/dbconnection/odbcconn.rs b/src/sql/db_connection_pool/dbconnection/odbcconn.rs index 9b178e01..a9ec6e1a 100644 --- a/src/sql/db_connection_pool/dbconnection/odbcconn.rs +++ b/src/sql/db_connection_pool/dbconnection/odbcconn.rs @@ -22,14 +22,14 @@ use crate::sql::db_connection_pool::{ dbconnection::{self, AsyncDbConnection, DbConnection, GenericError}, DbConnectionPool, }; -use arrow::datatypes::Schema; -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; use arrow_odbc::arrow_schema_from; use 
 use arrow_odbc::OdbcReader;
 use arrow_odbc::OdbcReaderBuilder;
 use async_stream::stream;
 use async_trait::async_trait;
+use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::DataFusionError;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -72,7 +72,9 @@ pub type ODBCDbConnectionPool<'a> =
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Failed to convert query result to Arrow: {source}"))]
-    ArrowError { source: arrow::error::ArrowError },
+    ArrowError {
+        source: datafusion::arrow::error::ArrowError,
+    },
     #[snafu(display("arrow_odbc error: {source}"))]
     ArrowODBCError { source: arrow_odbc::Error },
     #[snafu(display("odbc_api Error: {source}"))]
diff --git a/src/sql/db_connection_pool/dbconnection/postgresconn.rs b/src/sql/db_connection_pool/dbconnection/postgresconn.rs
index d5090fca..fdc3c3f9 100644
--- a/src/sql/db_connection_pool/dbconnection/postgresconn.rs
+++ b/src/sql/db_connection_pool/dbconnection/postgresconn.rs
@@ -4,12 +4,12 @@ use std::sync::Arc;
 
 use crate::sql::arrow_sql_gen::postgres::rows_to_arrow;
 use crate::sql::arrow_sql_gen::postgres::schema::pg_data_type_to_arrow_type;
-use arrow::datatypes::Field;
-use arrow::datatypes::Schema;
-use arrow::datatypes::SchemaRef;
 use async_stream::stream;
 use bb8_postgres::tokio_postgres::types::ToSql;
 use bb8_postgres::PostgresConnectionManager;
+use datafusion::arrow::datatypes::Field;
+use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::error::DataFusionError;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
diff --git a/src/sql/db_connection_pool/dbconnection/sqliteconn.rs b/src/sql/db_connection_pool/dbconnection/sqliteconn.rs
index 1b71d132..c00d90e6 100644
--- a/src/sql/db_connection_pool/dbconnection/sqliteconn.rs
+++ b/src/sql/db_connection_pool/dbconnection/sqliteconn.rs
@@ -1,8 +1,8 @@
 use std::any::Any;
 
 use crate::sql::arrow_sql_gen::sqlite::rows_to_arrow;
-use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::sql::TableReference;
diff --git a/src/sql/db_connection_pool/odbcpool.rs b/src/sql/db_connection_pool/odbcpool.rs
index 75d2d011..cbdf133e 100644
--- a/src/sql/db_connection_pool/odbcpool.rs
+++ b/src/sql/db_connection_pool/odbcpool.rs
@@ -16,8 +16,8 @@ limitations under the License.
 
 use crate::sql::db_connection_pool::dbconnection::odbcconn::ODBCConnection;
 use crate::sql::db_connection_pool::dbconnection::odbcconn::{ODBCDbConnection, ODBCParameter};
-use async_trait::async_trait;
 use crate::sql::db_connection_pool::{DbConnectionPool, JoinPushDown};
+use async_trait::async_trait;
 use odbc_api::{sys::AttrConnectionPooling, Connection, ConnectionOptions, Environment};
 use secrecy::{ExposeSecret, Secret, SecretString};
 use sha2::{Digest, Sha256};
diff --git a/src/sql/sql_provider_datafusion/mod.rs b/src/sql/sql_provider_datafusion/mod.rs
index e7c217b2..376a23bd 100644
--- a/src/sql/sql_provider_datafusion/mod.rs
+++ b/src/sql/sql_provider_datafusion/mod.rs
@@ -410,8 +410,8 @@ mod tests {
 mod sql_table_plan_to_sql_tests {
     use std::any::Any;
 
-    use arrow_schema::{DataType, Field, Schema, TimeUnit};
     use async_trait::async_trait;
+    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion::{
         logical_expr::{col, lit},
         sql::TableReference,
diff --git a/src/sqlite.rs b/src/sqlite.rs
index 8b8831ec..e5d909d9 100644
--- a/src/sqlite.rs
+++ b/src/sqlite.rs
@@ -9,9 +9,9 @@ use crate::sql::db_connection_pool::{
     DbConnectionPool, Mode,
 };
 use crate::sql::sql_provider_datafusion;
-use arrow::array::{Int64Array, StringArray};
-use arrow::{array::RecordBatch, datatypes::SchemaRef};
 use async_trait::async_trait;
+use datafusion::arrow::array::{Int64Array, StringArray};
+use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
 use datafusion::catalog::Session;
 use datafusion::{
     catalog::TableProviderFactory,
@@ -663,7 +663,7 @@ impl Sqlite {
 
 #[cfg(test)]
 pub(crate) mod tests {
-    use arrow::datatypes::{DataType, Schema};
+    use datafusion::arrow::datatypes::{DataType, Schema};
     use datafusion::{
         common::{Constraint, ToDFSchema},
         prelude::SessionContext,
@@ -674,9 +674,9 @@ pub(crate) mod tests {
     #[tokio::test]
     async fn test_sqlite_table_creation_with_indexes() {
         let schema = Arc::new(Schema::new(vec![
-            arrow::datatypes::Field::new("first_name", DataType::Utf8, false),
-            arrow::datatypes::Field::new("last_name", DataType::Utf8, false),
-            arrow::datatypes::Field::new("id", DataType::Int64, false),
+            datafusion::arrow::datatypes::Field::new("first_name", DataType::Utf8, false),
+            datafusion::arrow::datatypes::Field::new("last_name", DataType::Utf8, false),
+            datafusion::arrow::datatypes::Field::new("id", DataType::Int64, false),
         ]));
 
         let options: HashMap<String, String> = [(
diff --git a/src/sqlite/federation.rs b/src/sqlite/federation.rs
index 4fb95dff..1897a68b 100644
--- a/src/sqlite/federation.rs
+++ b/src/sqlite/federation.rs
@@ -1,7 +1,7 @@
 use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError};
 use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
-use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::sql::sqlparser::ast::{self, VisitMut};
 use datafusion::sql::unparser::dialect::Dialect;
 use datafusion_federation::sql::{AstAnalyzer, SQLExecutor, SQLFederationProvider, SQLTableSource};
diff --git a/src/sqlite/write.rs b/src/sqlite/write.rs
index 4ab8a6e4..4d02e949 100644
--- a/src/sqlite/write.rs
+++ b/src/sqlite/write.rs
@@ -1,7 +1,7 @@
 use std::{any::Any, fmt, sync::Arc};
 
-use arrow::{array::RecordBatch, datatypes::SchemaRef};
 use async_trait::async_trait;
+use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
 use datafusion::{
     catalog::Session,
     common::Constraints,
@@ -232,7 +232,7 @@ impl DisplayAs for SqliteDataSink {
 mod tests {
     use std::{collections::HashMap, sync::Arc};
 
-    use arrow::{
+    use datafusion::arrow::{
         array::{Int64Array, RecordBatch, StringArray},
         datatypes::{DataType, Schema},
     };
@@ -251,8 +251,8 @@ mod tests {
     #[allow(clippy::unreadable_literal)]
     async fn test_round_trip_sqlite() {
         let schema = Arc::new(Schema::new(vec![
-            arrow::datatypes::Field::new("time_in_string", DataType::Utf8, false),
-            arrow::datatypes::Field::new("time_int", DataType::Int64, false),
+            datafusion::arrow::datatypes::Field::new("time_in_string", DataType::Utf8, false),
+            datafusion::arrow::datatypes::Field::new("time_int", DataType::Int64, false),
         ]));
 
         let df_schema = ToDFSchema::to_dfschema_ref(Arc::clone(&schema)).expect("df schema");
         let external_table = CreateExternalTable {
diff --git a/src/util/constraints.rs b/src/util/constraints.rs
index 7ad772a6..38ae2ec6 100644
--- a/src/util/constraints.rs
+++ b/src/util/constraints.rs
@@ -1,4 +1,4 @@
-use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
 use datafusion::{
     common::{Constraint, Constraints},
     execution::context::SessionContext,
@@ -114,7 +114,7 @@ pub fn get_primary_keys_from_constraints(
 pub(crate) mod tests {
     use std::sync::Arc;
 
-    use arrow::datatypes::SchemaRef;
+    use datafusion::arrow::datatypes::SchemaRef;
     use datafusion::{
         common::{Constraint, Constraints},
         parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
@@ -129,7 +129,8 @@ pub(crate) mod tests {
 
         let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(parquet_bytes)?.build()?;
 
-        let records = parquet_reader.collect::<Result<Vec<_>, arrow::error::ArrowError>>()?;
+        let records =
+            parquet_reader.collect::<Result<Vec<_>, datafusion::arrow::error::ArrowError>>()?;
 
         let schema = records[0].schema();
         let constraints =
diff --git a/src/util/on_conflict.rs b/src/util/on_conflict.rs
index deb1563f..42d8b5e3 100644
--- a/src/util/on_conflict.rs
+++ b/src/util/on_conflict.rs
@@ -1,4 +1,4 @@
-use arrow::datatypes::SchemaRef;
+use datafusion::arrow::datatypes::SchemaRef;
 use itertools::Itertools;
 use sea_query::{self, Alias};
 use snafu::prelude::*;
@@ -138,7 +138,7 @@
 mod tests {
     use std::sync::Arc;
 
-    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
 
     use crate::util::{column_reference::ColumnReference, on_conflict::OnConflict};
diff --git a/src/util/test.rs b/src/util/test.rs
index 95aa4123..b6080e09 100644
--- a/src/util/test.rs
+++ b/src/util/test.rs
@@ -1,6 +1,6 @@
 use std::{any::Any, sync::Arc};
 
-use arrow::{array::RecordBatch, datatypes::SchemaRef};
+use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
 use datafusion::{
     common::Statistics,
     error::{DataFusionError, Result},
diff --git a/tests/arrow_record_batch_gen/mod.rs b/tests/arrow_record_batch_gen/mod.rs
index f93141b1..00e97d56 100644
--- a/tests/arrow_record_batch_gen/mod.rs
+++ b/tests/arrow_record_batch_gen/mod.rs
@@ -1,12 +1,12 @@
-use arrow::array::RecordBatch;
-use arrow::{
+use chrono::NaiveDate;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::{
     array::*,
     datatypes::{
         i256, DataType, Date32Type, Date64Type, Field, Int8Type, IntervalDayTime,
         IntervalMonthDayNano, IntervalUnit, Schema, SchemaRef, TimeUnit,
     },
 };
-use chrono::NaiveDate;
 use std::sync::Arc;
 
 // Helper functions to create arrow record batches of different types
diff --git a/tests/duckdb/mod.rs b/tests/duckdb/mod.rs
index 2ab394ca..a99660eb 100644
--- a/tests/duckdb/mod.rs
+++ b/tests/duckdb/mod.rs
@@ -1,6 +1,6 @@
 use crate::arrow_record_batch_gen::*;
-use arrow::array::RecordBatch;
-use arrow::datatypes::SchemaRef;
+use datafusion::arrow::array::RecordBatch;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::catalog::TableProviderFactory;
 use datafusion::common::{Constraints, ToDFSchema};
 use datafusion::execution::context::SessionContext;
diff --git a/tests/flight/mod.rs b/tests/flight/mod.rs
index 35c2fd46..924bbd47 100644
--- a/tests/flight/mod.rs
+++ b/tests/flight/mod.rs
@@ -4,7 +4,6 @@
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 
-use arrow_array::{Array, Float32Array, Int64Array, Int8Array, RecordBatch};
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
 use arrow_flight::sql::server::FlightSqlService;
@@ -12,8 +11,9 @@ use arrow_flight::sql::{CommandStatementQuery, ProstMessageExt, SqlInfo, TicketS
 use arrow_flight::{
     FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, Ticket,
 };
-use arrow_schema::{DataType, Field, Schema};
 use async_trait::async_trait;
+use datafusion::arrow::array::{Array, Float32Array, Int64Array, Int8Array, RecordBatch};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::prelude::SessionContext;
 use futures::{stream, Stream, TryStreamExt};
 use prost::Message;
diff --git a/tests/mysql/mod.rs b/tests/mysql/mod.rs
index b10042fa..956181a3 100644
--- a/tests/mysql/mod.rs
+++ b/tests/mysql/mod.rs
@@ -7,17 +7,17 @@ use rstest::{fixture, rstest};
 use std::sync::Arc;
 
 use crate::docker::RunningContainer;
-use arrow::{
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::{
     array::*,
     datatypes::{i256, DataType, Field, Schema, TimeUnit, UInt16Type},
 };
-use arrow_schema::SchemaRef;
 use datafusion::catalog::TableProviderFactory;
 use datafusion::common::{Constraints, ToDFSchema};
+use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::CreateExternalTable;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::memory::MemoryExec;
-use datafusion_expr::dml::InsertOp;
 #[cfg(feature = "mysql-federation")]
 use datafusion_federation::schema_cast::record_convert::try_cast_to;
 use datafusion_table_providers::mysql::MySQLTableProviderFactory;
diff --git a/tests/postgres/mod.rs b/tests/postgres/mod.rs
index 7a966cfb..97efa97c 100644
--- a/tests/postgres/mod.rs
+++ b/tests/postgres/mod.rs
@@ -1,14 +1,14 @@
 use crate::arrow_record_batch_gen::*;
-use arrow::{
+use datafusion::arrow::{
     array::{Decimal128Array, RecordBatch},
     datatypes::{DataType, Field, Schema, SchemaRef},
 };
-use datafusion::{catalog::TableProviderFactory, logical_expr::dml::InsertOp};
 use datafusion::common::{Constraints, ToDFSchema};
 use datafusion::execution::context::SessionContext;
 use datafusion::logical_expr::CreateExternalTable;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::{catalog::TableProviderFactory, logical_expr::dml::InsertOp};
 #[cfg(feature = "postgres-federation")]
 use datafusion_federation::schema_cast::record_convert::try_cast_to;
@@ -56,11 +56,7 @@ async fn arrow_postgres_round_trip(
     let mem_exec = MemoryExec::try_new(&[vec![arrow_record.clone()]], arrow_record.schema(), None)
         .expect("memory exec created");
     let insert_plan = table_provider
-        .insert_into(
-            &ctx.state(),
-            Arc::new(mem_exec),
-            InsertOp::Overwrite,
-        )
+        .insert_into(&ctx.state(), Arc::new(mem_exec), InsertOp::Overwrite)
         .await
created"); @@ -103,7 +99,7 @@ struct ContainerManager { impl Drop for ContainerManager { fn drop(&mut self) { - let _ = tokio::runtime::Runtime::new() + tokio::runtime::Runtime::new() .unwrap() .block_on(stop_container(self.port)); } diff --git a/tests/postgres/schema.rs b/tests/postgres/schema.rs index 7acc4dd2..f97c9871 100644 --- a/tests/postgres/schema.rs +++ b/tests/postgres/schema.rs @@ -1,4 +1,4 @@ -use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; use datafusion::catalog::TableProviderFactory; use datafusion::common::Constraints; use datafusion::common::ToDFSchema; diff --git a/tests/sqlite/mod.rs b/tests/sqlite/mod.rs index 5395773b..8341ac94 100644 --- a/tests/sqlite/mod.rs +++ b/tests/sqlite/mod.rs @@ -1,6 +1,6 @@ use crate::arrow_record_batch_gen::*; -use arrow::array::RecordBatch; -use arrow::datatypes::SchemaRef; +use datafusion::arrow::array::RecordBatch; +use datafusion::arrow::datatypes::SchemaRef; use datafusion::execution::context::SessionContext; #[cfg(feature = "sqlite-federation")] use datafusion_federation::schema_cast::record_convert::try_cast_to;