Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: checklink #100

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/redturtle/volto/browser/fix_links.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def check_broken_blocks_links(self, blocks, field, context):
if self.check_pattern(value=data):
res["ok"] = False
res["fixed"] = True
# convert broken links to current site ulr and then deserialize all
# convert broken links to current site url and then deserialize all
# blocks to have the right values
for url in self.request.form.get("to_replace", "").split():
data = data.replace(url, self.portal_url)
Expand Down
242 changes: 242 additions & 0 deletions src/redturtle/volto/linkchecker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
from AccessControl import ClassSecurityInfo
from Acquisition import aq_base
from datetime import datetime
from OFS.SimpleItem import SimpleItem
from persistent.mapping import PersistentMapping
from plone import api
from plone.dexterity.utils import iterSchemata
from plone.memoize import view
from plone.restapi.interfaces import IFieldSerializer
from plone.restapi.serializer.utils import RESOLVEUID_RE
from Products.CMFCore.utils import UniqueObject
from zExceptions import NotFound
from zope.component import queryMultiAdapter
from zope.interface import implementer
from zope.interface import Interface
from zope.schema import getFieldsInOrder

import json
import logging
import re
import requests


logger = logging.getLogger(__name__)


class ILinkCheckerTool(Interface):
"""Link checker tool"""

def clear():
"""Clear the status"""

def check_site(ttl=3600 * 6):
"""Check the site for links"""

def check_content(content, ttl=3600 * 6):
"""Check the content for links"""


URL_REGEX = re.compile(
r"((?:(?:https?://)(?:(?:www\.)?(?:[\da-z\.-]+)\.(?:[a-z]{2,6}))(?::[0-9]{1,4})?)(?:[^\"\s]*)?)"
)
RESOLVEUID_REGEX = re.compile(r"^(?:|.*/)(resolve[Uu]id/[a-z0-9\-]+)")


@implementer(ILinkCheckerTool)
class LinkCheckerTool(UniqueObject, SimpleItem):
"""Tool to check links in the site"""

id = "portal_linkchecker"
meta_type = "Link Checker Tool"
security = ClassSecurityInfo()

def __init__(self):
# link -> (last_update, status_code)
self._external_links_status = PersistentMapping()
# last update
self._last_update = None
# UUID -> (last_update, [(link1, status1), (link2, status2), ...])
self._outgoing_links = PersistentMapping()

@property
def user_agent(self):
return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
# "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:123.0) Gecko/20100101 Firefox/123.0"

@property
def request(self):
# used for view.memoize
return api.env.getRequest()

@property
@view.memoize
def portal_url(self):
return api.portal.get().absolute_url()

def clear(self):
"""Clear the status"""
self._external_links_status.clear()
self._outgoing_links.clear()
self._last_update = None

def check_site(self, ttl=3600 * 6):
"""Check the site for links

:param ttl: the time to live for the external links status
"""

def check(obj, path, **kwargs):
logger.debug("Checking %s", path)
self.check_content(obj, ttl)

portal = api.portal.get()
catalog = api.portal.get_tool("portal_catalog")
catalog.ZopeFindAndApply(
portal,
search_sub=True,
apply_func=check,
apply_path="/".join(portal.getPhysicalPath()),
)
self._last_update = datetime.now()

def check_content(self, content, ttl=3600 * 6):
"""Check the content for links

:param content: the content to check
:param ttl: the time to live for the external links status
"""
uuid = content.UID()
internal_links = []
external_links = []
for link in self._find_links(content):
if self._is_internal(link):
status = self._check_internal_link(link)
internal_links.append((link, status))
else:
status = self._check_external_link(link, ttl)
external_links.append((link, status))
self._outgoing_links[uuid] = (datetime.now(), external_links + internal_links)

def get_page_with_broken_links(self):
"""
:return: iterator with items as
(uid, [(link1, status1), link2, status2), ...])
"""
for uid, (_, links) in self._outgoing_links.items():
broken_links = [item for item in links if item[1] != 200]
if broken_links:
yield (uid, broken_links)

def get_rows(self, broken=True):
"""
example usage:

tool = api.portal.get_tool("portal_linkchecker")
writer = csv.writer(open("broken_links.csv", "w"))
for row in tool.get_rows():
writer.writerow(row)
"""
yield ["PAGE", "LINK", "STATUS"]
for uid, (_, links) in self._outgoing_links.items():
page = api.content.find(UID=uid, unrestricted=True)[0].getURL()
for item in links:
if broken and item[1] == 200:
continue
yield [page, item[0], item[1]]

def _find_links(self, item):
"""Find links in the content

:param item: the content to check

n.b. this is (initially) a copy of the method from redturtle.volto.browser.fix_links
"""
links = set()
obj = aq_base(item)
if not hasattr(obj, "portal_type"):
# skip Plone Site
return []
for schemata in iterSchemata(obj):
for name, field in getFieldsInOrder(schemata):
if name in ["blocks_layout"]:
continue
serializer = queryMultiAdapter(
(field, item, self.request), IFieldSerializer
)
# value = getattr(obj, name, None)
value = serializer()
if not value:
continue
elif isinstance(value, str):
links.update(re.findall(URL_REGEX, value))
links.update(re.findall(RESOLVEUID_REGEX, value))
elif isinstance(value, dict):
links.update(re.findall(URL_REGEX, json.dumps(value)))
links.update(re.findall(RESOLVEUID_REGEX, json.dumps(value)))
else:
# import pdb; pdb.set_trace()
continue
return list(links)

