Skip to content

Commit

Permalink
Switched to Psycopg 3.
Browse files Browse the repository at this point in the history
 - Also added get_type_dict().
  • Loading branch information
iangow committed Jan 8, 2024
1 parent 165568d commit d1d0cd0
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@

setuptools.setup(
name="wrds2pg",
version="1.0.29",
version="1.0.30",
author="Ian Gow",
author_email="[email protected]",
description="Convert WRDS or local SAS data to PostgreSQL, parquet, or CSV.",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/iangow/wrds2pg/",
packages=setuptools.find_packages(),
install_requires=['pandas', 'sqlalchemy', 'paramiko', 'psycopg', 'pyarrow', 'duckdb'],
install_requires=['pandas', 'sqlalchemy>=2.0.0', 'paramiko', 'psycopg', 'pyarrow', 'duckdb'],
python_requires=">=3",
classifiers=[
"Programming Language :: Python :: 3",
Expand Down
1 change: 1 addition & 0 deletions wrds2pg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
from wrds2pg.wrds2pg import wrds_id, set_table_comment, get_table_sql
from wrds2pg.wrds2pg import wrds_update_pq, wrds_csv_to_pq, wrds_to_csv
from wrds2pg.wrds2pg import csv_to_pq, wrds_update_csv
from wrds2pg.wrds2pg import get_type_dict
32 changes: 20 additions & 12 deletions wrds2pg/wrds2pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re, subprocess, os, paramiko
from time import gmtime, strftime
from sqlalchemy import create_engine, inspect
from sqlalchemy import text
from sqlalchemy import text, MetaData, Table
import duckdb
from pathlib import Path
import shutil
Expand All @@ -16,12 +16,11 @@
from zoneinfo import ZoneInfo
import os
import time

from sqlalchemy.engine import reflection
from os import getenv
from sqlalchemy.dialects.postgresql import dialect

client = paramiko.SSHClient()
wrds_id = getenv("WRDS_ID")
wrds_id = os.getenv("WRDS_ID")

import warnings
warnings.filterwarnings(action='ignore', module='.*paramiko.*')
Expand Down Expand Up @@ -554,7 +553,9 @@ def wrds_process_to_pg(table_name, schema, engine, p, encoding=None):
try:
with connection_fairy.cursor() as curs:
curs.execute("SET DateStyle TO 'ISO, MDY'")
curs.copy_expert(copy_cmd, p)
with curs.copy(copy_cmd) as copy:
while data := p.read():
copy.write(data)
curs.close()
finally:
connection_fairy.commit()
Expand Down Expand Up @@ -693,7 +694,7 @@ def wrds_update(table_name, schema,
print("Error: Missing connection variables. Please specify engine or (host, dbname).")
quit()
else:
engine = create_engine("postgresql://" + host + "/" + dbname)
engine = create_engine("postgresql+psycopg://" + host + "/" + dbname)
if wrds_id:
# 1. Get comments from PostgreSQL database
comment = get_table_comment(alt_table_name, schema, engine)
Expand Down Expand Up @@ -775,13 +776,13 @@ def run_file_sql(file, engine):

def make_engine(host=None, dbname=None, wrds_id=None):
if not dbname:
dbname = getenv("PGDATABASE")
dbname = os.getenv("PGDATABASE")
if not host:
host = getenv("PGHOST", "localhost")
host = os.getenv("PGHOST", "localhost")
if not wrds_id:
wrds_id = getenv("WRDS_ID")
wrds_id = os.getenv("WRDS_ID")

engine = create_engine("postgresql://" + host + "/" + dbname)
engine = create_engine("postgresql+psycopg://" + host + "/" + dbname)
return engine

def role_exists(engine, role):
Expand All @@ -800,9 +801,9 @@ def get_wrds_tables(schema, wrds_id=None):
from sqlalchemy import MetaData

if not wrds_id:
wrds_id = getenv("WRDS_ID")
wrds_id = os.getenv("WRDS_ID")

wrds_engine = create_engine("postgresql://%[email protected]:9737/wrds" % wrds_id,
wrds_engine = create_engine("postgresql+psycopg://%[email protected]:9737/wrds" % wrds_id,
connect_args = {'sslmode':'require'})

metadata = MetaData(wrds_engine, schema=schema)
Expand Down Expand Up @@ -1245,3 +1246,10 @@ def wrds_update_csv(table_name, schema,
now = strftime("%H:%M:%S", gmtime())
print("Completed file download at %s." % now)
return True

def get_type_dict(table, schema, engine):
    """Return a mapping of column name to PostgreSQL type string for a table.

    The table definition is reflected from the database reachable through
    ``engine``, and each column's type is compiled with SQLAlchemy's
    PostgreSQL dialect.

    Parameters
    ----------
    table :
        Name of the table to reflect.
    schema :
        Schema that contains the table.
    engine :
        SQLAlchemy engine used to reflect the table (``autoload_with``).

    Returns
    -------
    dict
        One entry per reflected column: ``{column_name: compiled_type}``.
    """
    pg_dialect = dialect()
    # Reflect the column definitions from the database instead of
    # declaring them by hand.
    reflected = Table(table, MetaData(), schema=schema, autoload_with=engine)

    type_by_column = {}
    for column in reflected.c:
        type_by_column[column.name] = column.type.compile(dialect=pg_dialect)
    return type_by_column

0 comments on commit d1d0cd0

Please sign in to comment.