diff --git a/.github/workflows/bitcoinops.yml b/.github/workflows/bitcoinops.yml index cefeac5..61f91f0 100644 --- a/.github/workflows/bitcoinops.yml +++ b/.github/workflows/bitcoinops.yml @@ -25,6 +25,21 @@ jobs: run: | mkdir /tmp/data python bitcoinops/main.py + + - name: Configure Git + run: | + git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}" + git config user.name "${{ secrets.GIT_AUTHOR_NAME }}" + + - name: Add and commit changes + run: | + git add . + if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "bitcoinops: updated scraper logs" + git push + fi env: ES_ENGINE: ${{ secrets.ES_ENGINE }} ES_URL: ${{ secrets.ES_URL }} diff --git a/.github/workflows/bitcointalk.yml b/.github/workflows/bitcointalk.yml index 28e4c3a..8179274 100644 --- a/.github/workflows/bitcointalk.yml +++ b/.github/workflows/bitcointalk.yml @@ -21,6 +21,21 @@ jobs: run: | mkdir /tmp/data python bitcointalk/main.py + + - name: Configure Git + run: | + git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}" + git config user.name "${{ secrets.GIT_AUTHOR_NAME }}" + + - name: Add and commit changes + run: | + git add . + if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "bitcointalk: updated scraper logs" + git push + fi env: ES_ENGINE: ${{ secrets.ES_ENGINE }} ES_URL: ${{ secrets.ES_URL }} diff --git a/.github/workflows/bitcointranscripts.yml b/.github/workflows/bitcointranscripts.yml index 42e5c00..708f701 100644 --- a/.github/workflows/bitcointranscripts.yml +++ b/.github/workflows/bitcointranscripts.yml @@ -21,6 +21,21 @@ jobs: run: | mkdir /tmp/data python bitcointranscripts/main.py + + - name: Configure Git + run: | + git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}" + git config user.name "${{ secrets.GIT_AUTHOR_NAME }}" + + - name: Add and commit changes + run: | + git add . + if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "bitcointranscripts: updated scraper logs" + git push + fi env: ES_ENGINE: ${{ secrets.ES_ENGINE }} ES_URL: ${{ secrets.ES_URL }} diff --git a/.github/workflows/delving-bitcoin.yml b/.github/workflows/delving-bitcoin.yml index fed1a0c..2f03528 100644 --- a/.github/workflows/delving-bitcoin.yml +++ b/.github/workflows/delving-bitcoin.yml @@ -18,6 +18,21 @@ jobs: - name: Fetch data run: | python delvingbitcoin_2_elasticsearch/delvingbitcoin_2_elasticsearch.py + + - name: Configure Git + run: | + git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}" + git config user.name "${{ secrets.GIT_AUTHOR_NAME }}" + + - name: Add and commit changes + run: | + git add . + if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "delving-bitcoin: updated scraper logs" + git push + fi env: ES_ENGINE: ${{ secrets.ES_ENGINE }} ES_URL: ${{ secrets.ES_URL }} diff --git a/.github/workflows/mailing-list-bitcoin-new.yml b/.github/workflows/mailing-list-bitcoin-new.yml index f46f4ed..90a7091 100644 --- a/.github/workflows/mailing-list-bitcoin-new.yml +++ b/.github/workflows/mailing-list-bitcoin-new.yml @@ -18,6 +18,21 @@ jobs: - name: Fetch data run: | python mailing-list/main.py + + - name: Configure Git + run: | + git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}" + git config user.name "${{ secrets.GIT_AUTHOR_NAME }}" + + - name: Add and commit changes + run: | + git add . 
+ if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "mailing-list-bitcoin-new: updated scraper logs" + git push + fi env: ES_ENGINE: ${{ secrets.ES_ENGINE }} ES_URL: ${{ secrets.ES_URL }} diff --git a/.github/workflows/stackexchange.yml b/.github/workflows/stackexchange.yml index 3f1d6e2..bddbcd9 100644 --- a/.github/workflows/stackexchange.yml +++ b/.github/workflows/stackexchange.yml @@ -6,25 +6,44 @@ on: jobs: fetch: runs-on: ubuntu-latest + + env: + DATA_DIR: /tmp/stackexchange + ES_ENGINE: ${{ secrets.ES_ENGINE }} + ES_URL: ${{ secrets.ES_URL }} + ES_TOKEN: ${{ secrets.ES_TOKEN }} + CLOUD_ID: ${{ secrets.CLOUD_ID }} + USER_PASSWORD: ${{ secrets.USER_PASSWORD }} + USERNAME: ${{ secrets.USERNAME }} + INDEX: ${{ secrets.INDEX }} + steps: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 with: python-version: 3.9 + - name: Install dependencies run: | python -m pip install --upgrade pip pip install -r requirements.txt + - name: Fetch data run: | mkdir /tmp/stackexchange python bitcoin.stackexchange.com/main.py - env: - DATA_DIR: /tmp/stackexchange - ES_ENGINE: ${{ secrets.ES_ENGINE }} - ES_URL: ${{ secrets.ES_URL }} - ES_TOKEN: ${{ secrets.ES_TOKEN }} - CLOUD_ID: ${{ secrets.CLOUD_ID }} - USER_PASSWORD: ${{ secrets.USER_PASSWORD }} - USERNAME: ${{ secrets.USERNAME }} - INDEX: ${{ secrets.INDEX }} + + - name: Configure Git + run: | + git config user.email "${{ secrets.GIT_AUTHOR_EMAIL }}" + git config user.name "${{ secrets.GIT_AUTHOR_NAME }}" + + - name: Add and commit changes + run: | + git add . + if git diff --staged --quiet; then + echo "No changes to commit" + else + git commit -m "stackexchange: updated scraper logs" + git push + fi diff --git a/bitcoin.stackexchange.com/main.py b/bitcoin.stackexchange.com/main.py index a16b698..0660048 100644 --- a/bitcoin.stackexchange.com/main.py +++ b/bitcoin.stackexchange.com/main.py @@ -1,116 +1,137 @@ import os -import time +import sys +import traceback from datetime import datetime + from dotenv import load_dotenv from loguru import logger from tqdm import tqdm -from utils import download_dump, extract_dump, parse_posts, parse_users, strip_tags, document_view, document_add -import traceback -load_dotenv() - -if __name__ == "__main__": +from utils import download_dump, extract_dump, parse_posts, parse_users, strip_tags - INDEX = os.getenv("INDEX") +load_dotenv() - BASE_DIR = os.getenv("DATA_DIR", ".") - DOWNLOAD_PATH = os.path.join(BASE_DIR, "bitcoin.stackexchange.com.7z") - EXTRACT_PATH = os.path.join(BASE_DIR, "bitcoin.stackexchange.com") +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from common.scraper_log_utils import scraper_log_csv +from common.elasticsearch_utils import upsert_document - # download archive data - if not os.path.exists(DOWNLOAD_PATH): - download_dump(DOWNLOAD_PATH) - else: - logger.info(f'File already exists at path: {os.path.abspath(DOWNLOAD_PATH)}') +if __name__ == "__main__": + inserted_ids = set() + updated_ids = set() + no_changes_ids = set() + error_occurred = False + error_message = "---" + + try: + INDEX = os.getenv("INDEX") + + BASE_DIR = os.getenv("DATA_DIR", ".") + DOWNLOAD_PATH = os.path.join(BASE_DIR, "bitcoin.stackexchange.com.7z") + EXTRACT_PATH = os.path.join(BASE_DIR, "bitcoin.stackexchange.com") + + # download archive data + if not os.path.exists(DOWNLOAD_PATH): + download_dump(DOWNLOAD_PATH) + else: + logger.info(f'File already exists at path: {os.path.abspath(DOWNLOAD_PATH)}') - # extract the data if necessary - if not 
os.path.exists(EXTRACT_PATH): - os.makedirs(EXTRACT_PATH) - should_extract = True - else: - if not os.listdir(EXTRACT_PATH): + # extract the data if necessary + if not os.path.exists(EXTRACT_PATH): + os.makedirs(EXTRACT_PATH) should_extract = True else: - file_count = len(os.listdir(EXTRACT_PATH)) - logger.info(f'{file_count} files already exist at path: {os.path.abspath(EXTRACT_PATH)}') - should_extract = False - - if should_extract: - extract_dump(DOWNLOAD_PATH, EXTRACT_PATH) - - # parse the data - USERS_FILE_PATH = f"{EXTRACT_PATH}/Users.xml" - users = parse_users(USERS_FILE_PATH) - - POSTS_FILE_PATH = f"{EXTRACT_PATH}/Posts.xml" - docs = parse_posts(POSTS_FILE_PATH) - - for post in tqdm(docs): - try: - if post.attrib.get("PostTypeId") != "1" and post.attrib.get("PostTypeId") != "2": - continue - - user = users.get(post.attrib.get("OwnerUserId")) or post.attrib.get("OwnerDisplayName") or "Anonymous" - - # prepare the document based on type: 'question' or 'answer' - if post.attrib.get("ParentId") is None: - tags = post.attrib.get("Tags") - tags = tags[1:-1] - tags = tags.split("><") - document = { - "title": post.attrib.get("Title"), - "body": strip_tags(post.attrib.get("Body")), - "body_type": "raw", - "authors": [user], - "id": "stackexchange-" + post.attrib.get("Id"), - "tags": tags, - "domain": "https://bitcoin.stackexchange.com", - "url": "https://bitcoin.stackexchange.com/questions/" + post.attrib.get("Id"), - "thread_url": "https://bitcoin.stackexchange.com/questions/" + post.attrib.get("Id"), - "created_at": post.attrib.get("CreationDate"), - "accepted_answer_id": post.attrib.get("AcceptedAnswerId"), - "type": "question", - "indexed_at": datetime.utcnow().isoformat() - } + if not os.listdir(EXTRACT_PATH): + should_extract = True else: - posts = {} - question = posts.get(post.attrib.get("ParentId")) - if question is None: - question = docs.find("./row[@Id='" + post.attrib.get("ParentId") + "']") - posts[post.attrib.get("ParentId")] = question - - document = { - "title": question.attrib.get("Title") + " (Answer)", - "body": strip_tags(post.attrib.get("Body")), - "body_type": "raw", - "authors": [user], - "id": "stackexchange-" + post.attrib.get("Id"), - "domain": "https://bitcoin.stackexchange.com", - "url": "https://bitcoin.stackexchange.com/questions/" + post.attrib.get( - "ParentId") + "#" + post.attrib.get("Id"), - "thread_url": "https://bitcoin.stackexchange.com/questions/" + post.attrib.get( - "ParentId") + "#" + post.attrib.get("Id"), - "created_at": post.attrib.get("CreationDate"), - "type": "answer", - "indexed_at": datetime.utcnow().isoformat() - } - - # # delete posts with previous logic where '_id' was set on its own and replace them with our logic - # this_id = document['id'] - # logger.info(f"this_id: {this_id}") - # _ = find_and_delete_document_by_source_id(INDEX, this_id) - - # insert the doc if it doesn't exist, with '_id' set by our logic - resp = document_view(index_name=INDEX, doc_id=document['id']) - if not resp: - _ = document_add(index_name=INDEX, doc=document, doc_id=document['id']) - logger.success(f"Added! ID: {document['id']}, Title: {document['title']}") - else: - # logger.info(f"Exist! 
ID: {document['id']}, Title: {document['title']}") - pass - - except Exception as ex: - logger.error(f"Error occurred: {ex} \n{traceback.format_exc()}") - time.sleep(5) - - logger.info(f"All Documents updated successfully!") + file_count = len(os.listdir(EXTRACT_PATH)) + logger.info(f'{file_count} files already exist at path: {os.path.abspath(EXTRACT_PATH)}') + should_extract = False + + if should_extract: + extract_dump(DOWNLOAD_PATH, EXTRACT_PATH) + + # parse the data + USERS_FILE_PATH = f"{EXTRACT_PATH}/Users.xml" + users = parse_users(USERS_FILE_PATH) + + POSTS_FILE_PATH = f"{EXTRACT_PATH}/Posts.xml" + docs = parse_posts(POSTS_FILE_PATH) + + for post in tqdm(docs): + try: + if post.attrib.get("PostTypeId") != "1" and post.attrib.get("PostTypeId") != "2": + continue + + user = users.get(post.attrib.get("OwnerUserId")) or post.attrib.get("OwnerDisplayName") or "Anonymous" + + # prepare the document based on type: 'question' or 'answer' + if post.attrib.get("ParentId") is None: + tags = post.attrib.get("Tags") + tags = tags[1:-1] + tags = tags.split("><") + document = { + "title": post.attrib.get("Title"), + "body": strip_tags(post.attrib.get("Body")), + "body_type": "raw", + "authors": [user], + "id": "stackexchange-" + post.attrib.get("Id"), + "tags": tags, + "domain": "https://bitcoin.stackexchange.com", + "url": "https://bitcoin.stackexchange.com/questions/" + post.attrib.get("Id"), + "thread_url": "https://bitcoin.stackexchange.com/questions/" + post.attrib.get("Id"), + "created_at": post.attrib.get("CreationDate"), + "accepted_answer_id": post.attrib.get("AcceptedAnswerId"), + "type": "question", + "indexed_at": datetime.utcnow().isoformat() + } + else: + posts = {} + question = posts.get(post.attrib.get("ParentId")) + if question is None: + question = docs.find("./row[@Id='" + post.attrib.get("ParentId") + "']") + posts[post.attrib.get("ParentId")] = question + + document = { + "title": question.attrib.get("Title") + " (Answer)", + "body": strip_tags(post.attrib.get("Body")), + "body_type": "raw", + "authors": [user], + "id": "stackexchange-" + post.attrib.get("Id"), + "domain": "https://bitcoin.stackexchange.com", + "url": "https://bitcoin.stackexchange.com/questions/" + post.attrib.get( + "ParentId") + "#" + post.attrib.get("Id"), + "thread_url": "https://bitcoin.stackexchange.com/questions/" + post.attrib.get( + "ParentId") + "#" + post.attrib.get("Id"), + "created_at": post.attrib.get("CreationDate"), + "type": "answer", + "indexed_at": datetime.utcnow().isoformat() + } + + try: + res = upsert_document(index_name=os.getenv('INDEX'), doc_id=document['id'], doc_body=document) + if res['result'] == 'created': + inserted_ids.add(res['_id']) + elif res['result'] == 'updated': + updated_ids.add(res['_id']) + elif res['result'] == 'noop': + no_changes_ids.add(res['_id']) + except Exception as e: + # error_occurred = True + logger.error(f"Error upserting document ID-{document['id']}: {e}") + logger.warning(document) + + except Exception as ex: + # error_occurred = True + error_log = f"{ex}\n{traceback.format_exc()}" + logger.error(error_log) + logger.warning(post) + + logger.info(f"All Documents updated successfully!") + + except Exception as main_e: + error_message = f"error: {main_e}\n{traceback.format_exc()}" + + finally: + scraper_log_csv(f"bitcoin_stackexchange.csv", scraper_domain="https://bitcoin.stackexchange.com", + inserted_docs=len(inserted_ids), + updated_docs=len(updated_ids), no_changes_docs=len(no_changes_ids), error=error_message) diff --git a/bitcoinops/main.py 
b/bitcoinops/main.py index 2557059..dd54b20 100644 --- a/bitcoinops/main.py +++ b/bitcoinops/main.py @@ -1,9 +1,11 @@ import asyncio import os +import re import sys +import traceback import zipfile from datetime import datetime - +from tqdm import tqdm import requests import yaml from dotenv import load_dotenv @@ -12,6 +14,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from common.elasticsearch_utils import upsert_document +from common.scraper_log_utils import scraper_log_csv from common.utils import parse_markdown load_dotenv() @@ -103,25 +106,34 @@ def dir_walk(extracted_dir: str, typeof: str): async def main(): - await download_repo() - all_posts = dir_walk(os.path.join(DIR_PATH, POST_DIR), "posts") - all_topics = dir_walk(os.path.join(DIR_PATH, TOPIC_DIR), "topic") - new_ids = set() + inserted_ids = set() updated_ids = set() - all_posts.extend(all_topics) - for post in all_posts: - try: - res = upsert_document(index_name=INDEX_NAME, doc_id=post['id'], doc_body=post) - logger.info(f"Version-{res['_version']}, Result-{res['result']}, ID-{res['_id']}") - if res['result'] == 'created': - new_ids.add(res['_id']) - if res['result'] == 'updated': - updated_ids.add(res['_id']) - except Exception as e: - logger.error(f"Error: {e}") - logger.warning(post) - logger.info(f"Inserted {len(new_ids)} new documents") - logger.info(f"Updated {len(updated_ids)} documents") + no_changes_ids = set() + error_message = None + + try: + await download_repo() + all_posts = dir_walk(os.path.join(DIR_PATH, POST_DIR), "posts") + all_topics = dir_walk(os.path.join(DIR_PATH, TOPIC_DIR), "topic") + all_posts.extend(all_topics) + for post in tqdm(all_posts): + try: + res = upsert_document(index_name=INDEX_NAME, doc_id=post['id'], doc_body=post) + if res['result'] == 'created': + inserted_ids.add(res['_id']) + elif res['result'] == 'updated': + updated_ids.add(res['_id']) + elif res['result'] == 'noop': + no_changes_ids.add(res['_id']) + except Exception as e: + logger.error(f"Error: {e}") + logger.warning(post) + + except Exception as main_e: + error_message = f"error: {main_e}\n{traceback.format_exc()}" + finally: + scraper_log_csv(f"bitcoinops.csv", scraper_domain="https://bitcoinops.org/en/", inserted_docs=len(inserted_ids), + updated_docs=len(updated_ids), no_changes_docs=len(no_changes_ids), error=error_message) if __name__ == '__main__': diff --git a/bitcointalk/main.py b/bitcointalk/main.py index 6569d14..4a4ca9b 100644 --- a/bitcointalk/main.py +++ b/bitcointalk/main.py @@ -13,6 +13,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from common.elasticsearch_utils import upsert_document +from common.scraper_log_utils import scraper_log_csv load_dotenv() @@ -146,39 +147,51 @@ def fetch_posts(url: str): def main() -> None: - filename = os.path.join(os.getenv('DATA_DIR'), 'bitcointalk', 'topics.json') - topics = [] - - if not os.path.exists(filename): - topics = fetch_all_topics() - with open(filename, 'w') as f: - json.dump(topics, f) - else: - with open(filename, 'r') as f: - topics = json.load(f) - - logger.info(f"Found {len(topics)} topics") - new_ids = set() + inserted_ids = set() updated_ids = set() - start_index = int(os.getenv('START_INDEX', 0)) - for i in range(start_index, len(topics)): - topic = topics[i] - logger.info(f"Processing {i + 1}/{len(topics)}") - documents = fetch_posts(topic) - for document in documents: + no_changes_ids = set() + error_message = None + + try: + filename = os.path.join(os.getenv('DATA_DIR'), 'bitcointalk', 
'topics.json') + topics = [] + + if not os.path.exists(filename): + topics = fetch_all_topics() + with open(filename, 'w') as f: + json.dump(topics, f) + else: + with open(filename, 'r') as f: + topics = json.load(f) + + logger.info(f"Found {len(topics)} topics") + + start_index = int(os.getenv('START_INDEX', 0)) + for i in range(start_index, len(topics)): + topic = topics[i] try: - res = upsert_document(index_name=os.getenv('INDEX'), doc_id=document['id'], doc_body=document) - logger.info(f"Version-{res['_version']}, Result-{res['result']}, ID-{res['_id']}") - if res['result'] == 'created': - new_ids.add(res['_id']) - if res['result'] == 'updated': - updated_ids.add(res['_id']) - except Exception as ex: - logger.error(f"{ex} \n{traceback.format_exc()}") - logger.warning(document) - - logger.info(f"Inserted {len(new_ids)} new documents") - logger.info(f"Updated {len(updated_ids)} documents") + logger.info(f"Processing {i + 1}/{len(topics)}") + documents = fetch_posts(topic) + for document in documents: + try: + res = upsert_document(index_name=os.getenv('INDEX'), doc_id=document['id'], doc_body=document) + if res['result'] == 'created': + inserted_ids.add(res['_id']) + elif res['result'] == 'updated': + updated_ids.add(res['_id']) + elif res['result'] == 'noop': + no_changes_ids.add(res['_id']) + + except Exception as e: + logger.error(f"Error upserting document ID-{document['id']}: {e}") + logger.warning(document) + except Exception as e: + logger.error(f"Error processing topic {i + 1}/{len(topics)}: {e}") + except Exception as main_e: + error_message = f"error: {main_e}\n{traceback.format_exc()}" + finally: + scraper_log_csv(f"bitcoin_talk.csv", scraper_domain=BOARD_URL, inserted_docs=len(inserted_ids), + updated_docs=len(updated_ids), no_changes_docs=len(no_changes_ids), error=error_message) if __name__ == "__main__": diff --git a/bitcointranscripts/main.py b/bitcointranscripts/main.py index 52d0b41..9826e76 100644 --- a/bitcointranscripts/main.py +++ b/bitcointranscripts/main.py @@ -5,6 +5,7 @@ import traceback import zipfile from datetime import datetime +from tqdm import tqdm import requests import yaml @@ -12,13 +13,12 @@ from loguru import logger sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - +from common.scraper_log_utils import scraper_log_csv from common.elasticsearch_utils import upsert_document from common.utils import parse_markdown load_dotenv() - FOLDER_NAME = "bitcointranscripts-master" REPO_URL = "https://github.com/bitcointranscripts/bitcointranscripts/archive/refs/heads/master.zip" @@ -60,13 +60,14 @@ def download_repo(): def parse_posts(directory): + logger.info(f"parsing posts from: {directory}") documents = [] root_depth = directory.rstrip(os.sep).count(os.sep) for root, dirs, files in os.walk(directory): current_depth = root.rstrip(os.sep).count(os.sep) if current_depth <= root_depth: continue - for file in files: + for file in tqdm(files): translated_transcript_pattern = r'\.([a-z][a-z])\.md$' transcript = file.endswith('.md') translated_transcript = re.search(translated_transcript_pattern, file) @@ -94,7 +95,8 @@ def parse_post(file_path): 'body_formatted': body, 'body': body, 'body_type': "markdown", - 'created_at': (datetime.strptime(metadata['date'], '%Y-%m-%d').strftime('%Y-%m-%dT%H:%M:%S.000Z') if isinstance(metadata.get('date'), str) else None), + 'created_at': (datetime.strptime(metadata['date'], '%Y-%m-%d').strftime('%Y-%m-%dT%H:%M:%S.000Z') if isinstance( + metadata.get('date'), str) else None), 'domain': 
"https://btctranscripts.com/", 'url': "https://btctranscripts.com" + url_path.replace(os.sep, "/"), 'categories': metadata.get('categories', []), @@ -110,29 +112,34 @@ def parse_post(file_path): async def main(): - download_repo() - documents = parse_posts(GLOBAL_URL_VARIABLE) - logger.info(f"Filtering existing {len(documents)} documents... please wait...") - - new_ids = set() + inserted_ids = set() updated_ids = set() - for document in documents: - try: - # Update the provided fields with those in the existing document, - # ensuring that any fields not specified in 'doc_body' remain unchanged in the ES document - res = upsert_document(index_name=INDEX_NAME, doc_id=document['id'], doc_body=document) - if res['result'] == 'created': - logger.success(f"Version: {res['_version']}, Result: {res['result']}, ID: {res['_id']}") - new_ids.add(res['_id']) - if res['result'] == 'updated': - updated_ids.add(res['_id']) - logger.info(f"Version: {res['_version']}, Result: {res['result']}, ID: {res['_id']}") - except Exception as ex: - logger.error(f"{ex} \n{traceback.format_exc()}") - logger.warning(document) - - logger.info(f"Inserted {len(new_ids)} new documents") - logger.info(f"Updated {len(updated_ids)} documents") + no_changes_ids = set() + error_message = None + + try: + download_repo() + documents = parse_posts(GLOBAL_URL_VARIABLE) + logger.info(f"Filtering existing {len(documents)} documents... please wait...") + + for document in tqdm(documents): + try: + res = upsert_document(index_name=INDEX_NAME, doc_id=document['id'], doc_body=document) + if res['result'] == 'created': + inserted_ids.add(res['_id']) + elif res['result'] == 'updated': + updated_ids.add(res['_id']) + elif res['result'] == 'noop': + no_changes_ids.add(res['_id']) + except Exception as ex: + logger.error(f"{ex} \n{traceback.format_exc()}") + logger.warning(document) + except Exception as main_e: + error_message = f"error: {main_e}\n{traceback.format_exc()}" + finally: + scraper_log_csv(f"btctranscripts.csv", scraper_domain="https://btctranscripts.com/", + inserted_docs=len(inserted_ids), + updated_docs=len(updated_ids), no_changes_docs=len(no_changes_ids), error=error_message) if __name__ == '__main__': diff --git a/common/elasticsearch_utils.py b/common/elasticsearch_utils.py index d0d01e4..8d18338 100644 --- a/common/elasticsearch_utils.py +++ b/common/elasticsearch_utils.py @@ -159,3 +159,21 @@ def update_authors_names_from_es(index, old_author, new_author, max_retries=3, r else: logger.warning('Could not connect to Elasticsearch') return None + + +def get_domain_counts(index_name, domain): + """Function to get the total counts for the given 'domain' field from Elasticsearch index.""" + body = { + "query": { + "term": { + "domain.keyword": domain + } + } + } + + try: + resp = es.count(index=index_name, body=body) + return resp['count'] + except Exception as e: + logger.error(f"Error fetching domain counts: {e}") + return None diff --git a/common/scraper_log_utils.py b/common/scraper_log_utils.py new file mode 100644 index 0000000..a188893 --- /dev/null +++ b/common/scraper_log_utils.py @@ -0,0 +1,31 @@ +import csv +import os +import sys +from datetime import datetime + +from loguru import logger as log + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from common.elasticsearch_utils import get_domain_counts + + +def scraper_log_csv(csv_name, scraper_domain, inserted_docs, updated_docs, no_changes_docs, error=None): + last_scraped = 
datetime.now().isoformat(timespec='milliseconds').replace('+00:00', 'Z') + total_docs = get_domain_counts(index_name=os.getenv('INDEX'), domain=scraper_domain) + row = [last_scraped, scraper_domain, total_docs, inserted_docs, updated_docs, no_changes_docs, error] + + dir_path = "./scraper_logs/" + + os.makedirs(dir_path, exist_ok=True) + with open(f"{dir_path}/{csv_name}", mode='a', newline='') as csv_file: + writer = csv.writer(csv_file) + if csv_file.tell() == 0: + writer.writerow( + ['last_scraped', 'source', 'total_docs', 'inserted_docs', 'updated_docs', 'no_changed_docs', 'error']) + writer.writerow(row) + log.success("CSV Update Successfully") + + log.info(f"Inserted Docs: {inserted_docs}") + log.info(f"Updated Docs: {updated_docs}") + log.info(f"No changed Docs: {no_changes_docs}") + log.info(f"Error Message: {error}") diff --git a/common/utils.py b/common/utils.py index 21bf140..553c4e7 100644 --- a/common/utils.py +++ b/common/utils.py @@ -1,7 +1,8 @@ import re + def parse_markdown(text): - """Parses a markdown text to extract YAML front matter and the document body""" + """Parses a Markdown text to extract YAML front matter and the document body""" # Remove content between {% %} text = re.sub(r'{%.*%}', '', text, flags=re.MULTILINE) # Define a regular expression pattern to match the front matter between `---\n` delimiters @@ -9,10 +10,10 @@ def parse_markdown(text): match = pattern.search(text) if not match: raise ValueError("Input text does not contain proper front matter delimiters '---'") - + # Extract the front matter and the body front_matter = match.group(1).strip() body_start = match.end() body = text[body_start:].strip() - + return front_matter, body diff --git a/delvingbitcoin_2_elasticsearch/delvingbitcoin_2_elasticsearch.py b/delvingbitcoin_2_elasticsearch/delvingbitcoin_2_elasticsearch.py index c5afe37..1df4462 100644 --- a/delvingbitcoin_2_elasticsearch/delvingbitcoin_2_elasticsearch.py +++ b/delvingbitcoin_2_elasticsearch/delvingbitcoin_2_elasticsearch.py @@ -2,7 +2,9 @@ import json import os import sys +import traceback from datetime import datetime +from tqdm import tqdm from bs4 import BeautifulSoup from dotenv import load_dotenv @@ -10,8 +12,8 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from common.elasticsearch_utils import document_add, document_view, create_index - +from common.elasticsearch_utils import create_index, upsert_document +from common.scraper_log_utils import scraper_log_csv from achieve import download_dumps dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env') @@ -61,50 +63,77 @@ def strip_attributes_but_urls(html): def index_documents(files_path): - # Iterate through files in the specified path - for root, dirs, files in os.walk(files_path): - for file in files: - if file.endswith('.json'): - file_path = os.path.join(root, file) - log.info(f'Fetching document from file: {file_path}') - - # Load JSON data from file - with open(file_path, 'r', encoding='utf-8') as json_file: - document = json.load(json_file) - - body = preprocess_body(document['raw']) - if body == "(post deleted by author)": - log.info(f"Probably, post deleted by an author: {body}") - continue - - # Select required fields - doc = { - 'id': f'delving-bitcoin-{document["topic_id"]}-{document["post_number"]}-{document["id"]}', - 'authors': [document['username']], - 'thread_url': f"https://delvingbitcoin.org/t/{document['topic_slug']}/{document['topic_id']}", - 'title': document['topic_title'], - 'body_type': 'html', - 'body': body, 
- 'body_formatted': strip_attributes_but_urls(document['cooked']), - 'created_at': document['updated_at'], - 'domain': "https://delvingbitcoin.org/", - 'url': f"https://delvingbitcoin.org/t/{document['topic_slug']}/{document['topic_id']}", - "indexed_at": datetime.utcnow().isoformat() - } - - if document['post_number'] != 1: - doc['url'] += f'/{document["post_number"]}' - doc['type'] = 'reply' - else: - doc['type'] = 'original_post' - - # Check if a document already exists - resp = document_view(index_name=INDEX, doc_id=doc['id']) - if not resp: - _ = document_add(index_name=INDEX, doc=doc, doc_id=doc['id']) - log.success(f'Successfully added! ID: {doc["id"]}, Type:{doc["type"]}') - else: - log.info(f"Document already exist! ID: {doc['id']}") + inserted_ids = set() + updated_ids = set() + no_changes_ids = set() + error_message = None + + try: + # Iterate through files in the specified path + for root, dirs, files in os.walk(files_path): + try: + for file in tqdm(files): + if file.endswith('.json'): + file_path = os.path.join(root, file) + try: + log.info(f'Fetching document from file: {file_path}') + # Load JSON data from file + with open(file_path, 'r', encoding='utf-8') as json_file: + document = json.load(json_file) + + body = preprocess_body(document['raw']) + if body == "(post deleted by author)": + log.info(f"Probably, post deleted by an author: {body}") + continue + + # Select required fields + doc = { + 'id': f'delving-bitcoin-{document["topic_id"]}-{document["post_number"]}-{document["id"]}', + 'authors': [document['username']], + 'thread_url': f"https://delvingbitcoin.org/t/{document['topic_slug']}/{document['topic_id']}", + 'title': document['topic_title'], + 'body_type': 'html', + 'body': body, + 'body_formatted': strip_attributes_but_urls(document['cooked']), + 'created_at': document['updated_at'], + 'domain': "https://delvingbitcoin.org/", + 'url': f"https://delvingbitcoin.org/t/{document['topic_slug']}/{document['topic_id']}", + "indexed_at": datetime.utcnow().isoformat() + } + + if document['post_number'] != 1: + doc['url'] += f'/{document["post_number"]}' + doc['type'] = 'reply' + else: + doc['type'] = 'original_post' + try: + res = upsert_document(index_name=os.getenv('INDEX'), doc_id=doc['id'], + doc_body=doc) + if res['result'] == 'created': + inserted_ids.add(res['_id']) + elif res['result'] == 'updated': + updated_ids.add(res['_id']) + elif res['result'] == 'noop': + no_changes_ids.add(res['_id']) + except Exception as ex: + log.error(f"{ex} \n{traceback.format_exc()}") + log.warning(document) + + except Exception as ex: + error_log = f"{ex}\n{traceback.format_exc()}" + log.error(error_log) + log.warning(file_path) + except Exception as ex: + error_log = f"{ex}\n{traceback.format_exc()}" + log.error(error_log) + + except Exception as main_e: + error_message = f"error: {main_e}\n{traceback.format_exc()}" + + finally: + scraper_log_csv(f"delvingbitcoin.csv", scraper_domain="https://delvingbitcoin.org/", + inserted_docs=len(inserted_ids), + updated_docs=len(updated_ids), no_changes_docs=len(no_changes_ids), error=error_message) if __name__ == "__main__": diff --git a/mailing-list/main.py b/mailing-list/main.py index 31b019e..5c44be0 100644 --- a/mailing-list/main.py +++ b/mailing-list/main.py @@ -4,6 +4,7 @@ import traceback import urllib.request from datetime import datetime +from tqdm import tqdm import requests from bs4 import BeautifulSoup @@ -13,7 +14,8 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from common.elasticsearch_utils 
import document_view, document_add +from common.elasticsearch_utils import upsert_document +from common.scraper_log_utils import scraper_log_csv load_dotenv() @@ -40,14 +42,14 @@ def save_web_page(link, file_name): path = os.path.join(DOWNLOAD_PATH, file_name) with open(path, 'w', encoding='utf-8') as file: - logger.info(f'Downloading {file_name}') + # logger.info(f'Downloading {file_name}') file.write(str(soup)) def download_dumps(path, page_visited_count, max_page_count=2): if page_visited_count > max_page_count: return page_visited_count += 1 - logger.info(f"Page {page_visited_count}: {path}") + logger.info(f"Downloading data... | Page {page_visited_count}: {path}") with urllib.request.urlopen(f"{path}") as f: soup = BeautifulSoup(f, "html.parser") pre_tags = soup.find_all('pre') @@ -55,7 +57,7 @@ def download_dumps(path, page_visited_count, max_page_count=2): return next_page_link = f"{ORIGINAL_URL}{soup.find('a', {'rel': 'next'}).get('href')}" - for tag in pre_tags[1].find_all('a'): + for tag in tqdm(pre_tags[1].find_all('a')): try: date = tag.next_sibling.strip()[:7] date = date.strip().split('-') @@ -77,7 +79,6 @@ def download_dumps(path, page_visited_count, max_page_count=2): logger.error(e) logger.error(tag) continue - logger.info('----------------------------------------------------------\n') if next_page_link: download_dumps(next_page_link, page_visited_count) @@ -137,9 +138,10 @@ def preprocess_body_text(text): def parse_dumps(): doc = [] + logger.info("Parsing files...") for root, dirs, files in os.walk(DOWNLOAD_PATH): - for file in reversed(files): - logger.info(f'parsing : {file}') + for file in tqdm(reversed(files)): + # logger.info(f'parsing : {file}') with open(f'{os.path.join(root, file)}', 'r', encoding='utf-8') as f: u = file[9:].replace(".html", "") html_content = f.read() @@ -196,7 +198,8 @@ def parse_dumps(): "created_at": date, "domain": CUSTOM_URL, "thread_url": main_url, - "url": f"{main_url}{href}" + "url": f"{main_url}{href}", + 'indexed_at': datetime.now().isoformat() } if index == 0: @@ -211,14 +214,30 @@ def parse_dumps(): def index_documents(docs): - for doc in docs: - - resp = document_view(index_name=INDEX, doc_id=doc['id']) - if not resp: - _ = document_add(index_name=INDEX, doc=doc, doc_id=doc['id']) - logger.success(f'Successfully added! ID: {doc["id"]}') - else: - logger.info(f"Document already exist! ID: {doc['id']}") + inserted_ids = set() + updated_ids = set() + no_changes_ids = set() + error_message = None + try: + for doc in tqdm(docs): + try: + res = upsert_document(index_name=os.getenv('INDEX'), doc_id=doc['id'], doc_body=doc) + if res['result'] == 'created': + inserted_ids.add(res['_id']) + elif res['result'] == 'updated': + updated_ids.add(res['_id']) + elif res['result'] == 'noop': + no_changes_ids.add(res['_id']) + + except Exception as e: + logger.error(f"Error upserting document ID-{doc['id']}: {e}") + logger.warning(doc) + + except Exception as main_e: + error_message = f"error: {main_e}\n{traceback.format_exc()}" + finally: + scraper_log_csv(f"bitcoin_dev_new.csv", scraper_domain=CUSTOM_URL, inserted_docs=len(inserted_ids), + updated_docs=len(updated_ids), no_changes_docs=len(no_changes_ids), error=error_message) if __name__ == "__main__":
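
For reference, a minimal sketch (not part of the diff) of how the new common.scraper_log_utils.scraper_log_csv helper introduced above is invoked at the end of a scraper run. It assumes the common package is importable and the INDEX environment variable is set (the helper reads it when calling get_domain_counts); the CSV file name "example_scraper.csv" and the domain "https://example.org/" are hypothetical placeholders, not values from the diff.

    from common.scraper_log_utils import scraper_log_csv

    # Bookkeeping mirrors what each main.py now does: collect document IDs
    # per upsert outcome ('created', 'updated', 'noop') while the scraper runs.
    inserted_ids, updated_ids, no_changes_ids = set(), set(), set()
    error_message = None

    # ... run the scraper, adding res['_id'] to the matching set per upsert ...

    # Appends one summary row to ./scraper_logs/example_scraper.csv, writing the
    # header on first use; total_docs comes from get_domain_counts() against
    # the Elasticsearch index for the given domain.
    scraper_log_csv(
        "example_scraper.csv",                  # hypothetical CSV name
        scraper_domain="https://example.org/",  # hypothetical domain
        inserted_docs=len(inserted_ids),
        updated_docs=len(updated_ids),
        no_changes_docs=len(no_changes_ids),
        error=error_message,
    )

The "Add and commit changes" step added to each workflow then commits the resulting scraper_logs/ CSVs back to the repository when they have changed.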