changes to increase download capacity #26

Open · wants to merge 1 commit into master
105 changes: 25 additions & 80 deletions scraping/download_and_extract_scripts/downloader.py
@@ -9,11 +9,10 @@
import json
import time


#Configure logging for behavior tracking and errors
# Configure logging for behavior tracking and errors
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

#Function for the highest index of papers downloaded for continuation
# Function for the highest index of papers downloaded for continuation
def get_indexes(papers):
if papers:
nums = []
@@ -23,27 +22,20 @@ def get_indexes(papers):
return sorted(nums)[-1:]
return []

#Function that is capable of downloading PDFs allowing retrial and concurrent downloads
# Function that is capable of downloading PDFs allowing retrial and concurrent downloads
async def download_pdfs(metadata_dict, semaphore, visited, indexes, args, progress_report, retry=1):

#Prepares tasks for download_pdf function and stores association of "paper_name.pdf" with original metadata.

retry -= 1
retries = {} #Dictionary holding files for download retrial
tasks = [] #List to hold the tasks to be executed
retries = {} # Dictionary holding files for download retrial
tasks = [] # List to hold the tasks to be executed
ordered_metadata = list(metadata_dict.items())
user_agent_gen = user_agent_generator()
i = 0
reached_end_of_file = True #flag: if all metadata are in "visited"
reached_end_of_file = True # flag: if all metadata are in "visited"

#Process metadata urls and schedule downloads
for metadata, url in ordered_metadata:
if i < args.batch and metadata not in visited:
reached_end_of_file = False
if indexes:
index = indexes[-1] + 1
else:
index = 1
index = indexes[-1] + 1 if indexes else 1
indexes.append(index)
task = asyncio.create_task(
download_pdf(index, metadata, url, semaphore, args, next(user_agent_gen))
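
A minimal, self-contained sketch of the resume-index logic touched in this hunk (the body of get_indexes is collapsed above, so the filename parsing shown here is an assumed reconstruction): the highest existing paper_<n> index is recovered, and the next download continues from that index plus one.

import re

def get_indexes_sketch(papers):
    # Assumed reconstruction: pull the numeric suffix from names like "paper_7.pdf"
    # and return the largest one as a one-element list, mirroring sorted(nums)[-1:] above.
    nums = [int(m.group(1)) for p in papers for m in [re.search(r"paper_(\d+)", p)] if m]
    return sorted(nums)[-1:] if nums else []

indexes = get_indexes_sketch(["paper_1.pdf", "paper_7.pdf", "paper_3.pdf"])
index = indexes[-1] + 1 if indexes else 1  # same ternary as in the new code; yields 8 here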
@@ -63,29 +55,28 @@ async def download_pdfs(metadata_dict, semaphore, visited, indexes, args, progre
if retries and retry > 0:
logging.info(f"Retrying download for {len(retries)} files")
await download_pdfs(retries, semaphore, visited, indexes, args, progress_report, retry-1)
if i < args.batch: reached_end_of_file = True
if i < args.batch:
reached_end_of_file = True
return reached_end_of_file

#Function to extract base URL from a given full URL
# Function to extract base URL from a given full URL
async def get_base_url(url):
if not url.startswith("http"):
url = f"http://{url}"
parsed_url = urlparse(url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
return base_url

#Function for the initialization of session headers
# Function for the initialization of session headers
async def setup_session(session, url, headers):
""" Initialize the session with base headers. """
base_url = await get_base_url(url)
initial_url = f"{base_url}"
async with session.get(initial_url, headers=headers) as response:
await response.text()
return headers

#Function that arranges concurrent download of a PDFs given pdf_url, then returns download status, metadata and filename as a tuple.
# Function that arranges concurrent download of PDFs
async def download_pdf(index, metadata, pdf_url, semaphore, args, user_agent, referer=None):

if not referer:
base_url = await get_base_url(pdf_url)
else:
@@ -100,13 +91,12 @@ async def download_pdf(index, metadata, pdf_url, semaphore, args, user_agent, re
async with semaphore:
timeout = aiohttp.ClientTimeout(total=60)
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False), timeout=timeout) as session:
# Randomized sleep time between args.sleep and args.sleep + 2 (better for passing bot detection)
await asyncio.sleep(random.uniform(sleep_time, sleep_time + 2))

file_name = f'paper_{index}.{file_type}' # Names file by order of appearance
file_name = f'paper_{index}.{file_type}'
try:
await setup_session(session, pdf_url, headers)
requester = getattr(session, request_type) # sets session type as either session.get or session.post
requester = getattr(session, request_type)
async with requester(pdf_url, headers=headers, allow_redirects=False) as response:
if response.status in (301, 302):
logging.error(f"Redirected: {pdf_url} to {response.headers['Location']}. Status code: {response.status}")
@@ -129,16 +119,14 @@ async def download_pdf(index, metadata, pdf_url, semaphore, args, user_agent, re
logging.error(f"Unexpected error while downloading {pdf_url}: {e}")
return (False, metadata, file_name)

#Function that writes downloaded content to a file
# Function that writes downloaded content to a file
async def write_file(filename, content, output_path = "./"):
path_to_file = os.path.join(output_path, filename)
async with aiofiles.open(path_to_file, 'wb') as file:
await file.write(content)

#Function to generate random user-agents for avoiding bot detection
#to add proxy rotation option
# Function to generate random user-agents
def user_agent_generator():

templates = [
"Mozilla/5.0 ({os}) AppleWebKit/537.36 (KHTML, like Gecko) {browser}/{version} Safari/537.36",
"Mozilla/5.0 ({os}) Gecko/20100101 {browser}/{version}",
@@ -164,14 +152,14 @@ def user_agent_generator():
user_agent = template.format(os=os, browser=browser, version=full_version)
yield user_agent

#Function for overall program executon
# Function for overall program execution
async def run(args):
current_working_directory = os.getcwd()
path_to_url_siteguide = os.path.join(current_working_directory, args.filename)
with open(path_to_url_siteguide, 'r') as file:
metadata_dict = json.load(file)

semaphore = asyncio.Semaphore(3) #if you get flagged by bot detection try adjusting value
semaphore = asyncio.Semaphore(args.concurrency) # Use concurrency argument for semaphore limit
try:
try:
with open('progress_report.json', 'r') as file:
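
To illustrate the --concurrency change in this hunk, here is a standalone sketch (illustration only, not part of the PR's code) of how an asyncio.Semaphore caps the number of downloads in flight at once:

import asyncio

async def bounded_download(i, semaphore):
    # At most `limit` of these run their body at once; the rest wait at the semaphore.
    async with semaphore:
        await asyncio.sleep(0.1)  # stand-in for one PDF request
        return i

async def demo(limit=3):
    semaphore = asyncio.Semaphore(limit)
    return await asyncio.gather(*(bounded_download(i, semaphore) for i in range(10)))

asyncio.run(demo())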
@@ -183,7 +171,6 @@ async def run(args):
indexes = []
logging.info("No existing progress report found")
visited = list(progress_report.values())
# Download PDFs and update progress report
logging.info(f"Starting download from {args.filename}")
finished = await download_pdfs(metadata_dict, semaphore, visited, indexes, args, progress_report)
logging.info(f"Finished download from {args.filename}")
@@ -194,7 +181,6 @@ async def run(args):
finally:
if finished:
logging.info("All available have been downloaded - Finished!")
# still write to progress_report.json in case it finished because of i < args.batch
with open('progress_report.json', 'w') as file:
json.dump(progress_report, file, ensure_ascii=False, indent=4)
return True
@@ -205,60 +191,19 @@ async def run(args):
logging.info("Progress report written to progress_report.json")
return False

#Function for handling command-line arguments
# Function for handling command-line arguments
def parse_input():
parser = argparse.ArgumentParser(description="Gets PDFs through URLs given as value entries in a JSON.", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--json", help="Add path to JSON file with URLs siteguide", required=True)
parser.add_argument("--sleep", type=int, default=1, help="Set delay before new request is made (in seconds)")
parser.add_argument("--type", help="Select file type to be downloaded e.g., 'pdf', 'doc'", required=True)
parser.add_argument("--req", choices=['get', 'post'], default='get', help="Set request type 'get' or 'post'")
parser.add_argument("-o", "--output", default="./", help="Set download directory")
parser.add_argument("--little_potato", help="Set directory for progress_report.json (previously little_potato), default value is set to --output")
parser.add_argument("--batch", type=int, default=10, help="Set number of files to download per run")
args = parser.parse_args()
parser.add_argument("--little_potato", help="Set directory for progress_report.json (previously little_potato_downloaded)")
parser.add_argument("--concurrency", type=int, default=3, help="Set the concurrency limit (number of concurrent downloads)")
parser.add_argument("--batch", type=int, default=5, help="Set number of files to download per run")
return parser.parse_args()

if not args.little_potato:
args.little_potato = args.output
logging.info(f"Arguments received: JSON file: {args.json}, Sleep time: {args.sleep}, File type: {args.type}, Request type: {args.req}, Output path: {args.output}, 'progress_report.json' path: {args.little_potato}")
return args

#The main function to parse input arguments, load URL metadata from a JSON file, manage download progress with semaphores for concurrency, and save the download progress to a JSON report file
async def main():
args = parse_input()
with open(args.json, 'r') as file:
metadata_dict = json.load(file)
#Semaphore that limits concurrent downloads
semaphore = asyncio.Semaphore(3) # Adjust the value as needed

try:
#Read existing progress report if any
try:
progress_report_path = os.path.join(args.little_potato, 'progress_report.json')
with open(progress_report_path, 'r') as file:
progress_report = json.load(file)
logging.info("Existing progress report found and loaded")
indexes = get_indexes(list(progress_report.keys()))
except FileNotFoundError:
progress_report = {}
indexes = []
logging.info("No existing progress report found")
visited = list(progress_report.values())
logging.info("Starting PDF downloads")
finished = await download_pdfs(metadata_dict, semaphore, visited, indexes, args, progress_report)
if finished:
logging.info("All available files are in progress_report.json - Finished!")
else:
logging.info("PDF downloads completed")
except Exception as e:
logging.error(f"An error occurred: {e}")
raise
finally:
#Write progress report to a JSON file
progress_report_path = os.path.join(args.little_potato, 'progress_report.json')
with open(progress_report_path, 'w') as file:
json.dump(progress_report, file, ensure_ascii=False, indent=4)
logging.info("Progress report written to progress_report.json")

#Entry point of Downloader
if __name__ == "__main__":
asyncio.run(main())
args = parse_input()
asyncio.run(run(args))
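
For reference, a hypothetical invocation exercising the new --concurrency flag (script path, JSON file name, and values are examples only); with the new entry point, parse_input() supplies these arguments to run():

python downloader.py --json siteguide.json --type pdf --req get -o ./pdfs --sleep 1 --concurrency 5 --batch 20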