Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed vulnerability by Aljohara #36078

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions cms/djangoapps/api/v1/serializers/course_runs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
""" Course run serializers. """
import logging
import magic
from rest_framework import serializers


from django.contrib.auth import get_user_model
from django.db import transaction
Expand Down Expand Up @@ -82,10 +85,28 @@ def update_team(self, instance, team): # lint-amnesty, pylint: disable=missing-


def image_is_jpeg_or_png(value):
content_type = value.content_type
if content_type not in list(IMAGE_TYPES.keys()): # lint-amnesty, pylint: disable=consider-iterating-dictionary
# Use python-magic to detect the actual MIME type based on file content
mime = magic.Magic(mime=True)
content_type = mime.from_buffer(value.read(1024)) # Read the first 1024 bytes to determine MIME type
value.seek(0) # Reset the file pointer after reading

# Allowed MIME types for images
allowed_mime_types = ['image/jpeg', 'image/png']

# Validate the content type by checking the MIME type
if content_type not in allowed_mime_types:
raise serializers.ValidationError(
f'Only JPEG and PNG image types are supported. {content_type} is not valid')
f'Only JPEG and PNG image types are supported. {content_type} is not valid.')

# Optional: Validate the file extension if needed
# Note: This step is extra security to ensure the file extension matches the content
file_extension = value.name.split('.')[-1].lower()
if content_type == 'image/jpeg' and file_extension not in ['jpg', 'jpeg']:
raise serializers.ValidationError('File extension does not match MIME type for JPEG.')
elif content_type == 'image/png' and file_extension != 'png':
raise serializers.ValidationError('File extension does not match MIME type for PNG.')

# If it passes both the MIME type and extension checks, the file is valid


class CourseRunImageField(serializers.ImageField): # lint-amnesty, pylint: disable=missing-class-docstring
Expand Down
35 changes: 30 additions & 5 deletions cms/djangoapps/contentstore/asset_storage_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
import logging
import math
import re
import magic
from django.http import HttpResponse
from django.core.exceptions import ValidationError
from functools import partial
from urllib.parse import urljoin


from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.core.exceptions import PermissionDenied
Expand Down Expand Up @@ -558,18 +562,39 @@ def _get_error_if_course_does_not_exist(course_key): # lint-amnesty, pylint: di


def _get_file_metadata_as_dictionary(upload_file): # lint-amnesty, pylint: disable=missing-function-docstring
# compute a 'filename' which is similar to the location formatting; we're
# using the 'filename' nomenclature since we're using a FileSystem paradigm
# here; we're just imposing the Location string formatting expectations to
# keep things a bit more consistent
'''
This function retrieves metadata about the uploaded file and validates
the file type to prevent insecure files (e.g., HTML) from being processed.
'''
# Use python-magic to get the MIME type of the uploaded file
mime = magic.Magic(mime=True)
mime_type = mime.from_buffer(upload_file.read(1024)) # Read first 1024 bytes to detect MIME type
upload_file.seek(0) # Reset file pointer after reading

# Allowed MIME types (for security, restrict file types)
allowed_mime_types = ['image/jpeg', 'image/png', 'application/pdf', 'application/zip'] # add any other files type you want

# If the uploaded file's MIME type is HTML, treat it as a dangerous file
if mime_type == 'text/html':
# Force the file to be downloaded by setting 'Content-Disposition'
response = HttpResponse(upload_file, content_type='application/octet-stream')
response['Content-Disposition'] = f'attachment; filename={upload_file.name}'
return response

# Validate the file's MIME type
if mime_type not in allowed_mime_types:
raise ValidationError(f"Invalid file type: {mime_type}. Only image, PDF, and zip files are allowed.")

# Return file metadata
return {
'upload_file': upload_file,
'filename': upload_file.name,
'mime_type': upload_file.content_type,
'mime_type': mime_type,
'upload_file_size': get_file_size(upload_file)
}



def get_file_size(upload_file):
"""returns the size of the uploaded file"""
# can be used for mocking test file sizes.
Expand Down
36 changes: 27 additions & 9 deletions cms/djangoapps/contentstore/course_info_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import logging
import re
import bleach

from django.http import HttpResponseBadRequest
from django.utils.translation import gettext as _
Expand Down Expand Up @@ -43,14 +44,28 @@ def get_course_updates(location, provided_id, user_id):
return _get_visible_update(course_update_items)


# Define allowed HTML tags and attributes
ALLOWED_TAGS = ['b', 'i', 'u', 'em', 'strong', 'p', 'br', 'ul', 'ol', 'li', 'a', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'blockquote']
ALLOWED_ATTRIBUTES = {
'*': ['class', 'id'], # Allow global attributes like class and id
'a': ['href', 'title'], # Allow 'href' and 'title' attributes for <a> tags
}

def sanitize_html(content):
"""
Sanitize the HTML content to remove dangerous tags and attributes.
Only safe HTML tags and attributes will be allowed.
"""
return bleach.clean(content, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES, strip=True)

