GA scraper using nonce #530

Merged · 6 commits · Sep 18, 2023
288 changes: 188 additions & 100 deletions warn/scrapers/ga.py
@@ -1,16 +1,15 @@
import csv
import logging
import re
import time
import typing
from datetime import datetime
from glob import glob
from pathlib import Path

import requests
from bs4 import BeautifulSoup

from .. import utils
from ..cache import Cache

__authors__ = ["chriszs"]
__authors__ = ["chriszs", "esagara", "Ash1R", "stucka"]
__tags__ = ["html"]
__source__ = {
"name": "Georgia Department of Labor",
@@ -33,105 +32,194 @@ def scrape(

Returns: the Path where the file is written
"""
state_code = "ga"
cache = Cache(cache_dir)

# The basic configuration for the scrape
base_url = "http://www.dol.state.ga.us/public/es/warn/searchwarns/list"

area = 9 # statewide

current_year = datetime.now().year
first_year = 1989 # first available year

years = list(range(first_year, current_year + 1))
years.reverse()

# Loop through the years and scrape them one by one
output_rows: typing.List = []
for i, year in enumerate(years):
# Concoct the URL
url = f"{base_url}?geoArea={area}&year={year}&step=search"
cache_key = f"{state_code}/{year}.html"

# Read from cache if available and not this year or the year before
if cache.exists(cache_key) and year < current_year - 1:
html = cache.read(cache_key)
else:
# Otherwise, go request it
page = utils.get_url(url, verify=False)
html = page.text
cache.write(cache_key, html)

# Scrape out the table
new_rows = _parse_table(
html,
"emplrList",
include_headers=len(output_rows) == 0 or i == 0, # After the first loop, we can skip the headers
)

# Concatenate the rows
output_rows.extend(new_rows)

# Sleep a bit
time.sleep(2)
base_url = "https://www.tcsg.edu/warn-public-view/"

# Write out the results
data_path = data_dir / f"{state_code}.csv"
utils.write_rows_to_csv(data_path, output_rows)
api_url = "https://www.tcsg.edu/wp-admin/admin-ajax.php"
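# The listings are served by a GravityView DataTable on tcsg.edu and fetched
# through WordPress admin-ajax; a browser-style User-Agent keeps the requests
# looking like ordinary page views.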
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36"
}

# Return the path to the CSV
return data_path


def _parse_table(html, id, include_headers=True) -> typing.List:
"""
Parse HTML table with given ID.
response = utils.get_url(base_url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.text, features="html5lib")

Keyword arguments:
html -- the HTML to parse
id -- the ID of the table to parse
include_headers -- whether to include the headers in the output (default True)
script = str(
soup.find(
"script", text=lambda text: text and "window.gvDTglobals.push" in text
)
)
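# GravityView publishes its DataTables config, including a per-page security
# nonce, in an inline window.gvDTglobals.push({...}) script; the regex below
# pulls the nonce out so it can be sent with the admin-ajax request.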

match = re.search(r'"nonce":"([^"]+)"', script)
if match:
nonce = match.group(1)
logger.debug(f"Nonce value found: {nonce}")
else:
logger.debug(
"Nonce value not parsed from page; scrape will likely break momentarily."
)

Returns: a list of rows
"""
# Parse out data table
soup = BeautifulSoup(html, "html5lib")
table_list = soup.find_all(id=id) # output is list-type

# We expect the first table to be there with our data
try:
assert len(table_list) > 0
except AssertionError:
logger.debug("No tables found")
return []
table = table_list[0]

output_rows = []
column_tags = ["td"]

if include_headers:
column_tags.append("th")

# Loop through the table and grab the data
for table_row in table.find_all("tr"):
columns = table_row.find_all(column_tags)
output_row = []

for column in columns:
# Collapse newlines
partial = re.sub(r"\n", " ", column.text)
# Standardize whitespace
clean_text = re.sub(r"\s+", " ", partial).strip()
output_row.append(clean_text)

# Skip any empty rows
if len(output_row) == 0 or output_row == [""]:
continue

output_rows.append(output_row)

return output_rows
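# Build the same DataTables-style query the GravityView widget sends from the
# browser: length=-1 asks for every record in a single response, and the
# scraped nonce plus the view/post IDs authorize the admin-ajax call.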
payload = {
"draw": 1,
"columns": [
{
"data": 0,
"name": "gv_96",
"searchable": True,
"orderable": True,
"search": {
"value": None,
"regex": False,
},
},
{
"data": 1,
"name": "gv_4",
"searchable": True,
"orderable": True,
"search": {
"value": None,
"regex": False,
},
},
{
"data": 2,
"name": "gv_date_created",
"searchable": True,
"orderable": True,
"search": {
"value": None,
"regex": False,
},
},
{
"data": 3,
"name": "gv_97",
"searchable": True,
"orderable": True,
"search": {
"value": None,
"regex": False,
},
},
],
"order": [{"column": 0, "dir": "asc"}],
"start": 0,
"length": -1,
"search": {
"value": None,
"regex": False,
},
"action": "gv_datatables_data",
"view_id": 77460,
"post_id": 77462,
"nonce": nonce,
"getData": [],
"hideUntilSearched": 0,
"setUrlOnSearch": True,
"shortcode_atts": {"id": 77460, "class": None, "detail": None},
}
response = requests.post(api_url, data=payload, headers=headers)

# Use JSON as an index to get other data files
data = response.json()["data"]
logger.debug(f"{len(data):,} records from newer dataset in index.")

# Download detailed data if not already cached
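# Each index entry's first cell is an HTML link: its href points to the
# per-notice detail page, and its link text is used as the cache filename
# (saved with a .format3 suffix so the glob below can find it).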
for listing in data:
filehref = BeautifulSoup(listing[0], features="html5lib")("a")[0]["href"]
fileid = BeautifulSoup(listing[0], features="html5lib")("a")[0].contents[0]
targetfilename = cache_dir / ("ga/" + fileid + ".format3")
utils.fetch_if_not_cached(targetfilename, filehref)

# Parse detailed data
masterlist = []
for filename in glob(f"{cache_dir}/ga/*.format3"):
with open(filename, "r", encoding="utf-8") as infile:
html = infile.read()
tableholder = BeautifulSoup(html, features="html5lib").find(
"table", {"class": "gv-table-view-content"}
)
lastrowname = "Placeholder"
line = {}
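# Each detail page is a two-column GravityView table: <th> holds the field
# name and <td> its value. Rows that nest their own tables or lack a th/td
# pair are layout-only and are skipped; repeated blank field names (e.g.
# extra address lines) inherit the previous name with a "." appended so the
# dictionary keys stay unique.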
for row in tableholder.find_all("tr")[1:]: # Skip header row
if (
row.find_all("table")
or not row.find_all("th")
or not row.find_all("td")
): # Then it's a little sideshow and we don't care.
pass
else:
rowname = row.find("th").text
if not rowname:
rowname = lastrowname + "."
lastrowname = rowname
if (
"Email" not in rowname
and "Submitter Information" not in rowname
and "Acknowledgement" not in rowname
):
rowcontent = row.find("td").text
if "Location Address" in rowname or rowname == "Company Address":
rowguts = (
str(row.find("td"))
.split("<br/><a")[0]
.replace("<td>", "")
.replace("<br/>", ", ")
)
rowcontent = rowguts
line[rowname] = rowcontent
masterlist.append(line)

headermatcher = {
"ID": "GA WARN ID",
"Company Name": "Company Name",
"City": "First Location Address",
"ZIP": "Zip Code",
"County": "County",
"Est. Impact": "Total Number of Affected Employees",
"LWDA": "LWDA", # Not used in newer format
"Separation Date": "First Date of Separation",
}
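# The mapping above converts the historical CSV's column names (keys) into
# the field names used by the newer records (values), so both sources share
# one schema.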

# Get and process historical data
historicalfilename = cache_dir / ("ga/ga_historical.csv")
filehref = (
"https://storage.googleapis.com/bln-data-public/warn-layoffs/ga_historical.csv"
)
utils.fetch_if_not_cached(historicalfilename, filehref)
with open(historicalfilename, "r", encoding="utf-8") as infile:
reader = list(csv.DictReader(infile))
logger.debug(f"Found {len(reader):,} historical records.")
for row in reader:
line = {}
for item in headermatcher:
line[headermatcher[item]] = row[item]
masterlist.append(line)
logger.debug(f"{len(masterlist):,} records now included.")
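# Collect the union of every field name seen across both formats, in
# first-seen order; this list becomes the CSV header.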

masterheaders = []
for row in masterlist:
for key in list(row.keys()):
if key not in masterheaders:
masterheaders.append(key)

output_csv = data_dir / "ga.csv"

# This utils writer function presumes every row has the same format, and ... they don't. So let's standardize.
for i, row in enumerate(masterlist):
line = {}
for item in masterheaders:
if item in row:
line[item] = row[item]
else:
line[item] = None
masterlist[i] = line

utils.write_dict_rows_to_csv(
Path(output_csv), masterheaders, masterlist, mode="w", extrasaction="raise"
)

return output_csv


if __name__ == "__main__":
22 changes: 22 additions & 0 deletions warn/utils.py
@@ -3,6 +3,7 @@
import os
import typing
from pathlib import Path
from time import sleep

import requests
from retry import retry
@@ -48,6 +49,27 @@ def create_directory(path: Path, is_file: bool = False):
directory.mkdir(parents=True)


def fetch_if_not_cached(filename, url, **kwargs):
"""Download files if they're not already saved.

Args:
filename: The full filename for the file
url: The URL from which the file may be downloaded.
Notes: Should this even be in utils vs. cache? Should it exist?
"""
create_directory(Path(filename), is_file=True)
if not os.path.exists(filename):
logger.debug(f"Fetching {filename} from {url}")
response = requests.get(url, **kwargs)
if not response.ok:
logger.error(f"Failed to fetch {url} to {filename}")
else:
with open(filename, "wb") as outfile:
outfile.write(response.content)
sleep(2) # Pause between requests
return


def write_rows_to_csv(output_path: Path, rows: list, mode="w"):
"""Write the provided list to the provided path as comma-separated values.
