# scrape.py (forked from whchien/funda-scraper)
"""Main funda scraper module"""

import argparse
import datetime
import json
import multiprocessing as mp
import os
from collections import OrderedDict
from typing import List, Optional
from urllib.parse import urlparse, urlunparse

import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
from tqdm.contrib.concurrent import process_map

from funda_scraper.config.core import config
from funda_scraper.preprocess import clean_date_format, preprocess_data
from funda_scraper.utils import logger


class FundaScraper(object):
    """
    A class used to scrape real estate data from the Funda website.
    """

    def __init__(
        self,
        area: str,
        want_to: str,
        page_start: int = 1,
        n_pages: int = 1,
        find_past: bool = False,
        min_price: Optional[int] = None,
        max_price: Optional[int] = None,
        days_since: Optional[int] = None,
        property_type: Optional[str] = None,
        min_floor_area: Optional[str] = None,
        max_floor_area: Optional[str] = None,
        sort: Optional[str] = None,
        days_on_funda: Optional[int] = None,
    ):
        """
        :param area: The area to search for properties, formatted for URL compatibility.
        :param want_to: Specifies whether the user wants to buy or rent properties.
        :param page_start: The starting page number for the search.
        :param n_pages: The number of pages to scrape.
        :param find_past: Flag to indicate whether to find past listings.
        :param min_price: The minimum price for the property search.
        :param max_price: The maximum price for the property search.
        :param days_since: The maximum number of days since the listing was published.
        :param property_type: The type of property to search for.
        :param min_floor_area: The minimum floor area for the property search.
        :param max_floor_area: The maximum floor area for the property search.
        :param sort: The sorting criterion for the search results.
        :param days_on_funda: The number of days the listing has already been on Funda.
        """
        # Init attributes
        self.area = area.lower().replace(" ", "-")
        self.property_type = property_type
        self.want_to = want_to
        self.find_past = find_past
        self.page_start = max(page_start, 1)
        self.n_pages = max(n_pages, 1)
        self.page_end = self.page_start + self.n_pages - 1
        self.min_price = min_price
        self.max_price = max_price
        self.days_since = days_since
        self.min_floor_area = min_floor_area
        self.max_floor_area = max_floor_area
        self.sort = sort
        self.days_on_funda = days_on_funda

        # Populated as the scraper runs
        self.links: List[str] = []
        self.raw_df = pd.DataFrame()
        self.clean_df = pd.DataFrame()
        self.base_url = config.base_url
        self.selectors = config.css_selector

    def __repr__(self):
        return (
            f"FundaScraper(area={self.area}, "
            f"want_to={self.want_to}, "
            f"n_pages={self.n_pages}, "
            f"page_start={self.page_start}, "
            f"find_past={self.find_past}, "
            f"min_price={self.min_price}, "
            f"max_price={self.max_price}, "
            f"days_since={self.days_since}, "
            f"min_floor_area={self.min_floor_area}, "
            f"max_floor_area={self.max_floor_area}, "
            f"sort={self.sort}, "
            f"days_on_funda={self.days_on_funda})"
        )

    @property
    def to_buy(self) -> bool:
        """Determines if the search is for buying or renting properties."""
        if self.want_to.lower() in ["buy", "koop", "b", "k"]:
            return True
        elif self.want_to.lower() in ["rent", "huur", "r", "h"]:
            return False
        else:
            raise ValueError("'want_to' must be either 'buy' or 'rent'.")

    @property
    def check_days_since(self) -> int:
        """Validates the 'days_since' attribute."""
        if self.find_past:
            raise ValueError("'days_since' can only be specified when find_past=False.")
        if self.days_since in [None, 1, 3, 5, 10, 30]:
            return self.days_since
        else:
            raise ValueError("'days_since' must be either None, 1, 3, 5, 10 or 30.")

    @property
    def check_sort(self) -> str:
        """Validates the 'sort' attribute."""
        if self.sort in [
            None,
            "relevancy",
            "date_down",
            "date_up",
            "price_up",
            "price_down",
            "floor_area_down",
            "plot_area_down",
            "city_up",
            "postal_code_up",
        ]:
            return self.sort
        else:
            raise ValueError(
                "'sort' must be either None, 'relevancy', 'date_down', 'date_up', 'price_up', 'price_down', "
                "'floor_area_down', 'plot_area_down', 'city_up' or 'postal_code_up'."
            )

    @staticmethod
    def _check_dir() -> None:
        """Ensures the existence of the directory for storing data."""
        if not os.path.exists("data"):
            os.makedirs("data")

    @staticmethod
    def _get_links_from_one_parent(url: str) -> List[str]:
        """Scrapes all available property links from a single Funda search page."""
        response = requests.get(url, headers=config.header)
        soup = BeautifulSoup(response.text, "lxml")
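        # The search page is expected to embed a JSON-LD block shaped roughly like
        # {"itemListElement": [{"url": "https://www.funda.nl/..."}, ...]}; this shape
        # is inferred from the parsing below and may change if Funda alters its markup.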
        script_tag = soup.find_all("script", {"type": "application/ld+json"})[0]
        json_data = json.loads(script_tag.contents[0])
        urls = [item["url"] for item in json_data["itemListElement"]]
        return urls

    def reset(
        self,
        area: Optional[str] = None,
        property_type: Optional[str] = None,
        want_to: Optional[str] = None,
        page_start: Optional[int] = None,
        n_pages: Optional[int] = None,
        find_past: Optional[bool] = None,
        min_price: Optional[int] = None,
        max_price: Optional[int] = None,
        days_since: Optional[int] = None,
        min_floor_area: Optional[str] = None,
        max_floor_area: Optional[str] = None,
        sort: Optional[str] = None,
        days_on_funda: Optional[int] = None,
    ) -> None:
        """Resets or initializes the search parameters."""
        if area is not None:
            self.area = area
        if property_type is not None:
            self.property_type = property_type
        if want_to is not None:
            self.want_to = want_to
        if page_start is not None:
            self.page_start = max(page_start, 1)
        if n_pages is not None:
            self.n_pages = max(n_pages, 1)
        if find_past is not None:
            self.find_past = find_past
        if min_price is not None:
            self.min_price = min_price
        if max_price is not None:
            self.max_price = max_price
        if days_since is not None:
            self.days_since = days_since
        if min_floor_area is not None:
            self.min_floor_area = min_floor_area
        if max_floor_area is not None:
            self.max_floor_area = max_floor_area
        if sort is not None:
            self.sort = sort
        if days_on_funda is not None:
            self.days_on_funda = days_on_funda

    @staticmethod
    def remove_duplicates(lst: List[str]) -> List[str]:
        """Removes duplicate links from a list."""
        return list(OrderedDict.fromkeys(lst))

    @staticmethod
    def fix_link(link: str) -> str:
        """Fixes a given property link to ensure proper URL formatting."""
        link_url = urlparse(link)
        link_path = link_url.path.split("/")
        property_id = link_path.pop(5)
        property_address = link_path.pop(4)
        property_city = link_path.pop(3)
        link_path = link_path[2:3]
        link_path.extend([property_city, property_address, property_id, "?old_ldp=true"])
        fixed_link = urlunparse(
            (link_url.scheme, link_url.netloc + "/detail", "/".join(link_path), "", "", "")
        )
        return fixed_link

    def fetch_all_links(self, page_start: Optional[int] = None, n_pages: Optional[int] = None) -> None:
        """Collects all available property links across multiple pages."""
        page_start = self.page_start if page_start is None else page_start
        n_pages = self.n_pages if n_pages is None else n_pages
        logger.info("*** Phase 1: Fetch all the available links from all pages ***")
        urls = []
        main_url = self._build_main_query_url()

        for i in tqdm(range(page_start, page_start + n_pages)):
            try:
                item_list = self._get_links_from_one_parent(
                    f"{main_url}&search_result={i}"
                )
                urls += item_list
            except IndexError:
                self.page_end = i
                logger.info(f"*** The last available page is {self.page_end} ***")
                break

        urls = self.remove_duplicates(urls)
        fixed_urls = [self.fix_link(url) for url in urls]
        logger.info(
            f"*** Got all the urls. {len(fixed_urls)} houses found from page {self.page_start} to {self.page_end} ***"
        )
        self.links = fixed_urls

    def _build_main_query_url(self) -> str:
        """Constructs the main query URL for the search."""
query = "koop" if self.to_buy else "huur"
main_url = (
f"{self.base_url}/zoeken/{query}?selected_area=%5B%22{self.area}%22%5D"
)
if self.property_type:
property_types = self.property_type.split(",")
formatted_property_types = [
"%22" + prop_type + "%22" for prop_type in property_types
]
main_url += f"&object_type=%5B{','.join(formatted_property_types)}%5D"
if self.find_past:
main_url = f'{main_url}&availability=%5B"unavailable"%5D'
if self.min_price is not None or self.max_price is not None:
min_price = "" if self.min_price is None else self.min_price
max_price = "" if self.max_price is None else self.max_price
main_url = f"{main_url}&price=%22{min_price}-{max_price}%22"
if self.days_since is not None:
main_url = f"{main_url}&publication_date={self.check_days_since}"
if self.min_floor_area or self.max_floor_area:
min_floor_area = "" if self.min_floor_area is None else self.min_floor_area
max_floor_area = "" if self.max_floor_area is None else self.max_floor_area
main_url = f"{main_url}&floor_area=%22{min_floor_area}-{max_floor_area}%22"
if self.sort is not None:
main_url = f"{main_url}&sort=%22{self.check_sort}%22"
if self.days_on_funda is not None:
days_on_funda = self.days_on_funda
main_url = f"{main_url}&publication_date=%22{days_on_funda}%22"
logger.info(f"*** Main URL: {main_url} ***")
return main_url

    @staticmethod
    def get_value_from_css(soup: BeautifulSoup, selector: str) -> str:
        """Extracts data from HTML using a CSS selector."""
        result = soup.select(selector)
        if len(result) > 0:
            result = result[0].text
        else:
            result = "na"
        return result

    def scrape_one_link(self, link: str) -> List[str]:
        """Scrapes data from a single property link."""
        # Initialize for each page
        response = requests.get(link, headers=config.header)
        soup = BeautifulSoup(response.text, "lxml")

        # Get the value according to the respective CSS selectors
        if self.to_buy:
            if self.find_past:
                list_since_selector = self.selectors.date_list
            else:
                list_since_selector = self.selectors.listed_since
        else:
            if self.find_past:
                list_since_selector = ".fd-align-items-center:nth-child(9) span"
            else:
                list_since_selector = ".fd-align-items-center:nth-child(7) span"

        result = [
            link,
            self.get_value_from_css(soup, self.selectors.price),
            self.get_value_from_css(soup, self.selectors.address),
            self.get_value_from_css(soup, self.selectors.descrip),
            self.get_value_from_css(soup, list_since_selector),
            self.get_value_from_css(soup, self.selectors.zip_code),
            self.get_value_from_css(soup, self.selectors.size),
            self.get_value_from_css(soup, self.selectors.year),
            self.get_value_from_css(soup, self.selectors.living_area),
            self.get_value_from_css(soup, self.selectors.kind_of_house),
            self.get_value_from_css(soup, self.selectors.building_type),
            self.get_value_from_css(soup, self.selectors.num_of_rooms),
            self.get_value_from_css(soup, self.selectors.num_of_bathrooms),
            self.get_value_from_css(soup, self.selectors.layout),
            self.get_value_from_css(soup, self.selectors.energy_label),
            self.get_value_from_css(soup, self.selectors.insulation),
            self.get_value_from_css(soup, self.selectors.heating),
            self.get_value_from_css(soup, self.selectors.ownership),
            self.get_value_from_css(soup, self.selectors.exteriors),
            self.get_value_from_css(soup, self.selectors.parking),
            self.get_value_from_css(soup, self.selectors.neighborhood_name),
            self.get_value_from_css(soup, self.selectors.date_list),
            self.get_value_from_css(soup, self.selectors.date_sold),
            self.get_value_from_css(soup, self.selectors.term),
            self.get_value_from_css(soup, self.selectors.price_sold),
            self.get_value_from_css(soup, self.selectors.last_ask_price),
            self.get_value_from_css(soup, self.selectors.last_ask_price_m2).split("\r")[0],
        ]

        # Handle list_since_selector separately, since its CSS selector sometimes varies
        if clean_date_format(result[4]) == "na":
            for i in range(6, 16):
                selector = f".fd-align-items-center:nth-child({i}) span"
                update_list_since = self.get_value_from_css(soup, selector)
                if clean_date_format(update_list_since) != "na":
                    result[4] = update_list_since

        photos_list = [
            p.get("srcset") for p in soup.find_all("img", srcset=True)
        ]
        photos_string = ", ".join(photos_list).replace("\n", "").replace(",,", ",")

        # Clean up the retrieved result from one page
        result = [r.replace("\n", "").replace("\r", "").strip() for r in result]
        result.append(photos_string)
        return result

    def scrape_pages(self) -> None:
        """Scrapes data from all collected property links."""
        logger.info("*** Phase 2: Start scraping from individual links ***")
        df = pd.DataFrame({key: [] for key in self.selectors.keys()})

        # Scrape pages with multiprocessing to improve efficiency
        # TODO: use asyncio instead
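        # NOTE: process_map spawns separate worker processes; on spawn-based platforms
        # (e.g. Windows, macOS) it should only be invoked from code guarded by
        # `if __name__ == "__main__":`, as in the CLI entry point at the bottom of this file.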
        pools = mp.cpu_count()
        content = process_map(self.scrape_one_link, self.links, max_workers=pools)

        for c in content:
            df.loc[len(df)] = c

        df["city"] = df["url"].map(lambda x: x.split("/")[5])
        df["log_id"] = datetime.datetime.now().strftime("%Y%m-%d%H-%M%S")
        if not self.find_past:
            df = df.drop(["term", "price_sold", "date_sold"], axis=1)
        logger.info(f"*** All scraping done: {df.shape[0]} results ***")
        self.raw_df = df

    def save_csv(self, df: pd.DataFrame, filepath: Optional[str] = None) -> None:
        """Saves the scraped data to a CSV file."""
        if filepath is None:
            self._check_dir()
            date = str(datetime.datetime.now().date()).replace("-", "")
            status = "unavailable" if self.find_past else "available"
            want_to = "buy" if self.to_buy else "rent"
            filepath = f"./data/houseprice_{date}_{self.area}_{want_to}_{status}_{len(self.links)}.csv"
        df.to_csv(filepath, index=False)
        logger.info(f"*** File saved: {filepath}. ***")

    def run(
        self, raw_data: bool = False, save: bool = False, filepath: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Runs the full scraping process, optionally saving the results to a CSV file.

        :param raw_data: if True, the data won't be pre-processed
        :param save: if True, the data will be saved as a CSV file
        :param filepath: the file path for saving the data
        :return: the (pre-processed) DataFrame from scraping
        """
        self.fetch_all_links()
        self.scrape_pages()

        if raw_data:
            df = self.raw_df
        else:
            logger.info("*** Cleaning data ***")
            df = preprocess_data(df=self.raw_df, is_past=self.find_past)
            self.clean_df = df

        if save:
            self.save_csv(df, filepath)

        logger.info("*** Done! ***")
        return df
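

# Example CLI invocation (illustrative; the flags are defined in the parser below):
#     python scrape.py --area amsterdam --want_to rent --n_pages 2 --save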
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--area",
type=str,
help="Specify which area you are looking for",
default="amsterdam",
)
parser.add_argument(
"--want_to",
type=str,
help="Specify you want to 'rent' or 'buy'",
default="rent",
choices=["rent", "buy"],
)
parser.add_argument(
"--find_past",
action="store_true",
help="Indicate whether you want to use historical data",
)
parser.add_argument(
"--page_start", type=int, help="Specify which page to start scraping", default=1
)
parser.add_argument(
"--n_pages", type=int, help="Specify how many pages to scrape", default=1
)
parser.add_argument(
"--min_price", type=int, help="Specify the min price", default=None
)
parser.add_argument(
"--max_price", type=int, help="Specify the max price", default=None
)
parser.add_argument(
"--days_on_funda", type=int, help="Specify the max days on funda", default=None
)
parser.add_argument(
"--days_since",
type=int,
help="Specify the days since publication",
default=None,
)
parser.add_argument(
"--sort",
type=str,
help="Specify sorting",
default=None,
choices=[
None,
"relevancy",
"date_down",
"date_up",
"price_up",
"price_down",
"floor_area_down",
"plot_area_down",
"city_up" "postal_code_up",
],
)
parser.add_argument(
"--raw_data",
action="store_true",
help="Indicate whether you want the raw scraping result",
)
parser.add_argument(
"--save",
action="store_true",
help="Indicate whether you want to save the data",
)
args = parser.parse_args()
scraper = FundaScraper(
area=args.area,
want_to=args.want_to,
find_past=args.find_past,
page_start=args.page_start,
n_pages=args.n_pages,
min_price=args.min_price,
max_price=args.max_price,
days_since=args.days_since,
sort=args.sort,
days_on_funda=args.days_on_funda,
)
df = scraper.run(raw_data=args.raw_data, save=args.save)
print(df.head())