diff --git a/contentcuration/automation/migrations/0001_initial.py b/contentcuration/automation/migrations/0001_initial.py new file mode 100644 index 0000000000..6152c0f4b2 --- /dev/null +++ b/contentcuration/automation/migrations/0001_initial.py @@ -0,0 +1,44 @@ +# Generated by Django 3.2.24 on 2024-08-05 21:23 +import uuid + +import django.db.models.deletion +from django.db import migrations +from django.db import models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [ + ('kolibri_public', '0003_alter_file_preset'), + ] + + operations = [ + migrations.CreateModel( + name='RecommendationsCache', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, + serialize=False)), + ('request_hash', models.CharField(max_length=32, null=True)), + ('rank', models.FloatField(default=0.0, null=True)), + ('override_threshold', models.BooleanField(default=False)), + ('timestamp', models.DateTimeField(auto_now_add=True)), + ('contentnode', models.ForeignKey(blank=True, null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name='recommendations', + to='kolibri_public.contentnode')), + ], + ), + migrations.AddIndex( + model_name='recommendationscache', + index=models.Index(fields=['request_hash'], name='request_hash_idx'), + ), + migrations.AddIndex( + model_name='recommendationscache', + index=models.Index(fields=['contentnode'], name='contentnode_idx'), + ), + migrations.AlterUniqueTogether( + name='recommendationscache', + unique_together={('request_hash', 'contentnode')}, + ), + ] diff --git a/contentcuration/automation/models.py b/contentcuration/automation/models.py index 0b4331b362..7000ddf420 100644 --- a/contentcuration/automation/models.py +++ b/contentcuration/automation/models.py @@ -1,3 +1,30 @@ -# from django.db import models +import uuid -# Create your models here. +from django.db import models +from kolibri_public.models import ContentNode + + +REQUEST_HASH_INDEX_NAME = "request_hash_idx" +CONTENTNODE_INDEX_NAME = "contentnode_idx" + + +class RecommendationsCache(models.Model): + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + request_hash = models.CharField(max_length=32, null=True) + contentnode = models.ForeignKey( + ContentNode, + null=True, + blank=True, + related_name='recommendations', + on_delete=models.CASCADE, + ) + rank = models.FloatField(default=0.0, null=True) + override_threshold = models.BooleanField(default=False) + timestamp = models.DateTimeField(auto_now_add=True) + + class Meta: + unique_together = ('request_hash', 'contentnode') + indexes = [ + models.Index(fields=['request_hash'], name=REQUEST_HASH_INDEX_NAME), + models.Index(fields=['contentnode'], name=CONTENTNODE_INDEX_NAME), + ] diff --git a/contentcuration/automation/tests/appnexus/test_base.py b/contentcuration/automation/tests/appnexus/test_base.py index b06cc213b5..3fa3fa8e74 100644 --- a/contentcuration/automation/tests/appnexus/test_base.py +++ b/contentcuration/automation/tests/appnexus/test_base.py @@ -1,69 +1,26 @@ import time -import pytest -import requests from unittest.mock import patch -from automation.utils.appnexus.base import Adapter +import mock +import pytest +import requests from automation.utils.appnexus.base import Backend from automation.utils.appnexus.base import BackendRequest +from automation.utils.appnexus.base import BackendResponse from automation.utils.appnexus.base import SessionWithMaxConnectionAge from automation.utils.appnexus.errors import ConnectionError +from automation.utils.appnexus.errors import InvalidResponse -class MockBackend(Backend): - base_url = 'https://kolibri-dev.learningequality.org' - connect_endpoint = '/status' - def connect(self) -> None: - return super().connect() - - def make_request(self, request): - return super().make_request(request) - -class ErrorBackend(Backend): - base_url = 'https://bad-url.com' - connect_endpoint = '/status' - def connect(self) -> None: - return super().connect() - - def make_request(self, request): - return super().make_request(request) - - -class MockAdapter(Adapter): - def mockoperation(self): - pass - - -def test_backend_singleton(): - b1, b2 = MockBackend(), MockBackend() - assert id(b1) == id(b2) - - -def test_adapter_creation(): - a = MockAdapter(backend=MockBackend) - assert isinstance(a, Adapter) - - -def test_adapter_backend_default(): - b = MockBackend() - adapter = Adapter(backend=b) - assert isinstance(adapter.backend, Backend) - - -def test_adapter_backend_custom(): - b = MockBackend() - a = Adapter(backend=b) - assert a.backend is b - def test_session_with_max_connection_age_request(): with patch.object(requests.Session, 'request') as mock_request: session = SessionWithMaxConnectionAge() session.request('GET', 'https://example.com') assert mock_request.call_count == 1 + def test_session_with_max_connection_age_not_closing_connections(): - with patch.object(requests.Session, 'close') as mock_close,\ - patch.object(requests.Session, 'request') as mock_request: + with patch.object(requests.Session, 'close') as mock_close, patch.object(requests.Session, 'request') as mock_request: session = SessionWithMaxConnectionAge(60) session.request('GET', 'https://example.com') time.sleep(0.1) @@ -72,9 +29,9 @@ def test_session_with_max_connection_age_not_closing_connections(): assert mock_close.call_count == 0 assert mock_request.call_count == 2 + def test_session_with_max_connection_age_closing_connections(): - with patch.object(requests.Session, 'close') as mock_close,\ - patch.object(requests.Session, 'request') as mock_request: + with patch.object(requests.Session, 'close') as mock_close, patch.object(requests.Session, 'request') as mock_request: session = SessionWithMaxConnectionAge(1) session.request('GET', 'https://example.com') time.sleep(2) @@ -83,33 +40,55 @@ def test_session_with_max_connection_age_closing_connections(): assert mock_close.call_count == 1 assert mock_request.call_count == 2 -def test_backend_connect(): - backend = MockBackend() - connected = backend.connect() - assert connected is True +@mock.patch("automation.utils.appnexus.base.Backend.connect") +def test_backend_connect(mock_connect): + mock_connect.return_value = True + + backend = Backend() + result = backend.connect() + + mock_connect.assert_called_once() + assert result is True -def test_backend_connect_error(): - backend = ErrorBackend() - connected = backend.connect() - assert connected is False +@mock.patch("automation.utils.appnexus.base.Backend.connect") +def test_backend_connect_error(mock_connect): + mock_connect.side_effect = [ConnectionError("Failed to connect"), False] -def test_backend_request(): - request = BackendRequest('GET', '/api/public/info') + backend = Backend() - backend = MockBackend() + with pytest.raises(ConnectionError) as exc_info: + backend.connect() + assert str(exc_info.value) == "Failed to connect" + + result = backend.connect() + assert result is False + + assert mock_connect.call_count == 2 + + +@mock.patch("automation.utils.appnexus.base.Backend.make_request") +def test_backend_request(mock_make_request): + mock_response = BackendResponse(data=[{"key": "value"}]) + mock_make_request.return_value = mock_response + + backend = Backend() + request = BackendRequest(method="GET", path="/api/test") response = backend.make_request(request) - assert response.status_code == 200 - assert len(response.__dict__) > 0 + assert response == mock_response + mock_make_request.assert_called_once_with(request) + -def test_backend_request_error(): - request = BackendRequest('GET', '/api/public/info') +@mock.patch("automation.utils.appnexus.base.Backend.make_request") +def test_backend_request_error(mock_make_request): + mock_make_request.side_effect = InvalidResponse("Request failed") - backend = ErrorBackend() + backend = Backend() + request = BackendRequest(method="GET", path="/api/test") - with pytest.raises(ConnectionError) as error: + with pytest.raises(InvalidResponse) as exc_info: backend.make_request(request) - - assert "Unable to connect to" in str(error.value) + assert str(exc_info.value) == "Request failed" + mock_make_request.assert_called_once_with(request) diff --git a/contentcuration/automation/tests/test_recommendations_cache_model.py b/contentcuration/automation/tests/test_recommendations_cache_model.py new file mode 100644 index 0000000000..9cb2e96c9a --- /dev/null +++ b/contentcuration/automation/tests/test_recommendations_cache_model.py @@ -0,0 +1,64 @@ +import uuid + +from automation.models import RecommendationsCache +from django.db import IntegrityError +from kolibri_public.models import ContentNode + +from contentcuration.tests.base import StudioTestCase + + +class TestRecommendationsCache(StudioTestCase): + + def setUp(self): + self.content_node = ContentNode.objects.create( + id=uuid.uuid4(), + title='Test Content Node', + content_id=uuid.uuid4(), + channel_id=uuid.uuid4(), + ) + self.cache = RecommendationsCache.objects.create( + request_hash='test_hash', + contentnode=self.content_node, + rank=1.0, + override_threshold=False + ) + + def test_cache_creation(self): + self.assertIsInstance(self.cache, RecommendationsCache) + self.assertEqual(self.cache.request_hash, 'test_hash') + self.assertEqual(self.cache.contentnode, self.content_node) + self.assertEqual(self.cache.rank, 1.0) + self.assertFalse(self.cache.override_threshold) + + def test_cache_retrieval(self): + retrieved_cache = RecommendationsCache.objects.get(request_hash='test_hash') + self.assertEqual(retrieved_cache, self.cache) + + def test_cache_uniqueness(self): + with self.assertRaises(IntegrityError): + RecommendationsCache.objects.create( + request_hash='test_hash', + contentnode=self.content_node, + rank=2.0, + override_threshold=True + ) + + def test_bulk_create_ignore_conflicts_true(self): + initial_count = RecommendationsCache.objects.count() + try: + RecommendationsCache.objects.bulk_create( + [self.cache, self.cache], + ignore_conflicts=True + ) + except IntegrityError: + self.fail("bulk_create raised IntegrityError unexpectedly!") + + final_count = RecommendationsCache.objects.count() + self.assertEqual(initial_count, final_count) + + def test_bulk_create_ignore_conflicts_false(self): + with self.assertRaises(IntegrityError): + RecommendationsCache.objects.bulk_create( + [self.cache, self.cache], + ignore_conflicts=False + ) diff --git a/contentcuration/automation/utils/appnexus/base.py b/contentcuration/automation/utils/appnexus/base.py index 0998753bd8..626bca1d58 100644 --- a/contentcuration/automation/utils/appnexus/base.py +++ b/contentcuration/automation/utils/appnexus/base.py @@ -56,8 +56,7 @@ def __init__( class BackendResponse(object): """ Class that should be inherited by specific backend for its responses""" - def __init__(self, error=None, **kwargs): - self.error = error + def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) @@ -169,11 +168,10 @@ def connect(self, **kwargs): def make_request(self, request): """ Make a request to the backend service. """ - response = self._make_request(request) try: - info = response.json() - info.update({"status_code": response.status_code}) - return BackendResponse(**info) + response = self._make_request(request) + response_body = dict(data=response.json()) + return BackendResponse(**response_body) except ValueError as e: logging.exception(e) raise errors.InvalidResponse("Invalid response from backend") diff --git a/contentcuration/contentcuration/settings.py b/contentcuration/contentcuration/settings.py index 8d2d9d8832..d162c5c6de 100644 --- a/contentcuration/contentcuration/settings.py +++ b/contentcuration/contentcuration/settings.py @@ -89,6 +89,7 @@ 'django.contrib.postgres', 'django_celery_results', 'kolibri_public', + 'automation', ) SESSION_ENGINE = "django.contrib.sessions.backends.cached_db" diff --git a/contentcuration/contentcuration/tests/utils/test_automation_manager.py b/contentcuration/contentcuration/tests/utils/test_automation_manager.py index a01eaaa228..5f1833d6cf 100644 --- a/contentcuration/contentcuration/tests/utils/test_automation_manager.py +++ b/contentcuration/contentcuration/tests/utils/test_automation_manager.py @@ -1,5 +1,4 @@ import unittest -from unittest.mock import MagicMock from contentcuration.utils.automation_manager import AutomationManager @@ -11,30 +10,3 @@ def setUp(self): def test_creation(self): # Check if an instance of AutomationManager is created successfully self.assertIsInstance(self.automation_manager, AutomationManager) - - def test_generate_embedding(self): - text = "Some text that needs to be embedded" - # Mock the generate_embedding method of RecommendationsAdapter - # as the implementation is yet to be done - self.automation_manager.recommendations_backend_adapter.generate_embedding = MagicMock(return_value=[0.1, 0.2, 0.3]) - embedding_vector = self.automation_manager.generate_embedding(text) - self.assertIsNotNone(embedding_vector) - - def test_embedding_exists(self): - embedding_vector = [0.1, 0.2, 0.3] - # Currently no solid implementation exists for this - # So the embadding_exists function returns true anyways - exists = self.automation_manager.embedding_exists(embedding_vector) - self.assertTrue(exists) - - def test_load_recommendations(self): - embedding_vector = [0.1, 0.2, 0.3] - self.automation_manager.recommendations_backend_adapter.get_recommendations = MagicMock(return_value=["item1", "item2"]) - recommendations = self.automation_manager.load_recommendations(embedding_vector) - self.assertIsInstance(recommendations, list) - - def test_cache_embeddings(self): - embeddings_list = [[0.1, 0.2, 0.3]] - # Currently the function returns true anyways - success = self.automation_manager.cache_embeddings(embeddings_list) - self.assertTrue(success) diff --git a/contentcuration/contentcuration/tests/utils/test_recommendations.py b/contentcuration/contentcuration/tests/utils/test_recommendations.py index 3ef11e43e5..92c730cded 100644 --- a/contentcuration/contentcuration/tests/utils/test_recommendations.py +++ b/contentcuration/contentcuration/tests/utils/test_recommendations.py @@ -1,25 +1,37 @@ +import copy +import uuid + +from automation.models import RecommendationsCache from automation.utils.appnexus import errors +from automation.utils.appnexus.base import BackendResponse from django.test import TestCase +from kolibri_public.models import ContentNode as PublicContentNode from mock import MagicMock from mock import patch from contentcuration.models import ContentNode +from contentcuration.tests import testdata +from contentcuration.tests.base import StudioTestCase from contentcuration.utils.recommendations import EmbeddingsResponse +from contentcuration.utils.recommendations import EmbedTopicsRequest from contentcuration.utils.recommendations import Recommendations from contentcuration.utils.recommendations import RecommendationsAdapter +from contentcuration.utils.recommendations import RecommendationsResponse class RecommendationsTestCase(TestCase): def test_backend_initialization(self): - recomendations = Recommendations() - self.assertIsNotNone(recomendations) - self.assertIsInstance(recomendations, Recommendations) + recommendations = Recommendations() + self.assertIsNotNone(recommendations) + self.assertIsInstance(recommendations, Recommendations) + +class RecommendationsAdapterTestCase(StudioTestCase): -class RecommendationsAdapterTestCase(TestCase): - def setUp(self): - self.adapter = RecommendationsAdapter(MagicMock()) - self.topic = { + @classmethod + def setUpTestData(cls): + cls.adapter = RecommendationsAdapter(MagicMock()) + cls.topic = { 'id': 'topic_id', 'title': 'topic_title', 'description': 'topic_description', @@ -32,62 +44,272 @@ def setUp(self): } ] } - self.resources = [ - MagicMock(spec=ContentNode), - ] + cls.channel_id = 'test_channel_id' + cls.resources = [MagicMock(spec=ContentNode)] + + cls.request = EmbedTopicsRequest( + method='POST', + url='http://test.com', + path='/test/path', + params={'override_threshold': False}, + json=cls.topic + ) + cls.api_response = BackendResponse(data=[ + {'contentnode_id': '1234567890abcdef1234567890abcdef', 'rank': 0.9}, + {'contentnode_id': 'abcdef1234567890abcdef1234567890', 'rank': 0.8}, + {'contentnode_id': '00000000000000000000000000000003', 'rank': 0.8}, + {'contentnode_id': '00000000000000000000000000000005', 'rank': 0.8}, + ]) + + cls.public_content_node_1 = PublicContentNode.objects.create( + id=uuid.UUID('1234567890abcdef1234567890abcdef'), + title='Public Content Node 1', + content_id=uuid.uuid4(), + channel_id=uuid.UUID('ddec09d74e834241a580c480ee37879c'), + ) + cls.public_content_node_2 = PublicContentNode.objects.create( + id=uuid.UUID('abcdef1234567890abcdef1234567890'), + title='Public Content Node 2', + content_id=uuid.uuid4(), + channel_id=uuid.UUID('84fcaec1e0514b62899d7f436384c401'), + ) + + cls.cache = RecommendationsCache.objects.create( + request_hash=cls.adapter._generate_request_hash(cls.request), + contentnode=cls.public_content_node_1, + rank=1.0, + override_threshold=False + ) + + def assert_backend_call(self, mock_response_exists, response_exists_value, connect_value, + make_request_value, method, *args): + mock_response_exists.return_value = response_exists_value + self.adapter.backend.connect.return_value = connect_value + self.adapter.backend.make_request.return_value = make_request_value + + if response_exists_value: + result = method(*args) + mock_response_exists.assert_called_once() + self.adapter.backend.connect.assert_not_called() + self.adapter.backend.make_request.assert_not_called() + return result + else: + if connect_value: + result = method(*args) + self.adapter.backend.connect.assert_called_once() + self.adapter.backend.make_request.assert_called_once() + return result + else: + with self.assertRaises(errors.ConnectionError): + method(*args) + self.adapter.backend.connect.assert_called_once() + self.adapter.backend.make_request.assert_not_called() def test_adapter_initialization(self): self.assertIsNotNone(self.adapter) self.assertIsInstance(self.adapter, RecommendationsAdapter) - @patch('contentcuration.utils.recommendations.EmbedTopicsRequest') - def test_embed_topics_backend_connect_success(self, embed_topics_request_mock): - self.adapter.backend.connect.return_value = True - self.adapter.backend.make_request.return_value = MagicMock(spec=EmbeddingsResponse) - response = self.adapter.embed_topics(self.topic) - self.adapter.backend.connect.assert_called_once() - self.adapter.backend.make_request.assert_called_once() - self.assertIsInstance(response, EmbeddingsResponse) + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.response_exists') + def test_generate_embeddings_connect_failure(self, mock_response_exists): + mock_response = MagicMock(spec=EmbeddingsResponse) + self.assert_backend_call(mock_response_exists, None, False, mock_response, + self.adapter.generate_embeddings, self.request) - def test_embed_topics_backend_connect_failure(self): - self.adapter.backend.connect.return_value = False - with self.assertRaises(errors.ConnectionError): - self.adapter.embed_topics(self.topic) - self.adapter.backend.connect.assert_called_once() - self.adapter.backend.make_request.assert_not_called() + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.response_exists') + def test_generate_embeddings(self, mock_response_exists): + mock_response = MagicMock(spec=EmbeddingsResponse) + mock_response.error = None + response = self.assert_backend_call(mock_response_exists, None, True, mock_response, + self.adapter.generate_embeddings, self.request) + self.assertIsInstance(response, EmbeddingsResponse) - @patch('contentcuration.utils.recommendations.EmbedTopicsRequest') - def test_embed_topics_make_request_exception(self, embed_topics_request_mock): - self.adapter.backend.connect.return_value = True - self.adapter.backend.make_request.side_effect = Exception("Mocked exception") - response = self.adapter.embed_topics(self.topic) - self.adapter.backend.connect.assert_called_once() - self.adapter.backend.make_request.assert_called_once() + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.response_exists') + def test_generate_embeddings_failure(self, mock_response_exists): + mock_response = MagicMock(spec=EmbeddingsResponse) + mock_response.error = {} + response = self.assert_backend_call(mock_response_exists, None, True, mock_response, + self.adapter.generate_embeddings, self.request) self.assertIsInstance(response, EmbeddingsResponse) - self.assertEqual(str(response.error), "Mocked exception") + self.assertIsNotNone(response.error) - @patch('contentcuration.utils.recommendations.EmbedContentRequest') - def test_embed_content_backend_connect_success(self, embed_content_request_mock): - self.adapter.backend.connect.return_value = True - self.adapter.backend.make_request.return_value = MagicMock(spec=EmbeddingsResponse) - response = self.adapter.embed_content(self.resources) - self.adapter.backend.connect.assert_called_once() - self.adapter.backend.make_request.assert_called_once() + def test_response_exists(self): + response = self.adapter.response_exists(self.request) + public_content_node_id = str(self.public_content_node_1.id).replace('-', '') + + self.assertIsNotNone(response) self.assertIsInstance(response, EmbeddingsResponse) + self.assertEqual(len(response.data), 1) + self.assertEqual(response.data[0]['contentnode_id'], public_content_node_id) + self.assertEqual(response.data[0]['rank'], 1.0) + + def test_response_does_not_exist(self): + new_request = EmbedTopicsRequest( + method='POST', + url='http://test.com', + path='/test/path', + params={'override_threshold': True}, + json={'topic': 'new_test_topic'} + ) + response = self.adapter.response_exists(new_request) + self.assertIsNone(response) + + def cache_request_test_helper(self, request_json, response_data, expected_count): + new_request = copy.deepcopy(self.request) + new_request.json = request_json + response_copy = copy.deepcopy(self.api_response) + response_copy.data = response_data + + result = self.adapter.cache_embeddings_request(new_request, response_copy) + self.assertTrue(result) + + cached_items = RecommendationsCache.objects.filter( + request_hash=self.adapter._generate_request_hash(new_request) + ) + print(list(cached_items.values('request_hash', 'contentnode', 'rank', 'override_threshold'))) + self.assertEqual(cached_items.count(), expected_count) + + def test_cache_embeddings_request_success(self): + self.cache_request_test_helper({'topic': 'new_test_topic_1'}, self.api_response.data, 2) + + def test_cache_embeddings_request_empty_data(self): + self.cache_request_test_helper({'topic': 'new_test_topic_2'}, [], 0) + + def test_cache_embeddings_request_ignore_duplicates(self): + duplicate_data = [ + {'contentnode_id': '1234567890abcdef1234567890abcdef', 'rank': 0.9}, + {'contentnode_id': '1234567890abcdef1234567890abcdef', 'rank': 0.9} + ] + self.cache_request_test_helper({'topic': 'new_test_topic_3'}, duplicate_data, 1) + + def test_cache_embeddings_request_invalid_data(self): + invalid_data = [ + {'contentnode_id': '1234567890abcdef1234567890abcdee', 'rank': 0.6} + ] + self.cache_request_test_helper({'topic': 'new_test_topic_4'}, invalid_data, 0) + + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.cache_embeddings_request') + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.generate_embeddings') + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.response_exists') + @patch('contentcuration.utils.recommendations.EmbedTopicsRequest') + def test_get_recommendations_success(self, mock_embed_topics_request, mock_response_exists, + mock_generate_embeddings, mock_cache_embeddings_request): + channel = testdata.channel('Public Channel') + channel.public = True + channel.save() + + public_node_1 = PublicContentNode.objects.create( + id=uuid.UUID('00000000000000000000000000000003'), + title='Video 1', + content_id=uuid.uuid4(), + channel_id=uuid.UUID(channel.id), + ) + public_node_2 = PublicContentNode.objects.create( + id=uuid.UUID('00000000000000000000000000000005'), + title='Exercise 1', + content_id=uuid.uuid4(), + channel_id=uuid.UUID(channel.id), + ) + + mock_response_exists.return_value = None + mock_response = MagicMock(spec=EmbeddingsResponse) + mock_response.data = copy.deepcopy(self.api_response.data) + mock_response.error = None + mock_generate_embeddings.return_value = mock_response + + response = self.adapter.get_recommendations(self.topic) + results = list(response.results) + expected_node_ids = [public_node_1.id.hex, public_node_2.id.hex] + actual_node_ids = [result["node_id"] for result in results] - def test_embed_content_backend_connect_failure(self): - self.adapter.backend.connect.return_value = False - with self.assertRaises(errors.ConnectionError): - self.adapter.embed_content(self.resources) - self.adapter.backend.connect.assert_called_once() - self.adapter.backend.make_request.assert_not_called() + mock_response_exists.assert_called_once() + mock_generate_embeddings.assert_called_once() + self.assertIsInstance(response, RecommendationsResponse) + self.assertListEqual(expected_node_ids, actual_node_ids) + self.assertEqual(len(results), 2) + @patch('contentcuration.utils.recommendations.RecommendationsAdapter._extract_data') + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.response_exists') + @patch('contentcuration.utils.recommendations.EmbedTopicsRequest') + def test_get_recommendations_failure(self, mock_embed_topics_request, mock_response_exists, + mock_extract_data): + mock_request_instance = MagicMock(spec=EmbedTopicsRequest) + mock_embed_topics_request.return_value = mock_request_instance + + self.assert_backend_call(mock_response_exists, None, False, None, + self.adapter.get_recommendations, self.topic) + + @patch('contentcuration.utils.recommendations.RecommendationsAdapter._extract_data') + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.response_exists') @patch('contentcuration.utils.recommendations.EmbedContentRequest') - def test_embed_content_make_request_exception(self, embed_content_request_mock): - self.adapter.backend.connect.return_value = True - self.adapter.backend.make_request.side_effect = Exception("Mocked exception") - response = self.adapter.embed_content(self.resources) - self.adapter.backend.connect.assert_called_once() - self.adapter.backend.make_request.assert_called_once() - self.assertIsInstance(response, EmbeddingsResponse) - self.assertEqual(str(response.error), "Mocked exception") + def test_embed_content_success(self, mock_embed_topics_request, mock_response_exists, + mock_extract_data): + mock_response = MagicMock(spec=EmbeddingsResponse) + mock_response.error = None + response = self.assert_backend_call(mock_response_exists, None, True, mock_response, + self.adapter.embed_content, self.channel_id, + self.resources) + self.assertIsInstance(response, bool) + self.assertTrue(response) + + @patch('contentcuration.utils.recommendations.RecommendationsAdapter.response_exists') + @patch('contentcuration.utils.recommendations.EmbedContentRequest') + def test_embed_content_failure(self, mock_embed_topics_request, mock_response_exists): + response = self.assert_backend_call(mock_response_exists, None, False, + None, self.adapter.embed_content, + self.channel_id, + self.resources) + + self.assertIsNone(response) + + def extract_content_test_helper(self, mock_node, file_return_value, expected_result): + with patch('contentcuration.utils.recommendations.File.objects.filter', + return_value=file_return_value): + result = self.adapter.extract_content(mock_node) + self.assertEqual(result, expected_result) + + def test_extract_content(self): + mock_node = MagicMock(spec=ContentNode) + mock_node.node_id = '1234567890abcdef1234567890abcdef' + mock_node.title = 'Sample Title' + mock_node.description = 'Sample Description' + mock_node.language.lang_code = 'en' + mock_node.kind.kind = 'video' + + mock_file_instance = MagicMock() + mock_file_instance.file_on_disk = 'path/to/file.mp4' + mock_file_instance.preset_id = 'video_high_res' + mock_file_instance.language.lang_code = 'en' + + expected_result = { + "id": '1234567890abcdef1234567890abcdef', + "title": 'Sample Title', + "description": 'Sample Description', + "text": "", + "language": 'en', + "files": [ + { + 'url': 'path/to/file.mp4', + 'preset': 'video_high_res', + 'language': 'en', + } + ], + } + self.extract_content_test_helper(mock_node, [mock_file_instance], expected_result) + + def test_extract_content_no_files(self): + mock_node = MagicMock(spec=ContentNode) + mock_node.node_id = '1234567890abcdef1234567890abcdef' + mock_node.title = 'Sample Title' + mock_node.description = 'Sample Description' + mock_node.language.lang_code = 'en' + mock_node.kind.kind = 'video' + + expected_result = { + "id": '1234567890abcdef1234567890abcdef', + "title": 'Sample Title', + "description": 'Sample Description', + "text": "", + "language": 'en', + "files": [], + } + self.extract_content_test_helper(mock_node, [], expected_result) diff --git a/contentcuration/contentcuration/utils/automation_manager.py b/contentcuration/contentcuration/utils/automation_manager.py index c609064fe7..fbc76a0b5e 100644 --- a/contentcuration/contentcuration/utils/automation_manager.py +++ b/contentcuration/contentcuration/utils/automation_manager.py @@ -1,52 +1,40 @@ +from typing import Any +from typing import Dict +from typing import List +from typing import Union + +from kolibri_public.models import ContentNode as PublicContentNode + +from contentcuration.models import ContentNode as ContentNode from contentcuration.utils.recommendations import RecommendationsAdapter from contentcuration.utils.recommendations import RecommendationsBackendFactory class AutomationManager: def __init__(self): - self.recommendations_backend_factory = RecommendationsBackendFactory() - self.recommendations_backend_instance = self.recommendations_backend_factory.create_backend() - self.recommendations_backend_adapter = RecommendationsAdapter(self.recommendations_backend_instance) + self.factory = RecommendationsBackendFactory() + self.instance = self.factory.create_backend() + self.adapter = RecommendationsAdapter(self.instance) - def generate_embedding(self, text): - """ - Generate an embedding vector for the given text. - Args: - text (str): The text for which to generate an embedding. - Returns: - Vector: The generated embedding vector. + def generate_embeddings(self, channel_id: str, nodes: List[Union[ContentNode, PublicContentNode]]): """ - embedding_vector = self.recommendations_backend_adapter.generate_embedding(text=text) - return embedding_vector + Generates embeddings for the given list of nodes. This process is async. - def embedding_exists(self, embedding): - """ - Check if the given embedding vector exists. - Args: - embedding (Vector): The embedding vector to check. - Returns: - bool: True if the embedding exists, False otherwise. - """ - return self.recommendations_backend_adapter.embedding_exists(embedding=embedding) + :param channel_id: The channel id to which the nodes belong. + :param nodes: The list of nodes for which to generate embeddings. - def load_recommendations(self, embedding): + :return: A boolean indicating that the process has started. """ - Load recommendations based on the given embedding vector. - Args: - embedding (Vector): The embedding vector to use for recommendations. - Returns: - list: A list of recommended items. - """ - # Need to extract the recommendation list from the ResponseObject and change the return statement - self.recommendations_backend_adapter.get_recommendations(embedding=embedding) - return [] + return self.adapter.embed_content(channel_id, nodes) - def cache_embeddings(self, embeddings): + def load_recommendations(self, topic: Dict[str, Any], override_threshold=False): """ - Cache a list of embedding vectors. - Args: - embeddings (list): A list of embedding vectors to cache. - Returns: - bool: True if caching was successful, False otherwise. + Loads recommendations for the given topic. + + :param topic: A dictionary containing the topic for which to get recommendations. + :param override_threshold: A boolean flag to override the recommendation threshold. + + :return: A list of recommendations for the given topic. """ - return self.recommendations_backend_adapter.cache_embeddings(embeddings) + self.adapter.get_recommendations(topic=topic, override_threshold=override_threshold) + return [] diff --git a/contentcuration/contentcuration/utils/recommendations.py b/contentcuration/contentcuration/utils/recommendations.py index 1c015d2050..20c6cb05f2 100644 --- a/contentcuration/contentcuration/utils/recommendations.py +++ b/contentcuration/contentcuration/utils/recommendations.py @@ -1,56 +1,73 @@ +import hashlib +import json +import logging +import uuid from typing import Any from typing import Dict from typing import List from typing import Union +from automation.models import RecommendationsCache from automation.utils.appnexus import errors from automation.utils.appnexus.base import Adapter from automation.utils.appnexus.base import Backend from automation.utils.appnexus.base import BackendFactory from automation.utils.appnexus.base import BackendRequest from automation.utils.appnexus.base import BackendResponse +from django.db.models import Exists +from django.db.models import F +from django.db.models import OuterRef +from django.db.models import UUIDField +from django.db.models.functions import Cast +from django_cte import With +from kolibri_public.models import ContentNode as PublicContentNode +from le_utils.constants import content_kinds +from le_utils.constants import format_presets -from contentcuration.models import ContentNode +from contentcuration.models import Channel +from contentcuration.models import ContentNode as ContentNode +from contentcuration.models import File class RecommendationsBackendRequest(BackendRequest): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, **kwargs): + super().__init__(**kwargs) class RecommendationsRequest(RecommendationsBackendRequest): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, **kwargs): + super().__init__(**kwargs) class EmbeddingsRequest(RecommendationsBackendRequest): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, **kwargs): + super().__init__(**kwargs) class RecommendationsBackendResponse(BackendResponse): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, **kwargs): + super().__init__(**kwargs) class RecommendationsResponse(RecommendationsBackendResponse): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, results: List[Any], **kwargs): + super().__init__(**kwargs) + self.results = results -class EmbedTopicsRequest(RecommendationsBackendRequest): +class EmbedTopicsRequest(EmbeddingsRequest): path = '/embed-topics' method = 'POST' -class EmbedContentRequest(RecommendationsBackendRequest): +class EmbedContentRequest(EmbeddingsRequest): path = '/embed-content' method = 'POST' class EmbeddingsResponse(RecommendationsBackendResponse): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, **kwargs): + super().__init__(**kwargs) class RecommendationsBackendFactory(BackendFactory): @@ -61,67 +78,343 @@ def create_backend(self) -> Backend: class RecommendationsAdapter(Adapter): - def generate_embedding(self, text) -> EmbeddingsResponse: - request = EmbeddingsRequest() - return self.backend.make_request(request) + def generate_embeddings(self, request: EmbeddingsRequest) -> EmbeddingsResponse: + """ + Generates embeddings for the given request. - def embedding_exists(self, embedding) -> bool: - # Need to implement the logic to check if the embeddigns exist - # Return True if the embedding exists, or False otherwise - return True - - def cache_embeddings(self, embeddings_list) -> bool: - for embedding in embeddings_list: - try: - # Attempt to cache the embedding - # Write the caching logic - # A conrner case to look at here is if one of the embedding fails to get cached - # we need to handel it so that only the once that were not succesfull - # are attempted to cache again - pass - except Exception as e: - print(e) - return False - return True + :param request: The request for which to generate embeddings. + :return: The response containing the recommendations. + :rtype: EmbeddingsResponse + """ + if not self.backend.connect(): + raise errors.ConnectionError("Connection to the backend failed") - def get_recommendations(self, embedding) -> RecommendationsResponse: - request = RecommendationsRequest(embedding) - return self.backend.make_request(request) + try: + response = self.backend.make_request(request) + except Exception as e: + logging.exception(e) + response = EmbeddingsResponse(error=e) - def embed_topics(self, topics: Dict[str, Any]) -> EmbeddingsResponse: + return response - if not self.backend.connect(): - raise errors.ConnectionError("Connection to the backend failed") + def response_exists(self, request) -> Union[EmbeddingsResponse, None]: + """ + Checks if a cached response exists for the given request. + :param request: The request for which to check if a cached response exists. + :return: The cached response if it exists, otherwise None. + :rtype: Union[EmbeddingsResponse, None] + """ try: - embed_topics_request = EmbedTopicsRequest(json=topics) - return self.backend.make_request(embed_topics_request) + request_hash = self._generate_request_hash(request) + override_threshold = self._extract_override_threshold(request) + data = list(RecommendationsCache.objects + .filter(request_hash=request_hash, override_threshold=override_threshold) + .order_by('override_threshold', 'rank') + .values('contentnode_id', 'rank')) + if len(data) > 0: + return EmbeddingsResponse(data=data) + else: + return None except Exception as e: - return EmbeddingsResponse(error=e) + logging.exception(e) + return None + + def _generate_request_hash(self, request) -> str: + """ + Generates a unique hash for a given request. It serializes the request + attributes that make it unique, then generates a hash of this serialization. + + To prevent cache duplication, the hash is generated + independent of the override_threshold parameter. + + :param request: The request for which to generate a unique hash. + :return: A unique hash representing the request + """ - def embed_content(self, nodes: List[ContentNode]) -> EmbeddingsResponse: + params_copy = request.params.copy() if request.params else {} + params_copy.pop('override_threshold', None) + unique_attributes = json.dumps({ + 'params': params_copy, + 'json': request.json, + }, sort_keys=True).encode('utf-8') + + return hashlib.md5(unique_attributes).hexdigest() + + def cache_embeddings_request(self, request: BackendRequest, response: BackendResponse) -> bool: + """ + Caches the recommendations request and response. It performs a bulk insert of the + recommendations into the RecommendationsCache table, ignoring any conflicts. + + :param request: The request to cache. + :param response: The response to cache. + :return: A boolean indicating whether the caching was successful. + :rtype: bool + """ + + try: + nodes = self._extract_data(response) + valid_nodes = self._validate_nodes(nodes) + request_hash = self._generate_request_hash(request) + override_threshold = self._extract_override_threshold(request) + new_cache = [ + RecommendationsCache( + request_hash=request_hash, + contentnode_id=node['contentnode_id'], + rank=node['rank'], + override_threshold=override_threshold, + ) for node in valid_nodes + ] + RecommendationsCache.objects.bulk_create(new_cache, ignore_conflicts=True) + return True + except Exception as e: + logging.exception(e) + return False + + def _extract_override_threshold(self, request) -> bool: + """\ + Extracts the override_threshold parameter from the request safely. + + :param request: The request containing the parameters. + :return: The value of the override_threshold parameter, or False if not present. + :rtype: bool + """ + return request.params.get('override_threshold', False) if request.params else False + + def get_recommendations(self, topic: Dict[str, Any], + override_threshold=False) -> RecommendationsResponse: + """ + Get recommendations for the given topic. + + :param topic: A dictionary containing the topic for which to get recommendations. See + https://github.com/learningequality/le-utils/blob/main/spec/schema-embed_topics_request.json + :param override_threshold: A boolean flag to override the recommendation threshold. + :return: The recommendations for the given topic. :rtype: RecommendationsResponse + """ + + recommendations = [] + request = EmbedTopicsRequest( + params={'override_threshold': override_threshold}, + json=topic, + ) + + cached_response = self.response_exists(request) + if cached_response: + response = cached_response + else: + response = self.generate_embeddings(request=request) + if not response.error: + self.cache_embeddings_request(request, response) + + nodes = self._extract_data(response) + if len(nodes) > 0: + node_ids = self._extract_node_ids(nodes) + cast_node_ids = [uuid.UUID(node_id) for node_id in node_ids] + + channel_cte = With( + Channel.objects.annotate( + channel_id=self._cast_to_uuid(F('id')) + ).filter( + Exists( + PublicContentNode.objects.filter( + id__in=cast_node_ids, + channel_id=OuterRef('channel_id') + ) + ) + ).values( + 'main_tree_id', + tree_id=F('main_tree__tree_id'), + ).distinct() + ) + + recommendations = channel_cte.join( + ContentNode.objects.filter(node_id__in=node_ids), + tree_id=channel_cte.col.tree_id + ).with_cte(channel_cte).annotate( + main_tree_id=channel_cte.col.main_tree_id + ).values( + 'id', + 'node_id', + 'main_tree_id', + 'parent_id', + ) + + return RecommendationsResponse(results=recommendations) + + def _extract_data(self, response: BackendResponse) -> List[Dict[str, Any]]: + """ + Extracts the data from the given response. + + The response is of the form: + + { + "data": [ + { + "contentnode_id": "", + "rank": 0.7 + } + ] + } + + :param response: A response from which to extract the data. + :return: The extracted data. + :rtype: List[Dict[str, Any]] + """ + return response.data if response.data else [] + + def _validate_nodes(self, nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Validates the recommended nodes by checking if they exist in the database. + + :param nodes: The nodes to validate. + :return: A list of valid recommended nodes that exist in the database. + :rtype: List[Dict[str, Any]] + """ + node_ids = self._extract_node_ids(nodes) + existing_node_ids = set( + PublicContentNode.objects.filter(id__in=node_ids).values_list('id', flat=True)) + return [node for node in nodes if node['contentnode_id'] in existing_node_ids] + + def _extract_node_ids(self, nodes: List[Dict[str, Any]]) -> List[str]: + """ + Extracts the node IDs from the given nodes. + + :param nodes: The nodes from which to extract the node IDs. + :return: A list of node IDs. + :rtype: List[str] + """ + return [node['contentnode_id'] for node in nodes] + + def _cast_to_uuid(self, field): + """ + Casts the given field to a UUIDField. + + :param field: The field (such as F() object or OuterRef) to cast. + :return: The field cast to a UUIDField. + """ + return Cast(field, output_field=UUIDField()) + + def embed_content(self, channel_id: str, + nodes: List[Union[ContentNode, PublicContentNode]]) -> bool: + """ + Embeds the content for the given nodes. This is an asynchronous process and could take a + while to complete. This process is handled by our curriculum automation service. + See https://github.com/learningequality/curriculum-automation. Also, see + https://github.com/learningequality/le-utils/blob/main/spec/schema-embed_content_request.json + for the schema. + + :param channel_id: The channel ID to which the nodes belong. + :param nodes: The nodes for which to embed the content. + :return: A boolean indicating that content embedding process has started. + :rtype: bool + """ if not self.backend.connect(): raise errors.ConnectionError("Connection to the backend failed") + for i in range(0, len(nodes), 20): + try: + batch = nodes[i:i + 20] + content = [self.extract_content(node) for node in batch] + content_body = { + 'resources': content, + 'metadata': { + 'channel_id': channel_id, + } + } + request = EmbedContentRequest(json=content_body) + self.backend.make_request(request) + except Exception as e: + logging.exception(e) + + return True + + def extract_content(self, node) -> Dict[str, Any]: + """ + Extracts the content from the given node. + + :param node: The node from which to extract the content. + :return: A dictionary containing the extracted content. + :rtype: Dict[str, Any] + """ + contentkind_to_presets = { + content_kinds.AUDIO: [ + format_presets.AUDIO, + format_presets.AUDIO_DEPENDENCY, + ], + content_kinds.VIDEO: [ + format_presets.VIDEO_HIGH_RES, + format_presets.VIDEO_LOW_RES, + format_presets.VIDEO_SUBTITLE, + format_presets.VIDEO_DEPENDENCY, + ], + content_kinds.EXERCISE: [ + format_presets.EXERCISE, + format_presets.QTI_ZIP, + ], + content_kinds.DOCUMENT: [ + format_presets.DOCUMENT, + format_presets.EPUB, + ], + content_kinds.HTML5: [ + format_presets.HTML5_ZIP, + format_presets.AUDIO_DEPENDENCY, + format_presets.VIDEO_DEPENDENCY, + ], + content_kinds.H5P: [format_presets.H5P_ZIP], + content_kinds.ZIM: [format_presets.ZIM], + } + + contentkind = node.kind + presets = contentkind_to_presets.get(contentkind.kind) + files = self._get_content_files(node, presets) if presets else None + + return { + "id": node.node_id, + "title": node.title, + "description": node.description, + "text": "", + "language": node.language.lang_code if node.language else None, + "files": files, + } + + def _get_content_files(self, node, presets) -> List[Dict[str, Any]]: + """ + Get the content files for the given node and presets. + + :param node: The node for which to get the content files. + :param presets: The presets for which to get the content files. + :return: A list of dictionaries containing the content files. + :rtype: List[Dict[str, Any]] + """ + files = [] try: - resources = [self.extract_content(node) for node in nodes] - json = { - 'resources': resources, - 'metadata': {} - } - embed_content_request = EmbedContentRequest(json=json) - return self.backend.make_request(embed_content_request) + node_files = File.objects.filter(contentnode=node, preset__in=presets) + for file in node_files: + files.append(self._format_file_data(file)) except Exception as e: - return EmbeddingsResponse(error=e) + logging.exception(e) + + return files + + def _format_file_data(self, file) -> Dict[str, Any]: + """ + Format the file data into a dictionary. - def extract_content(self, node: ContentNode) -> Dict[str, Any]: - return {} + :param file: The file for which to format its data. + :return: A dictionary containing the formatted file data. + :rtype: Dict[str, Any] + """ + return { + 'url': file.file_on_disk, + 'preset': file.preset_id, + 'language': file.language.lang_code if file.language else None, + } class Recommendations(Backend): - def connect(self) -> None: + def connect(self) -> bool: return super().connect() def make_request(self, request) -> Union[EmbeddingsResponse, RecommendationsResponse]: