forked from nlnwa/corpus-build
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
240 lines (208 loc) · 8.09 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
from argparse import ArgumentParser
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Generator
from nb_tokenizer import tokenize
from psycopg2 import connect
from psycopg2.extensions import cursor
from tqdm import tqdm
from yaml import SafeLoader, dump, load
import jsonlines
import sqlite3
import os
import sys
import uuid
@dataclass
class _DHlabIdStatus:
    """Whether dhlab-id assignment is disabled and where numbering starts."""

    # True when --disable-dhlab-id was passed on the command line
    is_disabled: bool
    # first dhlab id to assign; set to 0 when assignment is disabled
    starting_value: int
@dataclass
class _DatabaseArgs:
    """Connection parameters for the (PostgreSQL/psycopg2) corpus database."""

    hostname: str
    port: int
    database: str
    user: str
    password: str
@dataclass
class _FulltextMetadata:
    """Metadata for one harvested document row from the warcinfo table."""

    # WARC record id
    record_id: str
    # name/path of the WARC file the record lives in
    warcpath: Path
    # fulltext hash (used to look up the document text)
    hash: str
    # target URI of the harvested page
    uri: str
    # capture date formatted as YYYYMMDD
    timestamp: str
@dataclass
class Args:
    """Parsed command-line arguments for this script."""

    # yaml file listing the publications/domains to export
    filter_yaml_file: Path
    # directory receiving the per-domain output files and the local db
    output_dir: Path
    dhlab_id_status: _DHlabIdStatus
    database: _DatabaseArgs
def _args() -> Args:
    """Parse command-line options and bundle them into an ``Args`` instance."""
    parser = ArgumentParser()
    parser.add_argument(
        "--filter-yaml-file",
        type=Path,
        required=True,
        help="Path to the filter yaml file",
    )
    # The five database options share the same shape; declare them table-driven.
    for flag, value_type, help_text in (
        ("--hostname", str, "Hostname of the database"),
        ("--port", int, "Port of the database"),
        ("--database", str, "Database name"),
        ("--user", str, "Username of the database"),
        ("--password", str, "Password of the database"),
    ):
        parser.add_argument(flag, type=value_type, required=True, help=help_text)
    parser.add_argument(
        "--output-dir",
        type=Path,
        required=True,
        help="Path to the output dir",
    )
    # Exactly one of the two dhlab-id options must be given.
    dhlab_id_group = parser.add_mutually_exclusive_group(required=True)
    dhlab_id_group.add_argument(
        "--starting-dhlab-id", type=int, help="Starting dhlab id"
    )
    dhlab_id_group.add_argument(
        "--disable-dhlab-id", action="store_true", help="Disable dhlab id"
    )
    parsed = parser.parse_args()
    database_args = _DatabaseArgs(
        hostname=parsed.hostname,
        port=parsed.port,
        database=parsed.database,
        user=parsed.user,
        password=parsed.password,
    )
    if parsed.disable_dhlab_id:
        id_status = _DHlabIdStatus(is_disabled=True, starting_value=0)
    else:
        id_status = _DHlabIdStatus(
            is_disabled=False, starting_value=parsed.starting_dhlab_id
        )
    return Args(
        filter_yaml_file=parsed.filter_yaml_file,
        output_dir=parsed.output_dir,
        database=database_args,
        dhlab_id_status=id_status,
    )
@contextmanager
def _connect_to_database(
    hostname: str, port: int, database: str, user: str, password: str
) -> Generator:
    """Context manager yielding a cursor to the corpus database.

    :param hostname: database host
    :param port: database port
    :param database: database name
    :param user: database user
    :param password: database password
    :yields: an open psycopg2 cursor

    The cursor and connection are closed in ``finally`` blocks so they are
    released even when the caller's ``with`` body raises — the original
    closed them only on the success path, leaking the connection on error.
    """
    connection = connect(
        host=hostname, port=port, dbname=database, user=user, password=password
    )
    try:
        database_cursor = connection.cursor()
        try:
            yield database_cursor
        finally:
            database_cursor.close()
    finally:
        connection.close()
def _remove_duplicates_and_empty_strings(results: list[tuple[str, ...]]) -> list[str]:
filtered_results = []
for entry in results:
for item in entry:
if not item == "":
if item not in filtered_results:
filtered_results.append(item)
return filtered_results
def _fetch_fulltext_with_fulltext_hash(
    database_cursor: cursor, fulltext_hash: str
) -> list[str]:
    """Fetch all unique, non-empty fulltexts stored under a fulltext hash.

    :param database_cursor: open cursor to the corpus database
    :param fulltext_hash: hash identifying the document text
    :return: unique non-empty fulltext strings
    """
    fulltext_table = "fulltext"
    # Parameterized query: the original interpolated fulltext_hash straight
    # into the SQL string, which is vulnerable to SQL injection and breaks
    # on values containing quotes.
    database_cursor.execute(
        f"SELECT fulltext FROM {fulltext_table} WHERE fulltext_hash = %s",
        (fulltext_hash,),
    )
    results = database_cursor.fetchall()
    return _remove_duplicates_and_empty_strings(results)
