WIP ETag/Last-Modified
zstyblik committed Jun 5, 2024
1 parent 540c98c commit 9f9f361
Showing 7 changed files with 432 additions and 100 deletions.
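For context on the commit title: ETag/Last-Modified support means HTTP conditional requests. The validators returned by one fetch are replayed on the next one as If-None-Match and If-Modified-Since, and a 304 Not Modified reply means the feed body does not need to be downloaded or parsed again. A minimal, self-contained sketch of that pattern, independent of this repository (the function name and User-Agent below are illustrative):

import requests

def conditional_get(url, etag="", last_modified="", timeout=30):
    """Fetch url, replaying cached ETag/Last-Modified as conditional headers."""
    headers = {"User-Agent": "conditional-get-example"}
    if etag:
        headers["If-None-Match"] = etag
    if last_modified:
        headers["If-Modified-Since"] = last_modified

    rsp = requests.get(url, timeout=timeout, headers=headers)
    if rsp.status_code == 304:
        # Nothing changed since the previous fetch; keep the old validators.
        return None, etag, last_modified

    rsp.raise_for_status()
    # Remember the new validators for the next run.
    return rsp.text, rsp.headers.get("ETag", ""), rsp.headers.get("Last-Modified", "")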
11 changes: 7 additions & 4 deletions gh2slack.py
@@ -40,7 +40,10 @@ def format_message(
try:
title = cache_item["title"].encode("utf-8")
except UnicodeEncodeError:
logger.error("Failed to encode title as UTF-8: %s", repr(title))
logger.error(
"Failed to encode title as UTF-8: %s",
repr(cache_item.get("title", None)),
)
logger.error(traceback.format_exc())
title = "Unknown title due to UTF-8 exception, {:s}#{:d}".format(
section, cache_item["number"]
@@ -157,7 +160,7 @@ def main():
sys.exit(0)

cache = rss2irc.read_cache(logger, args.cache)
scrub_cache(logger, cache)
scrub_items(logger, cache)

# Note: I have failed to find web link to repo in GH response.
# Therefore, let's create one.
@@ -220,7 +223,7 @@ def parse_args() -> argparse.Namespace:
"--cache-expiration",
dest="cache_expiration",
type=int,
default=rss2irc.EXPIRATION,
default=rss2irc.CACHE_EXPIRATION,
help="Time, in seconds, for how long to keep items " "in cache.",
)
parser.add_argument(
@@ -344,7 +347,7 @@ def process_page_items(
return to_publish


def scrub_cache(logger: logging.Logger, cache: rss2irc.CachedData) -> None:
def scrub_items(logger: logging.Logger, cache: rss2irc.CachedData) -> None:
"""Scrub cache and remove expired items."""
time_now = int(time.time())
for key in list(cache.items.keys()):
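The scrub helper renamed here (scrub_cache to scrub_items) keeps its original job: each cached item carries an absolute expiration timestamp, and anything older than the current time is removed. A simplified, self-contained sketch of that pattern; the item layout differs per script (bare timestamps in rss2irc, dicts with an "expiration" key in gh2slack and phpbb2slack), and the function name below is illustrative only:

import time

def scrub_expired(items: dict) -> None:
    """Drop cache entries whose expiration timestamp lies in the past."""
    now = int(time.time())
    for key in list(items.keys()):
        entry = items[key]
        # rss2irc stores a bare timestamp; gh2slack/phpbb2slack store a dict.
        expiration = entry if isinstance(entry, int) else entry.get("expiration", 0)
        if expiration < now:
            items.pop(key)

items = {"fresh": int(time.time()) + 3600, "stale": {"expiration": 0}}
scrub_expired(items)
print(sorted(items.keys()))  # ['fresh']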
54 changes: 31 additions & 23 deletions phpbb2slack.py
@@ -81,32 +81,20 @@ def main():
try:
slack_token = rss2slack.get_slack_token()
authors = get_authors_from_file(logger, args.authors_file)

data = rss2irc.get_rss(logger, args.rss_url, args.rss_http_timeout)
if not data:
# FIXME
rsp = rss2irc.get_rss(logger, args.rss_url, args.rss_http_timeout)
if not rsp.text:
logger.error("Failed to get RSS from %s", args.rss_url)
sys.exit(1)

news = parse_news(data, authors)
news = parse_news(rsp.text, authors)
if not news:
logger.info("No news?")
sys.exit(0)

cache = rss2irc.read_cache(logger, args.cache)
scrub_cache(logger, cache)

for key in list(news.keys()):
if key not in cache.items:
continue

logger.debug("Key %s found in cache", key)
comments_cached = int(cache.items[key]["comments_cnt"])
comments_actual = int(news[key]["comments_cnt"])
if comments_cached == comments_actual:
cache.items[key]["expiration"] = (
int(time.time()) + args.cache_expiration
)
news.pop(key)
scrub_items(logger, cache)
prune_news(logger, cache, news, args.cache_expiration)

slack_client = rss2slack.get_slack_web_client(
slack_token, args.slack_base_url, args.slack_timeout
@@ -126,8 +114,7 @@ def main():
finally:
time.sleep(args.sleep)

expiration = int(time.time()) + args.cache_expiration
update_cache(cache, news, expiration)
update_items_expiration(cache, news, args.cache_expiration)
rss2irc.write_cache(cache, args.cache)
except Exception:
logger.debug(traceback.format_exc())
@@ -271,7 +258,27 @@ def parse_news(data: str, authors: List[str]) -> Dict:
return news


def scrub_cache(logger: logging.Logger, cache: rss2irc.CachedData) -> None:
def prune_news(
logger: logging.Logger,
cache: rss2irc.CachedData,
news: Dict[str, Dict],
expiration: int = CACHE_EXPIRATION,
):
"""Prune news which already are in cache."""
item_expiration = int(time.time()) + expiration
for key in list(news.keys()):
if key not in cache.items:
continue

logger.debug("Key %s found in cache", key)
comments_cached = int(cache.items[key]["comments_cnt"])
comments_actual = int(news[key]["comments_cnt"])
if comments_cached == comments_actual:
cache.items[key]["expiration"] = item_expiration
news.pop(key)


def scrub_items(logger: logging.Logger, cache: rss2irc.CachedData) -> None:
"""Scrub cache and remove expired items."""
time_now = int(time.time())
for key in list(cache.items.keys()):
@@ -290,13 +297,14 @@ def scrub_cache(logger: logging.Logger, cache: rss2irc.CachedData) -> None:
cache.items.pop(key)


def update_cache(
def update_items_expiration(
cache: rss2irc.CachedData, news: Dict, expiration: int
) -> None:
"""Update cache contents."""
item_expiration = int(time.time()) + expiration
for key in list(news.keys()):
cache.items[key] = {
"expiration": expiration,
"expiration": item_expiration,
"comments_cnt": int(news[key]["comments_cnt"]),
}

177 changes: 137 additions & 40 deletions rss2irc.py
@@ -21,16 +21,73 @@
import feedparser
import requests

EXPIRATION = 86400 # seconds
CACHE_EXPIRATION = 86400 # seconds
DATA_SOURCE_EXPIRATION = 30 * 86400 # seconds
HTTP_TIMEOUT = 30 # seconds


@dataclass
class HTTPSource:
"""Class represents HTTP data source."""

http_etag: str = field(default_factory=str)
http_last_modified: str = field(default_factory=str)
last_used_ts: int = 0
url: str = field(default_factory=str)

def extract_caching_headers(self, headers: dict):
"""Extract cache related headers from given dict."""
self.http_etag = ""
self.http_last_modified = ""
for key, value in headers.items():
key = key.lower()
if key == "etag":
self.http_etag = value
elif key == "last-modified":
self.http_last_modified = value

def make_caching_headers(self) -> Dict[str, str]:
"""Return cache related headers as a dict."""
headers = {}
if self.http_etag:
headers["if-none-match"] = self.http_etag

if self.http_last_modified:
headers["if-modified-since"] = self.http_last_modified

return headers


@dataclass
class CachedData:
"""CachedData represents locally cached data and state."""

data_sources: dict = field(default_factory=dict)
items: dict = field(default_factory=dict)

def get_source_by_url(self, url: str):
"""Return source by URL.
If source doesn't exist, it will be created.
"""
source = self.data_sources.get(url, None)
if source:
source.last_used_ts = int(time.time())
return source

self.data_sources[url] = HTTPSource(
last_used_ts=int(time.time()), url=url
)
return self.get_source_by_url(url)

def scrub_data_sources(self, expiration: int = DATA_SOURCE_EXPIRATION):
"""Delete expired data sources."""
now = int(time.time())
for key in list(self.data_sources.keys()):
diff = now - self.data_sources[key].last_used_ts
if int(diff) > expiration:
self.data_sources.pop(key)


def format_message(
url: str, msg_attrs: Tuple[str, str], handle: str = ""
@@ -53,18 +110,24 @@ def format_message(


def get_rss(
logger: logging.Logger, url: str, timeout: int = HTTP_TIMEOUT
) -> str:
logger: logging.Logger,
url: str,
timeout: int = HTTP_TIMEOUT,
extra_headers: Dict = None,
) -> requests.models.Response:
"""Return body of given URL as a string."""
# Randomize user agent, because CF likes to block for no apparent reason.
logger.debug("Get %s", url)
user_agent = "rss2irc_{:d}".format(int(time.time()))
rsp = requests.get(url, timeout=timeout, headers={"User-Agent": user_agent})
headers = {"User-Agent": user_agent}
if extra_headers:
for key, value in extra_headers.items():
headers[key] = value

logger.debug("Get %s", url)
rsp = requests.get(url, timeout=timeout, headers=headers)
logger.debug("Got HTTP Status Code: %i", rsp.status_code)
rsp.raise_for_status()
data = rsp.text
del rsp
return data
return rsp


def main():
@@ -84,32 +147,39 @@ def main():
sys.exit(1)

try:
data = get_rss(logger, args.rss_url, args.rss_http_timeout)
if not data:
cache = read_cache(logger, args.cache)
source = cache.get_source_by_url(args.rss_url)

rsp = get_rss(
logger,
args.rss_url,
args.rss_http_timeout,
source.make_caching_headers(),
)
if rsp.status_code == 304:
logger.debug("No new RSS data since the last run")
write_cache(cache, args.cache)
sys.exit(0)

if not rsp.text:
logger.error("Failed to get RSS from %s", args.rss_url)
sys.exit(1)

news = parse_news(data)
news = parse_news(rsp.text)
if not news:
logger.info("No news?")
write_cache(cache, args.cache)
sys.exit(0)

cache = read_cache(logger, args.cache)
scrub_cache(logger, cache)

for key in list(news.keys()):
if key in cache.items:
logger.debug("Key %s found in cache", key)
cache.items[key] = int(time.time()) + args.cache_expiration
news.pop(key)
source.extract_caching_headers(rsp.headers)
scrub_items(logger, cache)
prune_news(logger, cache, news, args.cache_expiration)

if not args.cache_init:
write_data(logger, news, args.output, args.handle, args.sleep)

expiration = int(time.time()) + args.cache_expiration
for key in list(news.keys()):
cache.items[key] = expiration

update_items_expiration(cache, news, args.cache_expiration)
cache.scrub_data_sources()
write_cache(cache, args.cache)
# TODO(zstyblik): remove error file
except Exception:
@@ -171,7 +241,7 @@ def parse_args() -> argparse.Namespace:
"--cache-expiration",
dest="cache_expiration",
type=int,
default=EXPIRATION,
default=CACHE_EXPIRATION,
help="Time, in seconds, for how long to keep items in cache.",
)
parser.add_argument(
@@ -210,30 +280,51 @@ def parse_news(data: str) -> Dict[str, Tuple[str, str]]:
return news


def prune_news(
logger: logging.Logger,
cache: CachedData,
news: Dict[str, Tuple[str, str]],
expiration: int = CACHE_EXPIRATION,
) -> None:
"""Prune news which already are in cache."""
item_expiration = int(time.time()) + expiration
for key in list(news.keys()):
if key in cache.items:
logger.debug("Key %s found in cache", key)
cache.items[key] = item_expiration
news.pop(key)


def read_cache(logger: logging.Logger, cache_file: str) -> CachedData:
"""Read file with Py pickle in it."""
if not cache_file:
return CachedData()

if not os.path.exists(cache_file):
logger.warning("Cache file '%s' doesn't exist.", cache_file)
return CachedData()

with open(cache_file, "rb") as fhandle:
try:
try:
with open(cache_file, "rb") as fhandle:
cache = pickle.load(fhandle)
except EOFError:
# Note: occurred with empty file.
cache = CachedData()
logger.debug(
"Cache file is probably empty: %s", traceback.format_exc()
)
except FileNotFoundError:
cache = CachedData()
logger.warning("Cache file '%s' doesn't exist.", cache_file)
except EOFError:
# Note: occurred with empty file.
cache = CachedData()
logger.debug(
"Cache file '%s' is probably empty: %s",
cache_file,
traceback.format_exc(),
)

logger.debug(cache)
return cache


def scrub_cache(logger: logging.Logger, cache: CachedData) -> None:
def signal_handler(signum, frame):
"""Handle SIGALRM signal."""
raise ValueError


def scrub_items(logger: logging.Logger, cache: CachedData) -> None:
"""Scrub cache and remove expired items."""
time_now = time.time()
for key in list(cache.items.keys()):
@@ -252,9 +343,15 @@ def scrub_cache(logger: logging.Logger, cache: CachedData) -> None:
cache.items.pop(key)


def signal_handler(signum, frame):
"""Handle SIGALRM signal."""
raise ValueError
def update_items_expiration(
cache: CachedData,
news: Dict[str, Tuple[str, str]],
expiration: int = CACHE_EXPIRATION,
):
"""FIXME: DESC INCORR ... Update expiration of items in cache."""
item_expiration = int(time.time()) + expiration
for key in list(news.keys()):
cache.items[key] = item_expiration


def write_cache(data: CachedData, cache_file: str) -> None:
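Pulled together, the rss2irc.py changes give main() a conditional-fetch flow: look up (or create) the HTTPSource for the feed URL, replay its stored ETag/Last-Modified as request headers, bail out early on HTTP 304, and persist the refreshed validators in the pickled cache. A condensed sketch of that flow against the new API; the feed URL, cache path, and timeout are placeholder values, and argument parsing, IRC output, and error handling are omitted:

import logging
import sys

import rss2irc

logger = logging.getLogger("rss2irc-sketch")
cache_file = "/tmp/rss2irc.cache"  # placeholder path
feed_url = "https://example.com/feed.rss"  # placeholder feed

cache = rss2irc.read_cache(logger, cache_file)
source = cache.get_source_by_url(feed_url)

rsp = rss2irc.get_rss(
    logger,
    feed_url,
    timeout=30,
    extra_headers=source.make_caching_headers(),
)
if rsp.status_code == 304:
    # Feed unchanged since the previous run; persist state and stop.
    rss2irc.write_cache(cache, cache_file)
    sys.exit(0)

# parse_news(), prune_news(), and write_data() would run here in main().
source.extract_caching_headers(rsp.headers)  # remember ETag/Last-Modified
cache.scrub_data_sources()  # drop sources unused for roughly 30 days
rss2irc.write_cache(cache, cache_file)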