Skip to content

Commit

Permalink
Change approach to emphasize sas_encoding.
Browse files Browse the repository at this point in the history
 - encoding argument may not be necessary if SAS is running UTF-8 and PostgreSQL is doing the same.
  • Loading branch information
iangow committed Jan 15, 2024
1 parent 65463ef commit 0f881f3
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 26 deletions.
148 changes: 148 additions & 0 deletions wrds2pg/test_encoding.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"id": "7acda8e4-ce19-4282-a579-6d90e5532ec5",
"metadata": {},
"outputs": [],
"source": [
"from wrds2pg import wrds_update, proc_contents, get_wrds_process"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "093ca05d-4340-4475-9da8-1f84f9e6bc4d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Forcing update based on user request.\n",
"Beginning file import at 2024-01-15 14:57:52 UTC.\n",
"Importing data into risk.directors.\n",
"Completed file import at 2024-01-15 14:58:07 UTC.\n",
"\n"
]
},
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wrds_update(\"directors\", \"risk\",\n",
" force=True,\n",
" sas_encoding=\"wlatin1\",\n",
" col_types={'annrev': 'float8',\n",
" 'year_term_ends': 'float8',\n",
" 'voting': 'float8',\n",
" 'votecref':'float8',\n",
" 'outside_public_boards': 'text'})"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "a64a74fc-d004-4437-9520-568abc76beb0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Forcing update based on user request.\n",
"Beginning file import at 2024-01-15 14:59:05 UTC.\n",
"Importing data into tfn.idfhist.\n",
"Completed file import at 2024-01-15 15:01:25 UTC.\n",
"\n"
]
}
],
"source": [
"wrds_update(\"idfhist\", \"tfn\", \n",
" force=True, \n",
" sas_encoding=\"wlatin1\")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "be3ee283-a53c-4c2e-a756-a0781fe0d22a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"tfn.table1_test already up to date.\n"
]
}
],
"source": [
"wrds_update(\"table1\", \"tfn\",\n",
" alt_table_name=\"table1_test\",\n",
" sas_encoding=\"wlatin1\")"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "45c3c121-74ee-4947-8275-fb581178860b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Forcing update based on user request.\n",
"Beginning file import at 2024-01-15 15:36:10 UTC.\n",
"Importing data into tfn.table2.\n",
"Completed file import at 2024-01-15 15:45:04 UTC.\n",
"\n"
]
}
],
"source": [
"updated = wrds_update(\"table2\", \"tfn\", sas_encoding=\"wlatin1\", force=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "de475208-f64b-440a-b1a6-8d969f0e8327",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
63 changes: 37 additions & 26 deletions wrds2pg/wrds2pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def get_wrds_sas(table_name, schema, wrds_id=None, fpath=None, rpath=None,
wrds_id=wrds_id,
fpath=fpath, rpath=rpath,
drop=drop, rename=rename, keep=keep)

col_types = make_table_data["col_types"]

if fix_cr:
Expand Down Expand Up @@ -305,7 +305,7 @@ def get_wrds_sas(table_name, schema, wrds_id=None, fpath=None, rpath=None,
if not sas_encoding:
sas_encoding_str=""
else:
sas_encoding_str=f"(encoding='{sas_encoding}')"
sas_encoding_str=f" encoding='{sas_encoding}'"

if fix_missing or drop or obs or keep or col_types or where:

Expand All @@ -331,7 +331,7 @@ def get_wrds_sas(table_name, schema, wrds_id=None, fpath=None, rpath=None,

if obs or drop or rename or keep:
sas_table = table_name + "(" + drop_str + keep_str + \
obs_str + rename_str + ")"
obs_str + rename_str + sas_encoding_str + ")"
else:
sas_table = table_name

Expand Down Expand Up @@ -381,9 +381,9 @@ def get_wrds_sas(table_name, schema, wrds_id=None, fpath=None, rpath=None,
options timezone='{tz}';
{libname_stmt}
* Fix missing values;
data {new_table};
set {schema}.{sas_table}{sas_encoding_str};
set {schema}.{sas_table};
{ts_str}
{fix_cr_code}
{fix_missing_str}
Expand All @@ -397,7 +397,7 @@ def get_wrds_sas(table_name, schema, wrds_id=None, fpath=None, rpath=None,
{timestamps_str}
run;
proc export data={new_table}(encoding="{encoding}")
proc export data={new_table}({sas_encoding_str})
outfile=stdout dbms=csv;
run;"""
else:
Expand All @@ -407,25 +407,27 @@ def get_wrds_sas(table_name, schema, wrds_id=None, fpath=None, rpath=None,
{libname_stmt}
proc export data={schema}.{table_name}({rename_str}
encoding="{encoding}") outfile=stdout dbms=csv;
encoding="{sas_encoding_str}") outfile=stdout dbms=csv;
run;"""
return sas_code

def get_wrds_process(table_name, schema, wrds_id=None, fpath=None, rpath=None,
drop=None, keep=None, fix_cr = False,
fix_missing = False, obs=None, rename=None, where=None,
encoding=None, sas_encoding=None,
tz="UTC", ts_strs=None, gzip=False):
tz="UTC", ts_strs=None, gzip=False, return_code=False):
sas_code = get_wrds_sas(table_name=table_name, wrds_id=wrds_id,
rpath=rpath, fpath=fpath, schema=schema,
drop=drop, rename=rename, keep=keep,
fix_cr=fix_cr, fix_missing=fix_missing,
obs=obs, where=where,
encoding=encoding, sas_encoding=sas_encoding,
tz=tz, ts_strs=ts_strs)

p = get_process(sas_code, wrds_id=wrds_id, fpath=fpath, gzip=gzip)
return(p)
if return_code:
return sas_code
else:
p = get_process(sas_code, wrds_id=wrds_id, fpath=fpath, gzip=gzip)
return(p)

def wrds_to_pandas(table_name, schema, wrds_id, rename=None,
drop=None, obs=None, encoding=None, fpath=None, rpath=None,
Expand All @@ -448,29 +450,34 @@ def wrds_to_pandas(table_name, schema, wrds_id, rename=None,

return(df)

def proc_contents(table_name, sas_schema=None, wrds_id=os.getenv("WRDS_ID"), rpath=None, encoding=None):
if not encoding:
encoding = "utf-8"
def proc_contents(table_name, sas_schema=None, wrds_id=os.getenv("WRDS_ID"), rpath=None,
sas_encoding=None):
if not sas_encoding:
sas_encoding = "utf-8"

if not sas_schema:
sas_schema = rpath

sas_code = f"PROC CONTENTS data={sas_schema}.{table_name}(encoding='{encoding}');"
sas_code = f"PROC CONTENTS data={sas_schema}.{table_name}(encoding='{sas_encoding}');"

p = get_process(sas_code, wrds_id)

return p.readlines()

def get_modified_str(table_name, sas_schema, wrds_id=wrds_id,
encoding=None, rpath=None):

contents = proc_contents(table_name, sas_schema, wrds_id, rpath, encoding)
contents = p.readlines()
if len(contents) == 0:
print(f"Table {sas_schema}.{table_name} not found.")
return None
else:
return contents

def get_modified_str(table_name, sas_schema, wrds_id=wrds_id,
sas_encoding=None, rpath=None):

contents = proc_contents(table_name, sas_schema, wrds_id, rpath,
sas_encoding=sas_encoding)
modified = ""
next_row = False
if not contents:
return None

for line in contents:
if next_row:
line = re.sub(r"^\s+(.*)\s+$", r"\1", line)
Expand Down Expand Up @@ -561,12 +568,14 @@ def wrds_to_pg(table_name, schema, engine, wrds_id=None,
sas_encoding=sas_encoding, tz=tz)

res = wrds_process_to_pg(alt_table_name, schema, engine, p,
encoding=encoding,
tz=tz)
print(f"Completed file import at {get_now()} UTC.\n")

return res

def wrds_process_to_pg(table_name, schema, engine, p, encoding=None, tz='UTC'):
def wrds_process_to_pg(table_name, schema, engine, p, encoding=None,
tz='UTC'):

if not encoding:
encoding = "UTF8"
Expand Down Expand Up @@ -746,7 +755,8 @@ def wrds_update(table_name, schema,

# 2. Get modified date from WRDS
modified = get_modified_str(table_name, sas_schema, wrds_id,
encoding=encoding, rpath=rpath)
sas_encoding=sas_encoding,
rpath=rpath)
if not modified:
return False
else:
Expand Down Expand Up @@ -1036,7 +1046,7 @@ def wrds_update_pq(table_name, schema,

modified = get_modified_str(table_name=table_name,
sas_schema=sas_schema, wrds_id=wrds_id,
encoding=encoding)
sas_encoding=sas_encoding)
if not modified:
return False

Expand Down Expand Up @@ -1339,7 +1349,8 @@ def wrds_update_csv(table_name, schema,
os.makedirs(schema_dir)

csv_file = Path(data_dir, schema, alt_table_name).with_suffix('.csv.gz')
modified = get_modified_str(table_name, sas_schema, wrds_id)
modified = get_modified_str(table_name, sas_schema, wrds_id,
sas_encoding=sas_encoding)
if not modified:
return False

Expand Down

0 comments on commit 0f881f3

Please sign in to comment.