Commit

Cleanup

adriangb committed Feb 6, 2023
1 parent 7c31f50 commit 92f7060
Showing 11 changed files with 66 additions and 63 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "_pgpq"
-version = "0.1.0"
+version = "0.1.1"
 edition = "2021"
 readme = "README.md"
 license-file = "LICENSE.txt"
4 changes: 2 additions & 2 deletions bench.ipynb
@@ -52,7 +52,7 @@
     "name": "stdout",
     "output_type": "stream",
     "text": [
-     "79.2 µs ± 161 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)\n"
+     "80.9 µs ± 1.71 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)\n"
     ]
    }
   ],
@@ -69,7 +69,7 @@
     "name": "stdout",
     "output_type": "stream",
     "text": [
-     "37.437093019485474\n"
+     "38.123141288757324\n"
     ]
    }
   ],
1 change: 0 additions & 1 deletion pgpq/Cargo.lock

(Generated file; diff not rendered by default.)

4 changes: 3 additions & 1 deletion pgpq/Cargo.toml
@@ -5,7 +5,6 @@ edition = "2021"
 description = ""

 [dependencies]
-byteorder = "^1.4.3"
 bytes = "^1.4.0"
 chrono = "^0.4.23"
 arrow-schema = "^32.0.0"
@@ -25,6 +24,9 @@ parquet = ">=32.0.0"
 criterion = ">=0.4.0"
 arrow = ">=32.0.0"

+[profile.bench]
+debug = true
+
 [[bench]]
 name = "benchmarks"
 harness = false
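Note: [profile.bench] with debug = true keeps debug symbols in optimized benchmark builds, which makes profiler output (perf, flamegraphs) readable without changing the optimization level.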
29 changes: 25 additions & 4 deletions pgpq/benches/benchmarks.rs
@@ -5,7 +5,7 @@ use arrow::datatypes::{DataType, Schema, TimeUnit};
 use arrow::record_batch::RecordBatchReader;
 use arrow_array::RecordBatch;
 use bytes::BytesMut;
-use criterion::{criterion_group, criterion_main, Criterion};
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
 use pgpq::ArrowToPostgresBinaryEncoder;
 use std::fs;
@@ -39,19 +39,40 @@ fn bench(input: (Vec<RecordBatch>, Schema)) {
 
 pub fn benchmark_nyc_taxi_small(c: &mut Criterion) {
     let mut group = c.benchmark_group("benchmark_nyc_taxi_small");
+    let (batches, schema) = setup(Some(100));
     group.bench_function("NYC Yello Taxi 100 rows", |b| {
-        b.iter_with_setup(|| setup(Some(100)), bench)
+        b.iter_with_setup(
+            || {
+                let batches = batches.iter().cloned().collect();
+                let schema = schema.clone();
+                (batches, schema)
+            },
+            |(batches, schema)| bench(black_box((batches, schema))),
+        )
     });
 }
 
 pub fn benchmark_nyc_taxi_full(c: &mut Criterion) {
     let mut group = c.benchmark_group("benchmark_nyc_taxi_full");
     group.sampling_mode(criterion::SamplingMode::Flat);
     group.sample_size(10); // the minimum
 
+    let (batches, schema) = setup(Some(100));
     group.bench_function("NYC Yello Taxi full", |b| {
-        b.iter_with_setup(|| setup(None), bench)
+        b.iter_with_setup(
+            || {
+                let batches = batches.iter().cloned().collect();
+                let schema = schema.clone();
+                (batches, schema)
+            },
+            |(batches, schema)| bench(black_box((batches, schema))),
+        )
     });
 }
 
-criterion_group!(benches, benchmark_nyc_taxi_small, benchmark_nyc_taxi_full);
+criterion_group! {
+    name = benches;
+    config = Criterion::default();
+    targets = benchmark_nyc_taxi_small, benchmark_nyc_taxi_full
+}
 criterion_main!(benches);
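The change above hoists the expensive Parquet read (setup) out of the measured loop, hands each iteration a cheap clone of the Arc-backed batches, and wraps the input in black_box so the optimizer cannot elide the encoding work. A minimal sketch of that criterion pattern, with an illustrative encode function standing in for pgpq's encoder:

    use criterion::{black_box, criterion_group, criterion_main, Criterion};

    // Stand-in for the real work (pgpq's Arrow-to-Postgres encoding).
    fn encode(data: Vec<u64>) -> u64 {
        data.iter().sum()
    }

    fn bench_encode(c: &mut Criterion) {
        // Expensive setup runs once, outside the measured loop.
        let data: Vec<u64> = (0..1_000).collect();
        c.bench_function("encode", |b| {
            b.iter_with_setup(
                || data.clone(),                     // per-iteration clone, not timed
                |d| black_box(encode(black_box(d))), // black_box defeats dead-code elimination
            )
        });
    }

    criterion_group!(benches, bench_encode);
    criterion_main!(benches);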
20 changes: 17 additions & 3 deletions pgpq/benches/columnar_vs_rows.rs
@@ -11,7 +11,7 @@ use std::fs;
 use std::sync::Arc;
 
 
-fn get_start_end(row: usize, col: usize, n_rows: usize, offsets: &Vec<usize>) -> (usize, usize) {
+fn get_start_end(row: usize, col: usize, n_rows: usize, offsets: &[usize]) -> (usize, usize) {
     let idx = col * n_rows + col;
     (offsets[idx], offsets[idx+1])
 }
@@ -62,7 +62,7 @@ pub fn benchmark_approaches(c: &mut Criterion) {
             if !arr.is_null(row) {
                 let v = arr.value(row);
                 let (start, end) = get_start_end(row, col, batch.num_rows(), &offsets);
-                buff[start..end].copy_from_slice(&v.to_ne_bytes());
+                buff[start..end].copy_from_slice(&v.to_be_bytes());
             } else { panic!()};
         }
     }
@@ -72,7 +72,7 @@ pub fn benchmark_approaches(c: &mut Criterion) {
             if !arr.is_null(row) {
                 let v = arr.value(row).as_bytes();
                 let (start, end) = get_start_end(row, col, batch.num_rows(), &offsets);
-                buff[start..end].copy_from_slice(&v);
+                buff[start..end].copy_from_slice(v);
             } else { panic!()}
         }
     }
@@ -86,6 +86,20 @@ pub fn benchmark_approaches(c: &mut Criterion) {
         b.iter(|| {
             let mut buffer = BytesMut::new();
             let cols = batch.columns();
+            let additional = cols.iter().map(|col| {
+                match col.data_type() {
+                    DataType::Int64 => {
+                        let arr = col.as_any().downcast_ref::<arrow_array::Int64Array>().unwrap();
+                        8 * arr.len()
+                    }
+                    DataType::Utf8 => {
+                        let arr = col.as_any().downcast_ref::<arrow_array::StringArray>().unwrap();
+                        (arr.value_offsets().last().unwrap() - arr.value_offsets().first().unwrap()) as usize
+                    }
+                    _ => unreachable!()
+                }
+            }).sum();
+            buffer.reserve(additional);
             for row in 0..batch.num_rows() {
                 for col in cols {
                     match col.data_type() {
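Two things change in this benchmark: integers are now written with to_be_bytes (Postgres's COPY BINARY wire format is big-endian, so to_ne_bytes was only correct on big-endian hosts), and the output size is computed per column so a single reserve call replaces repeated buffer regrowth. A minimal sketch of both ideas, assuming plain i64 fields:

    use bytes::{BufMut, BytesMut};

    // Reserve the exact output size once, then write each value
    // in big-endian (network) order as COPY BINARY expects.
    fn write_i64s(values: &[i64]) -> BytesMut {
        let mut buf = BytesMut::new();
        buf.reserve(values.len() * 8);       // one allocation instead of many regrows
        for v in values {
            buf.put_slice(&v.to_be_bytes()); // big-endian, unlike to_ne_bytes
        }
        buf
    }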
50 changes: 13 additions & 37 deletions pgpq/src/arrays.rs
@@ -1,6 +1,6 @@
 use std::any::Any;
 
-use arrow_array;
+
 use arrow_array::{Array, ListArray, FixedSizeListArray, LargeListArray};
 use arrow_schema::{DataType, TimeUnit, Field};
 
@@ -114,44 +114,20 @@ pub(crate) fn downcast_array<'a>(
         DataType::Float16 => ArrowArray::Float16(downcast_checked(arr, field, type_)?),
         DataType::Float32 => ArrowArray::Float32(downcast_checked(arr, field, type_)?),
         DataType::Float64 => ArrowArray::Float64(downcast_checked(arr, field, type_)?),
-        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
-            ArrowArray::TimestampNanosecond(downcast_checked(arr, field, type_)?, tz.is_some())
-        }
-        DataType::Timestamp(TimeUnit::Microsecond, tz) => {
-            ArrowArray::TimestampMicrosecond(downcast_checked(arr, field, type_)?, tz.is_some())
-        }
-        DataType::Timestamp(TimeUnit::Millisecond, tz) => {
-            ArrowArray::TimestampMillisecond(downcast_checked(arr, field, type_)?, tz.is_some())
-        }
-        DataType::Timestamp(TimeUnit::Second, tz) => {
-            ArrowArray::TimestampSecond(downcast_checked(arr, field, type_)?, tz.is_some())
-        }
+        DataType::Timestamp(TimeUnit::Nanosecond, tz) => ArrowArray::TimestampNanosecond(downcast_checked(arr, field, type_)?, tz.is_some()),
+        DataType::Timestamp(TimeUnit::Microsecond, tz) => ArrowArray::TimestampMicrosecond(downcast_checked(arr, field, type_)?, tz.is_some()),
+        DataType::Timestamp(TimeUnit::Millisecond, tz) => ArrowArray::TimestampMillisecond(downcast_checked(arr, field, type_)?, tz.is_some()),
+        DataType::Timestamp(TimeUnit::Second, tz) => ArrowArray::TimestampSecond(downcast_checked(arr, field, type_)?, tz.is_some()),
         DataType::Date32 => ArrowArray::Date32(downcast_checked(arr, field, type_)?),
         DataType::Date64 => ArrowArray::Date64(downcast_checked(arr, field, type_)?),
-        DataType::Time32(TimeUnit::Millisecond) => {
-            ArrowArray::Time32Millisecond(downcast_checked(arr, field, type_)?)
-        }
-        DataType::Time32(TimeUnit::Second) => {
-            ArrowArray::Time32Second(downcast_checked(arr, field, type_)?)
-        }
-        DataType::Time64(TimeUnit::Nanosecond) => {
-            ArrowArray::Time64Nanosecond(downcast_checked(arr, field, type_)?)
-        }
-        DataType::Time64(TimeUnit::Microsecond) => {
-            ArrowArray::Time64Microsecond(downcast_checked(arr, field, type_)?)
-        }
-        DataType::Duration(TimeUnit::Nanosecond) => {
-            ArrowArray::DurationNanosecond(downcast_checked(arr, field, type_)?)
-        }
-        DataType::Duration(TimeUnit::Microsecond) => {
-            ArrowArray::DurationMicrosecond(downcast_checked(arr, field, type_)?)
-        }
-        DataType::Duration(TimeUnit::Millisecond) => {
-            ArrowArray::DurationMillisecond(downcast_checked(arr, field, type_)?)
-        }
-        DataType::Duration(TimeUnit::Second) => {
-            ArrowArray::DurationSecond(downcast_checked(arr, field, type_)?)
-        }
+        DataType::Time32(TimeUnit::Millisecond) => ArrowArray::Time32Millisecond(downcast_checked(arr, field, type_)?),
+        DataType::Time32(TimeUnit::Second) => ArrowArray::Time32Second(downcast_checked(arr, field, type_)?),
+        DataType::Time64(TimeUnit::Nanosecond) => ArrowArray::Time64Nanosecond(downcast_checked(arr, field, type_)?),
+        DataType::Time64(TimeUnit::Microsecond) => ArrowArray::Time64Microsecond(downcast_checked(arr, field, type_)?),
+        DataType::Duration(TimeUnit::Nanosecond) => ArrowArray::DurationNanosecond(downcast_checked(arr, field, type_)?),
+        DataType::Duration(TimeUnit::Microsecond) => ArrowArray::DurationMicrosecond(downcast_checked(arr, field, type_)?),
+        DataType::Duration(TimeUnit::Millisecond) => ArrowArray::DurationMillisecond(downcast_checked(arr, field, type_)?),
+        DataType::Duration(TimeUnit::Second) => ArrowArray::DurationSecond(downcast_checked(arr, field, type_)?),
         DataType::Binary => ArrowArray::Binary(downcast_checked(arr, field, type_)?),
         DataType::LargeBinary => ArrowArray::LargeBinary(downcast_checked(arr, field, type_)?),
         DataType::Utf8 => ArrowArray::String(downcast_checked(arr, field, type_)?),
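The arrays.rs change is purely cosmetic (each match arm collapsed to one line), but the dispatch it performs is the standard arrow-rs downcast from a type-erased dyn Array to a concrete array. A rough sketch of that mechanism, which pgpq's downcast_checked helper wraps with error handling:

    use std::sync::Arc;
    use arrow_array::{Array, Int64Array};

    // Recover the concrete array type behind a type-erased `dyn Array`.
    fn sum_if_int64(arr: &Arc<dyn Array>) -> Option<i64> {
        let ints = arr.as_any().downcast_ref::<Int64Array>()?;
        Some(ints.iter().flatten().sum()) // iter() yields Option<i64>; flatten skips nulls
    }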
4 changes: 0 additions & 4 deletions pgpq/src/encoders.rs

This file was deleted.

3 changes: 1 addition & 2 deletions pgpq/src/lib.rs
@@ -6,7 +6,6 @@ use crate::error::Error;
 use crate::postgres::{write_null, write_value, PostgresDuration, HEADER_MAGIC_BYTES};
 use crate::arrays::{downcast_array, ArrowArray};
 
-use arrow_array;
 use arrow_schema::Schema;
 use arrow_array::RecordBatch;
 use arrow_array::{Array, ArrowPrimitiveType, PrimitiveArray};
@@ -532,7 +531,7 @@ impl ArrowToPostgresBinaryEncoder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow_array;
+
     use arrow_array::{make_array, ArrayRef};
     use arrow_schema::{DataType, Field, Schema, TimeUnit};
     use arrow_array::RecordBatchReader;
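The deleted use arrow_array; lines were no-ops: since the 2018 edition, a crate listed in Cargo.toml is already in scope under its own name, so only the item imports (use arrow_array::{...}) do any work.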
9 changes: 3 additions & 6 deletions pgpq/src/postgres.rs
@@ -2,16 +2,13 @@ use postgres_types::{to_sql_checked, IsNull, ToSql, Type};
 
 use crate::error::Error;
 
-use byteorder::{BigEndian, ByteOrder};
 use bytes::{BufMut, BytesMut};
 
 pub const HEADER_MAGIC_BYTES: &[u8] = b"PGCOPY\n\xff\r\n\0";
 
 #[inline]
 pub(crate) fn write_null(buf: &mut BytesMut) {
-    let idx = buf.len();
-    buf.put_i32(0);
-    BigEndian::write_i32(&mut buf[idx..], -1);
+    buf.put_i32(-1);
 }
 
 #[inline]
@@ -22,7 +19,7 @@ pub(crate) fn write_value(
     buf: &mut BytesMut,
 ) -> Result<(), Error> {
     let idx = buf.len();
-    buf.put_i32(0);
+    buf.put_i32(0); // save space for field length word
     let len = match v
         .to_sql_checked(type_, buf)
         .map_err(|e| Error::to_sql(e, field_name))?
@@ -33,7 +30,7 @@
             i32::try_from(written).map_err(|_| Error::field_too_large(field_name, written))?
         }
     };
-    BigEndian::write_i32(&mut buf[idx..], len);
+    buf[idx..idx+4].copy_from_slice(&len.to_be_bytes());
     Ok(())
 }
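Both postgres.rs changes drop the byteorder dependency in favor of std and bytes primitives: BufMut::put_i32 already writes big-endian, so a NULL field is just the length word -1, and a value's length is backfilled over its placeholder with to_be_bytes. A minimal sketch of the length-prefix pattern, with an already-encoded payload standing in for to_sql_checked output:

    use bytes::{BufMut, BytesMut};

    // COPY BINARY prefixes every field with a big-endian i32 length;
    // NULL is encoded as length -1 with no payload bytes.
    fn write_field(buf: &mut BytesMut, payload: &[u8]) {
        let idx = buf.len();
        buf.put_i32(0);                                        // placeholder length word
        buf.put_slice(payload);                                // the encoded value
        let len = payload.len() as i32;
        buf[idx..idx + 4].copy_from_slice(&len.to_be_bytes()); // backfill, big-endian
    }

    fn write_null(buf: &mut BytesMut) {
        buf.put_i32(-1); // put_i32 writes big-endian by default
    }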
