Skip to content

Commit

Permalink
add ODBC usage to README & simplify dependency management (#189)
Browse files Browse the repository at this point in the history
* docs: add ODBC usage to README

* chore: simplify dependency management
  • Loading branch information
xhwhis authored Dec 4, 2024
1 parent 6e42290 commit 00a2af6
Show file tree
Hide file tree
Showing 49 changed files with 224 additions and 223 deletions.
138 changes: 61 additions & 77 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,50 @@ license = "Apache-2.0"
description = "Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait."

[dependencies]
arrow = "53"
arrow-array = { version = "53", optional = true }
arrow-cast = { version = "53", optional = true }
arrow-flight = { version = "53", optional = true, features = [
"flight-sql-experimental",
"tls",
] }
arrow-schema = { version = "53", optional = true, features = ["serde"] }
arrow-json = "53"
async-stream = { version = "0.3.5", optional = true }
async-trait = "0.1.80"
num-bigint = "0.4.4"
bigdecimal = "0.4.5"
arrow-odbc = { version = "14.0", optional = true }
async-stream = { version = "0.3.6", optional = true }
async-trait = "0.1"
bb8 = { version = "0.8", optional = true }
bb8-postgres = { version = "0.8", optional = true }
bigdecimal = "0.4.6"
byteorder = "1.5.0"
chrono = "0.4.38"
datafusion = "43.0.0"
datafusion-expr = { version = "43.0.0", optional = true }
datafusion-physical-expr = { version = "43.0.0", optional = true }
datafusion-physical-plan = { version = "43.0.0", optional = true }
datafusion-proto = { version = "43.0.0", optional = true }
dashmap = "6.1.0"
datafusion = { version = "43", default-features = false }
datafusion-federation = { version = "0.3.1", features = [
"sql",
], optional = true }
datafusion-proto = { version = "43", optional = true }
duckdb = { version = "1.1.1", features = [
"bundled",
"r2d2",
"vtab",
"vtab-arrow",
"appender-arrow",
], optional = true }
dyn-clone = { version = "1.0", optional = true }
fallible-iterator = "0.3.0"
futures = "0.3.30"
mysql_async = { version = "0.34.1", features = [
fundu = "2.0.1"
futures = "0.3"
geo-types = "0.7"
itertools = "0.13.0"
mysql_async = { version = "0.34", features = [
"native-tls-tls",
"chrono",
], optional = true }
prost = { version = "0.13.2", optional = true }
native-tls = { version = "0.2.12", optional = true }
num-bigint = "0.4"
odbc-api = { version = "10.0.0", optional = true }
pem = { version = "3.0.4", optional = true }
postgres-native-tls = { version = "0.5.0", optional = true }
prost = { version = "0.13", optional = true }
r2d2 = { version = "0.8.10", optional = true }
rusqlite = { version = "0.31.0", optional = true }
sea-query = { version = "0.32.0-rc.1", features = [
rusqlite = { version = "0.32.1", optional = true }
sea-query = { version = "0.32.0", features = [
"backend-sqlite",
"backend-postgres",
"postgres-array",
Expand All @@ -57,92 +61,72 @@ sea-query = { version = "0.32.0-rc.1", features = [
"with-chrono",
] }
secrecy = "0.8.0"
serde = { version = "1.0.209", optional = true }
serde_json = "1.0.124"
snafu = "0.8.3"
serde = { version = "1.0", optional = true }
serde_json = "1.0"
sha2 = "0.10.8"
snafu = "0.8.5"
time = "0.3.36"
tokio = { version = "1.38.0", features = ["macros", "fs"] }
tokio-util = "0.7.12"
tokio-postgres = { version = "0.7.10", features = [
tokio = { version = "1.41", features = ["macros", "fs"] }
tokio-postgres = { version = "0.7.12", features = [
"with-chrono-0_4",
"with-uuid-1",
"with-serde_json-1",
"with-geo-types-0_7",
], optional = true }
tracing = "0.1.40"
uuid = { version = "1.9.1", optional = true }
postgres-native-tls = { version = "0.5.0", optional = true }
bb8 = { version = "0.8", optional = true }
bb8-postgres = { version = "0.8", optional = true }
native-tls = { version = "0.2.11", optional = true }
trust-dns-resolver = "0.23.2"
url = "2.5.1"
pem = { version = "3.0.4", optional = true }
tokio-rusqlite = { version = "0.5.1", optional = true }
tokio-rusqlite = { version = "0.6.0", optional = true }
tonic = { version = "0.12", optional = true, features = [
"tls-native-roots",
"tls-webpki-roots",
] }
itertools = "0.13.0"
dyn-clone = { version = "1.0.17", optional = true }
geo-types = "0.7.13"
fundu = "2.0.1"
dashmap = "6.1.0"
odbc-api = { version = "9.0.0", optional = true }
arrow-odbc = { version = "13.0.0", optional = true }
sha2 = "0.10.8"
tracing = "0.1.40"
trust-dns-resolver = "0.23.2"
url = "2.5.4"
uuid = { version = "1.11.0", optional = true }

[dev-dependencies]
anyhow = "1.0.86"
bollard = "0.17.1"
rand = "0.8.5"
reqwest = "0.12.5"
secrecy = "0.8.0"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
test-log = { version = "0.2.16", features = ["trace"] }
rstest = "0.23.0"
anyhow = "1.0"
bollard = "0.18.1"
geozero = { version = "0.14.0", features = ["with-wkb"] }
tokio-stream = { version = "0.1.15", features = ["net"] }
arrow-schema = "53.1.0"
insta = { version = "1.41.1", features = ["filters"] }
prost = { version = "0.13" }
insta = { version = "1.40.0", features = ["filters"] }
rand = "0.8.5"
reqwest = "0.12.9"
rstest = "0.23.0"
test-log = { version = "0.2.16", features = ["trace"] }
tokio-stream = { version = "0.1.16", features = ["net"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[features]
mysql = ["dep:mysql_async", "dep:async-stream"]
postgres = [
"dep:tokio-postgres",
"dep:uuid",
"dep:postgres-native-tls",
"dep:bb8",
"dep:bb8-postgres",
"dep:native-tls",
"dep:pem",
"dep:async-stream",
]
sqlite = ["dep:rusqlite", "dep:tokio-rusqlite"]
duckdb = [
"dep:duckdb",
"dep:r2d2",
"dep:uuid",
"dep:dyn-clone",
"dep:async-stream",
]
duckdb-federation = ["duckdb", "federation"]
federation = ["dep:datafusion-federation"]
flight = [
"dep:arrow-array",
"dep:arrow-cast",
"dep:arrow-flight",
"dep:arrow-schema",
"dep:datafusion-expr",
"dep:datafusion-physical-expr",
"dep:datafusion-physical-plan",
"datafusion/serde",
"dep:datafusion-proto",
"dep:serde",
"dep:tonic",
]
odbc = ["dep:odbc-api", "dep:arrow-odbc", "dep:async-stream", "dep:dyn-clone"]
federation = ["dep:datafusion-federation"]
duckdb-federation = ["duckdb", "federation"]
sqlite-federation = ["sqlite", "federation"]
postgres-federation = ["postgres", "federation"]
mysql = ["dep:mysql_async", "dep:async-stream"]
mysql-federation = ["mysql", "federation"]
odbc = ["dep:odbc-api", "dep:arrow-odbc", "dep:async-stream", "dep:dyn-clone"]
odbc-federation = ["odbc", "federation"]
postgres = [
"dep:tokio-postgres",
"dep:uuid",
"dep:postgres-native-tls",
"dep:bb8",
"dep:bb8-postgres",
"dep:native-tls",
"dep:pem",
"dep:async-stream",
]
postgres-federation = ["postgres", "federation"]
sqlite = ["dep:rusqlite", "dep:tokio-rusqlite"]
sqlite-federation = ["sqlite", "federation"]
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ let ctx = SessionContext::with_state(state);
- SQLite
- DuckDB
- Flight SQL
- ODBC

## Examples

Expand Down Expand Up @@ -100,8 +101,19 @@ cargo run --example mysql --features mysql
```bash
brew install roapi
# or
#cargo install --locked --git https://github.com/roapi/roapi --branch main --bins roapi
# cargo install --locked --git https://github.com/roapi/roapi --branch main --bins roapi
roapi -t taxi=https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet &

cargo run --example flight-sql --features flight
```

### ODBC

```bash
apt-get install unixodbc-dev libsqliteodbc
# or
# brew install unixodbc && brew install sqliteodbc
# If you use ARM Mac, please see https://github.com/pacman82/odbc-api#os-x-arm--mac-m1

cargo run --example odbc_sqlite --features odbc
```
9 changes: 4 additions & 5 deletions examples/duckdb.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use datafusion_table_providers::{
common::DatabaseCatalogProvider,
common::DatabaseCatalogProvider, duckdb::DuckDBTableFactory,
sql::db_connection_pool::duckdbpool::DuckDbConnectionPool,
duckdb::DuckDBTableFactory,
};
use datafusion::sql::TableReference;
use duckdb::AccessMode;

/// This example demonstrates how to:
Expand All @@ -17,7 +16,7 @@ use duckdb::AccessMode;
#[tokio::main]
async fn main() {
// Create DuckDB connection pool
// Opening in ReadOnly mode allows multiple reader processes to access
// Opening in ReadOnly mode allows multiple reader processes to access
// the database at the same time
let duckdb_pool = Arc::new(
DuckDbConnectionPool::new_file("examples/duckdb_example.db", &AccessMode::ReadOnly)
Expand All @@ -27,7 +26,7 @@ async fn main() {
// Create DuckDB table provider factory
// Used to generate TableProvider instances that can read DuckDB table data
let table_factory = DuckDBTableFactory::new(duckdb_pool.clone());

// Create database catalog provider
// This allows us to access tables through catalog structure (catalog.schema.table)
let catalog = DatabaseCatalogProvider::try_new(duckdb_pool).await.unwrap();
Expand Down
8 changes: 3 additions & 5 deletions examples/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ use std::{collections::HashMap, sync::Arc};
use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use datafusion_table_providers::{
common::DatabaseCatalogProvider,
sql::db_connection_pool::mysqlpool::MySQLConnectionPool,
mysql::MySQLTableFactory,
util::secrets::to_secret_map,
common::DatabaseCatalogProvider, mysql::MySQLTableFactory,
sql::db_connection_pool::mysqlpool::MySQLConnectionPool, util::secrets::to_secret_map,
};

/// This example demonstrates how to:
Expand Down Expand Up @@ -54,7 +52,7 @@ async fn main() {
// Create MySQL table provider factory
// Used to generate TableProvider instances that can read MySQL table data
let table_factory = MySQLTableFactory::new(mysql_pool.clone());

// Create database catalog provider
// This allows us to access tables through catalog structure (catalog.schema.table)
let catalog = DatabaseCatalogProvider::try_new(mysql_pool).await.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions examples/odbc_sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ async fn main() {
// Create SQLite ODBC connection pool
let params = to_secret_map(HashMap::from([(
"connection_string".to_owned(),
"driver=SQLite3;database=examples/sqlite_example.db;"
.to_owned(),
"driver=SQLite3;database=examples/sqlite_example.db;".to_owned(),
)]));
let odbc_pool =
Arc::new(ODBCPool::new(params).expect("unable to create SQLite ODBC connection pool"));
Expand Down
16 changes: 8 additions & 8 deletions examples/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::sync::Arc;
use std::collections::HashMap;
use datafusion::prelude::SessionContext;
use datafusion::sql::TableReference;
use datafusion_table_providers::{
common::DatabaseCatalogProvider,
sql::db_connection_pool::postgrespool::PostgresConnectionPool,
postgres::PostgresTableFactory,
util::secrets::to_secret_map,
common::DatabaseCatalogProvider, postgres::PostgresTableFactory,
sql::db_connection_pool::postgrespool::PostgresConnectionPool, util::secrets::to_secret_map,
};
use std::collections::HashMap;
use std::sync::Arc;

/// This example demonstrates how to:
/// 1. Create a PostgreSQL connection pool
Expand Down Expand Up @@ -54,10 +52,12 @@ async fn main() {
// Create PostgreSQL table provider factory
// Used to generate TableProvider instances that can read PostgreSQL table data
let table_factory = PostgresTableFactory::new(postgres_pool.clone());

// Create database catalog provider
// This allows us to access tables through catalog structure (catalog.schema.table)
let catalog = DatabaseCatalogProvider::try_new(postgres_pool).await.unwrap();
let catalog = DatabaseCatalogProvider::try_new(postgres_pool)
.await
.unwrap();

// Create DataFusion session context
let ctx = SessionContext::new();
Expand Down
2 changes: 1 addition & 1 deletion examples/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() {
// Create SQLite table provider factory
// Used to generate TableProvider instances that can read SQLite table data
let table_factory = SqliteTableFactory::new(sqlite_pool.clone());

// Create database catalog provider
// This allows us to access tables through catalog structure (catalog.schema.table)
let catalog_provider = DatabaseCatalogProvider::try_new(sqlite_pool).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::util::{
indexes::IndexType,
on_conflict::{self, OnConflict},
};
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
use datafusion::{
catalog::{Session, TableProviderFactory},
common::Constraints,
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/creator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::sql::arrow_sql_gen::statement::IndexBuilder;
use crate::sql::db_connection_pool::duckdbpool::DuckDbConnectionPool;
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
use datafusion::common::Constraints;
use duckdb::{vtab::arrow_recordbatch_to_query_params, ToSql, Transaction};
use snafu::prelude::*;
Expand Down Expand Up @@ -322,7 +322,7 @@ pub(crate) mod tests {
use crate::sql::db_connection_pool::{
dbconnection::duckdbconn::DuckDbConnection, duckdbpool::DuckDbConnectionPool,
};
use arrow::array::RecordBatch;
use datafusion::arrow::array::RecordBatch;
use datafusion::{
execution::{SendableRecordBatchStream, TaskContext},
parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
Expand Down Expand Up @@ -358,7 +358,7 @@ pub(crate) mod tests {
.expect("to build parquet reader");

parquet_reader
.collect::<Result<Vec<_>, arrow::error::ArrowError>>()
.collect::<Result<Vec<_>, datafusion::arrow::error::ArrowError>>()
.expect("to get records")
}

Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/federation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError};
use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
use arrow::datatypes::SchemaRef;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::sql::unparser::dialect::Dialect;
use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLTableSource};
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::util::{
on_conflict::OnConflict,
retriable_error::{check_and_mark_retriable_error, to_retriable_data_write_error},
};
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
use datafusion::catalog::Session;
use datafusion::common::Constraints;
use datafusion::logical_expr::dml::InsertOp;
Expand Down
Loading

0 comments on commit 00a2af6

Please sign in to comment.