def update_course_updates(location, update, passed_id=None, user=None, request_method=None):
"""
Either add or update the given course update.
Add:
If the passed_id is absent or None, the course update is added.
Update:
It will update it if it has a passed_id which has a valid value.
Until updates have distinct values, the passed_id is the location url + an index into the html structure.
Until updates have distinct values, the passed_id is the location URL + an index into the HTML structure.
"""
try:
course_updates = modulestore().get_item(location)
Expand All @@ -60,45 +75,48 @@ def update_course_updates(location, update, passed_id=None, user=None, request_m
course_update_items = list(reversed(get_course_update_items(course_updates)))
course_update_dict = None

# Sanitize the content input to prevent XSS
sanitized_content = sanitize_html(update["content"])

if passed_id is not None:
passed_index = _get_index(passed_id)

# if passed_index in course_update_items_ids:
for course_update_item in course_update_items:
if course_update_item["id"] == passed_index:
course_update_dict = course_update_item
course_update_item["date"] = update["date"]
course_update_item["content"] = update["content"]
course_update_item["content"] = sanitized_content # Use sanitized content
break

if course_update_dict is None:
return HttpResponseBadRequest(_("Invalid course update id."))
else:
course_update_items_ids = [course_update_item['id'] for course_update_item in course_update_items]

course_update_dict = {
# if no course updates then the id will be 1 otherwise maxid + 1
"id": max(course_update_items_ids) + 1 if course_update_items_ids else 1,
"date": update["date"],
"content": update["content"],
"content": sanitized_content, # Use sanitized content
"status": CourseInfoBlock.STATUS_VISIBLE
}
course_update_items.append(course_update_dict)

# update db record
# Update database record
save_course_update_items(location, course_updates, course_update_items, user)

if request_method == "POST":
# track course update event
# Track course update event
track_course_update_event(location.course_key, user, course_update_dict)

# send course update notification
# Send course update notification
send_course_update_notification(
location.course_key, course_update_dict["content"], user,
)

# remove status key
# Remove status key
if "status" in course_update_dict:
del course_update_dict["status"]

return course_update_dict


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
import logging
from datetime import datetime
import bleach

from attrs import asdict
from django.conf import settings
Expand Down Expand Up @@ -529,13 +530,27 @@ def create_item(request):

@login_required
@expect_json
def sanitize_html(user_input):
"""Sanitize HTML input to allow only safe tags and attributes."""
allowed_tags = ['p', 'b', 'i', 'u', 'em', 'strong', 'a']
allowed_attributes = {
'*': ['class', 'id'], # Allow class and id attributes on all elements
'a': ['href', 'title'], # Allow href and title attributes on <a> tags
}

# Sanitize the HTML input to allow only the specified tags and attributes
sanitized_input = bleach.clean(user_input, tags=allowed_tags, attributes=allowed_attributes)

return sanitized_input

def _create_block(request):
"""View for create blocks."""
parent_locator = request.json["parent_locator"]
usage_key = usage_key_with_run(parent_locator)
if not has_studio_write_access(request.user, usage_key.course_key):
raise PermissionDenied()

# Handle clipboard-based content staging
if request.json.get("staged_content") == "clipboard":
# Paste from the user's clipboard (content_staging app clipboard, not browser clipboard) into 'usage_key':
try:
Expand All @@ -562,6 +577,8 @@ def _create_block(request):
})

category = request.json["category"]

# Check if the component is part of a library
if isinstance(usage_key, LibraryUsageLocator):
# Only these categories are supported at this time.
if category not in ["html", "problem", "video"]:
Expand All @@ -580,23 +597,33 @@ def _create_block(request):
status=400,
)

# Get the display_name and boilerplate fields
display_name = request.json.get("display_name", "")
boilerplate = request.json.get("boilerplate", "")

# Apply HTML sanitization only for the "html" component type
if category == "html":
display_name = sanitize_html(display_name)
boilerplate = sanitize_html(boilerplate)

# Create the block using sanitized or raw content
created_block = create_xblock(
parent_locator=parent_locator,
user=request.user,
category=category,
display_name=request.json.get("display_name"),
boilerplate=request.json.get("boilerplate"),
display_name=display_name,
boilerplate=boilerplate,
)

response = {
"locator": str(created_block.location),
"courseKey": str(created_block.location.course_key),
}
# If it contains library_content_key, the block is being imported from a v2 library
# so it needs to be synced with upstream block.

# If it contains library_content_key, sync the block with the upstream block.
if upstream_ref := request.json.get("library_content_key"):
try:
# Set `created_block.upstream` and then sync this with the upstream (library) version.
# Set `created_block.upstream` and then sync with the upstream (library) version.
created_block.upstream = upstream_ref
sync_from_upstream(downstream=created_block, user=request.user)
except BadUpstream as exc:
Expand All @@ -606,12 +633,15 @@ def _create_block(request):
f"using provided library_content_key='{upstream_ref}'"
)
return JsonResponse({"error": str(exc)}, status=400)

# Update the item in the store
modulestore().update_item(created_block, request.user.id)
response['upstreamRef'] = upstream_ref

return JsonResponse(response)



def _get_source_index(source_usage_key, source_parent):
"""
Get source index position of the XBlock.
Expand Down