Skip to content

Commit e32fa47

Browse files
committed
[cdd_gae/parquet_to_table.py] Use binary COPY FROM via pgcopy (WIP: my contributions to that package); [requirements.txt] Add pgcopy; [cdd_gae/tests/test_parquet_to_table.py] Improve whitespace testing
1 parent 406922a commit e32fa47

File tree

3 files changed

+31
-48
lines changed

3 files changed

+31
-48
lines changed

cdd_gae/parquet_to_table.py

+17-40
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
from collections import deque
66
from datetime import datetime
77
from functools import partial
8-
from io import StringIO
98
from json import dumps, loads
109
from operator import methodcaller
1110
from os import environ, path
1211

1312
import numpy as np
1413
import psycopg2.sql
15-
from cdd.shared.pure_utils import identity, pp
14+
from cdd.shared.pure_utils import identity
15+
from pgcopy import CopyManager
1616
from pyarrow.parquet import ParquetFile
1717
from sqlalchemy import create_engine
1818

@@ -70,6 +70,10 @@ def parse_col(col):
7070
raise NotImplementedError(type(col))
7171

7272

73+
# {"\\"first item\\"","\\"second item\\"",
74+
# "\\"third item with \\\\\\"quotes\\\\\\" inside\\"","\\"fourth with a \\\\\\\\ backslash\\""}
75+
76+
7377
def maybe_quote_and_escape(s, ch='"'):
7478
if isinstance(s, str) and s.startswith("{") and s.endswith("}"):
7579
return "{ch}{}{ch}".format(s.replace('"', '\\"'), ch=ch)
@@ -88,44 +92,17 @@ def psql_insert_copy(table, conn, keys, data_iter):
8892
Column names
8993
data_iter : Iterable that iterates the values to be inserted
9094
"""
91-
# gets a DBAPI connection that can provide a cursor
92-
dbapi_conn = conn.connection
93-
with dbapi_conn.cursor() as cur:
94-
s_buf = StringIO()
95-
s_buf.writelines(
96-
"\n".join(
97-
map(lambda line: "|".join(map(str, map(parse_col, line))), data_iter)
98-
)
99-
)
100-
s_buf.seek(0)
101-
102-
sql = "COPY {} ({}) FROM STDIN WITH null as 'null' DELIMITER '|'".format(
103-
psycopg2.sql.Identifier(
104-
*(table.schema, table.name) if table.schema else (table.name,)
105-
).as_string(cur),
106-
psycopg2.sql.SQL(", ")
107-
.join(
108-
map(
109-
psycopg2.sql.Identifier,
110-
keys[1:] if keys and keys[0] == "index" else keys,
111-
)
112-
)
113-
.as_string(cur),
114-
)
115-
try:
116-
cur.copy_expert(sql=sql, file=s_buf)
117-
except:
118-
print(sql)
119-
s_buf.seek(0)
120-
pp(
121-
dict(
122-
zip(
123-
keys[1:] if keys and keys[0] == "index" else keys,
124-
next(s_buf).split("|"),
125-
)
126-
)
127-
)
128-
raise
95+
96+
with conn.connection.cursor() as cur:
97+
table_name = psycopg2.sql.Identifier(
98+
*(table.schema, table.name) if table.schema else (table.name,)
99+
).as_string(cur)
100+
101+
mgr = CopyManager(
102+
conn.connection, table.name, keys[1:] if keys and keys[0] == "index" else keys
103+
)
104+
mgr.copy(data_iter)
105+
conn.connection.commit()
129106

130107

131108
def csv_to_postgres_text(lines):

cdd_gae/tests/test_parquet_to_table.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import pyarrow as pa
1818
import pyarrow.parquet as pq
1919
import sqlalchemy
20+
from cdd.shared.pure_utils import rpartial
2021
from cdd.tests.utils_for_tests import unittest_main
2122
from psycopg2.extensions import AsIs, register_adapter
2223
from sqlalchemy import (
@@ -121,12 +122,15 @@ class TestParquetToTable(TestCase):
121122
) # python: `pd.read_sql_query("SELECT * FROM").to_csv`
122123

123124
def test_csv_to_postgres_text(self):
124-
self.assertEqual(
125-
self.copy_to_stdout_mock, csv_to_postgres_text(self.to_csv_mock)
125+
self.assertListEqual(
126+
*map(
127+
rpartial(str.split, "\n"),
128+
(self.copy_to_stdout_mock, csv_to_postgres_text(self.to_csv_mock)),
129+
)
126130
)
127131

128132
@unittest.skipUnless(
129-
"RDBMS_URI" in environ, "RDMBS_URI env var must be set to for this test to run"
133+
"RDBMS_URI" in environ, "RDMBS_URI env var must be set for this test to run"
130134
)
131135
def test_sqlalchemy_csv(self) -> None:
132136
"""Reverse-engineer SQLalchemy's handling of complex types for cdd_gae's implementation"""
@@ -165,8 +169,9 @@ class Tbl:
165169
session.commit()
166170
self.assertEqual(Query([Tbl], session=session).count(), 3)
167171
df = pd.read_sql_query("SELECT * FROM {}".format(table_name), engine)
172+
# session.execute("TRUNCATE {}".format(table_name))
168173

169-
results = df.to_csv(index=False, sep="\t")
174+
results = df.to_csv(index=False, sep="\t", header=False)
170175
self.assertEqual(
171176
results,
172177
"timestamp_col\tjson_col\tarray_str_col\tarray_bigint_col\tarray_json_col\tid\n{0}".format(
@@ -185,7 +190,7 @@ class Tbl:
185190
metadata.drop_all(tables=[table], bind=engine)
186191

187192
@unittest.skipUnless(
188-
"RDBMS_URI" in environ, "RDMBS_URI env var must be set to for this test to run"
193+
"RDBMS_URI" in environ, "RDMBS_URI env var must be set for this test to run"
189194
)
190195
@with_own_table
191196
def test_parquet_to_table(self) -> None:

requirements.txt

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# Note: no dependencies are required on Python >=3.9; `black` is optional; `pyyaml` is mostly optional
2-
https://github.com/offscale/astor/archive/refs/heads/empty-annassign.zip
2+
#python-cdd
33
black
4+
https://api.github.com/repos/offscale/cdd-python/zipball#egg=python_cdd
5+
https://github.com/offscale/astor/archive/refs/heads/empty-annassign.zip
6+
pgcopy
47
psycopg
58
pyarrow
6-
#python-cdd
7-
https://api.github.com/repos/offscale/cdd-python/zipball#egg=python_cdd
89
pyyaml
910
sqlalchemy
1011
typing-extensions

0 commit comments

Comments
 (0)