def _is_internal(self, link):
"""Check if the link is internal"""
if link.startswith(self.portal_url):
return True
if link.startswith("http://") or link.startswith("https://"):
return False
return True

@view.memoize
def _check_internal_link(self, link):
"""Check the internal link"""
logger.info("Checking internal link %s", link)
link = link.replace(self.portal_url, "")
# TODO: use subrequest or catalog ?
match = RESOLVEUID_RE.match(link)
if match is not None:
uid, _ = match.groups()
if api.content.find(UID=uid, unrestricted=True):
return 200
else:
return 404
else:
try:
# XXX: path is not link ....
if api.content.get(path=link):
return 200
except (NotFound, IndexError):
return 404

# TODO: redirection tool

# try:
# from plone.subrequest import subrequest
# def exception_handler(response, exc):
# raise exc
# subrequest(link, exception_handler=exception_handler)
# return 200
# except NotFound:
# return 404
# except:
# raise

return 404

@view.memoize
def _check_external_link(self, link, ttl=3600 * 6, timeout=1.0):
"""Check the external link"""
logger.warning("Checking external link %s", link)

if link in self._external_links_status:
last_update, status = self._external_links_status[link]
if status == 200 and (datetime.now() - last_update).total_seconds() < ttl:
return status
try:
headers = {"User-Agent": self.user_agent}
res = requests.get(link, headers=headers, timeout=timeout)
status = res.status_code
except requests.exceptions.RequestException:
status = 999 # ???
self._external_links_status[link] = (datetime.now(), status)
return status
6 changes: 6 additions & 0 deletions src/redturtle/volto/profiles/default/toolset.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<tool-setup>
<required class="redturtle.volto.linkchecker.LinkCheckerTool"
tool_id="portal_linkchecker"
/>
</tool-setup>
4 changes: 4 additions & 0 deletions src/redturtle/volto/profiles/uninstall/toolset.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<tool-setup>
<forbidden tool_id="portal_linkchecker" />
</tool-setup>
93 changes: 93 additions & 0 deletions src/redturtle/volto/tests/test_linkchecker_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
from plone import api
from plone.app.testing import setRoles
from plone.app.testing import SITE_OWNER_NAME
from plone.app.testing import SITE_OWNER_PASSWORD
from plone.app.testing import TEST_USER_ID
from plone.restapi.testing import RelativeSession
from redturtle.volto.testing import REDTURTLE_VOLTO_API_FUNCTIONAL_TESTING
from uuid import uuid4

import unittest


class LinkCheckerToolTest(unittest.TestCase):
layer = REDTURTLE_VOLTO_API_FUNCTIONAL_TESTING

def setUp(self):
self.app = self.layer["app"]
self.portal = self.layer["portal"]
self.portal_url = self.portal.absolute_url()

self.api_session = RelativeSession(self.portal_url)
self.api_session.headers.update({"Accept": "application/json"})
self.api_session.auth = (SITE_OWNER_NAME, SITE_OWNER_PASSWORD)

setRoles(self.portal, TEST_USER_ID, ["Manager"])

api.content.create(container=self.portal, type="Folder", title="Foo folder")

self.document = api.content.create(
container=self.portal, type="Document", title="Foo document"
)

self.document.blocks = {
"xyz": {
"@type": "testo_riquadro_immagine",
"image_card_title": {"blocks": [{"text": "imagetitle"}]},
"image_card_content": {"blocks": [{"text": "imagetext"}]},
"text": {
"blocks": [
{
"urls": [
"https://httpstat.us/404", # broken
f"/resolveuid/{str(uuid4())}", # broken
f"/resolveuid/{self.document.UID()}", # internal ok
]
}
]
},
},
}
self.document.blocks_layout = {"items": ["xyz"]}

api.content.create(container=self.portal, type="Event", title="Foo event")

self.link_524 = api.content.create(
container=self.portal,
type="Link",
title="Foo link 524",
remoteUrl="https://httpstat.us/524",
)
self.link_404 = api.content.create(
container=self.portal,
type="Link",
title="Foo link 404",
remoteUrl="https://httpstat.us/404",
)

self.tool = api.portal.get_tool("portal_linkchecker")

def test_tool_exists(self):
self.assertTrue(self.tool)

def test_clear(self):
self.tool.clear()
self.assertEqual(len(self.tool._external_links_status), 0)
self.assertEqual(len(self.tool._outgoing_links), 0)
self.assertEqual(self.tool._last_update, None)

def test_check_site(self):
self.tool.check_site()
self.assertNotEqual(self.tool._last_update, None)
self.assertEqual(len(self.tool._outgoing_links), 6)
self.assertEqual(len(self.tool._external_links_status), 2)

broken_links = {
uid: links for (uid, links) in self.tool.get_page_with_broken_links()
}
self.assertEqual(len(broken_links), 3)
self.assertIn(self.link_524.UID(), broken_links)
self.assertIn(self.link_404.UID(), broken_links)
self.assertIn(self.document.UID(), broken_links)
self.assertEqual(len(broken_links[self.document.UID()]), 2)
Loading