def _fetch_fulltext_hash_and_metadata(
    database_cursor: cursor, domain: str
) -> list[_FulltextMetadata]:
    """Fetch metadata for every distinct document harvested from a domain.

    The query keeps the earliest capture per (fulltext_hash, target_uri) and
    skips the SHA-1 of the empty string (empty documents); this function then
    keeps only the first row per fulltext hash.

    :param database_cursor: open cursor to the corpus database
    :param domain: domain to look up
    :return: one ``_FulltextMetadata`` per unique fulltext hash
    :raises ValueError: if a row does not have exactly 5 columns
    """
    warcinfo_table = "warcinfo"
    # Parameterized query: the original interpolated `domain` directly into
    # the SQL string (SQL injection risk).
    database_cursor.execute(
        "SELECT DISTINCT ON (fulltext_hash, target_uri) record_id, "
        "wf.warc_file_name AS warcpath, fulltext_hash, target_uri, date "
        f"FROM {warcinfo_table} w "
        "JOIN warc_files wf ON wf.warc_file_id = w.warc_file_id "
        "WHERE domain = %s "
        "AND fulltext_hash != 'da39a3ee5e6b4b0d3255bfef95601890afd80709' "
        "ORDER BY fulltext_hash, target_uri, date ASC;",
        (domain,),
    )
    all_results = database_cursor.fetchall()
    filtered_results: list[_FulltextMetadata] = []
    seen_hashes: set[str] = set()
    for result in all_results:
        if len(result) != 5:
            raise ValueError(
                f"Unexpected number of results, expected 5, got {len(result)}"
            )
        record_id, warcpath, fulltext_hash, target_uri, date = result
        if record_id is None:
            continue
        # Deduplicate by fulltext hash. The original compared result[0]
        # (record_id) against previously seen metadata.hash values, which
        # never matched, so the intended hash dedup never fired. A set also
        # replaces the O(n^2) list rebuild per row.
        if fulltext_hash in seen_hashes:
            continue
        seen_hashes.add(fulltext_hash)
        # Dates arrive ISO-formatted with a trailing 'Z'; normalize to an
        # explicit UTC offset before parsing, then format as YYYYMMDD.
        parsed_date = datetime.fromisoformat(date.replace("Z", "+00:00"))
        filtered_results.append(
            _FulltextMetadata(
                record_id=record_id,
                warcpath=warcpath,
                hash=fulltext_hash,
                uri=target_uri,
                timestamp=parsed_date.strftime("%Y%m%d"),
            )
        )
    return filtered_results
def _main() -> None:
    """Export deduplicated fulltexts per publication domain as JSON-lines files.

    Reads the filter yaml, queries the corpus database for each listed
    domain (skipping publications without a responsible editor), and appends
    one JSON record per (document, fulltext) pair to
    ``<output-dir>/<domain>.yaml``, assigning an incrementing dhlab id.
    """
    args = _args()
    args.output_dir.mkdir(parents=True, exist_ok=True)
    # Keys shared between the filter yaml entries and the output records.
    responsible_editor_key = "have-responsible-editor"
    domain_key = "domain"
    title_key = "title"
    hash_key = 'hash'
    geodata_key = "geodata"
    # Even with dhlab ids "disabled" the counter still runs, starting at 1.
    if not args.dhlab_id_status.is_disabled:
        dhlabid_value = args.dhlab_id_status.starting_value
    else:
        dhlabid_value = 1
    # initiate database
    # NOTE(review): _create_local_db is not defined in this file as shown —
    # confirm it exists at runtime. The sqlite db created here does not
    # appear to be written to anywhere below.
    dbid = str(uuid.uuid4())
    dbname = f"{args.output_dir}/{dbid}.db"
    _create_local_db(dbname)
    with _connect_to_database(
        hostname=args.database.hostname,
        port=args.database.port,
        database=args.database.database,
        user=args.database.user,
        password=args.database.password,
    ) as database_cursor:
        with open(args.filter_yaml_file, "r", encoding="utf-8") as file_pointer:
            filter_dict = load(file_pointer, Loader=SafeLoader)
        for items in tqdm(filter_dict["publications"]):
            try:
                print(f"Processing domain {items[domain_key]}", flush=True)
                if not items[responsible_editor_key]:
                    raise ValueError("No responsible editor")
                fulltext_metadata_collection = _fetch_fulltext_hash_and_metadata(
                    database_cursor, items[domain_key]
                )
                print(
                    f"Found {len(fulltext_metadata_collection)} documents",
                    flush=True,
                )
                for fulltext_metadata in tqdm(fulltext_metadata_collection):
                    # NOTE(review): all_tokens_list is never used and the
                    # imported `tokenize` is never called — likely leftovers.
                    all_tokens_list = []
                    for full_text in _fetch_fulltext_with_fulltext_hash(
                        database_cursor=database_cursor,
                        fulltext_hash=fulltext_metadata.hash,
                    ):
                        warc_data = {
                            "record_id": fulltext_metadata.record_id,
                            "warcpath": fulltext_metadata.warcpath,
                            "timestamp": fulltext_metadata.timestamp,
                            "uri": fulltext_metadata.uri,
                            "full_text": full_text
                        }
                        fulltext_dict = {
                            'dhlabid': dhlabid_value,
                            hash_key: fulltext_metadata.hash,
                            title_key: items[title_key],
                            domain_key: items[domain_key],
                            responsible_editor_key: items[responsible_editor_key],
                            'place': items[geodata_key]['place'],
                            'county': items[geodata_key]['county']
                        }
                        fulltext_dict.update(warc_data)
                        # NOTE(review): metadata_tuple is computed but never
                        # used — presumably intended for the sqlite db above.
                        metadata_tuple = tuple(fulltext_dict.values())[:-1]
                        # NOTE(review): records are JSON lines despite the
                        # .yaml extension — confirm downstream expectations.
                        with jsonlines.open(args.output_dir / f"{items[domain_key]}.yaml", "a") as writer:
                            writer.write(fulltext_dict)
                        dhlabid_value += 1
            except Exception as e:
                # Best-effort batch: a failing domain is reported and skipped
                # so the remaining publications are still processed.
                print(items[domain_key], "failed with", e)
                continue


if __name__ == "__main__":
    _main()