# algo_matcher.py (repository generated from MarshalX/bluesky-feed-generator)
import os
import logging
import json
import redis
from rq import Queue, Worker, get_current_job
from atproto import models
from server.logger import logger
from server.algos.manager import AlgoManager
from server.database import SessionLocal, UserAlgorithm, Post, User
from server.algos.shared_model_store import SharedModelStore
# Redis connection shared by the RQ queue below and by the worker started
# under ``__main__``. Override the default with the REDIS_URL env variable.
redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0")
redis_conn = redis.from_url(redis_url)

# RQ queue named "algo_matcher"; the worker in ``__main__`` consumes it.
queue = Queue(name="algo_matcher", connection=redis_conn)
# Load the AlgoManager, assuming it takes in a user-specific manifest
def algo_matches(user, user_algorithm, record, create_info):
author_clause = user_algorithm.algo_manifest.get("author", {}) or {}
if not author_clause and user:
author_clause = {"username": user.username, "password": user.app_password, "session_string": user.session_string}
user_algorithm.algo_manifest["author"] = author_clause
algo_manager = AlgoManager(user_algorithm.algo_manifest)
try:
return algo_manager.record_matches_algo(record, create_info)
except:
return False
def _find_matching_algorithms(db, user_algorithms, record, create_info):
    """Return the subset of ``user_algorithms`` that match ``record``, in order."""
    users_by_id = {}
    matched = []
    for user_algorithm in user_algorithms:
        # Fetch each owner once, even if they own several algorithms.
        owner_id = user_algorithm.user_id
        if owner_id not in users_by_id:
            users_by_id[owner_id] = (
                db.query(User).filter(User.user_id == owner_id).first()
            )
        if algo_matches(users_by_id[owner_id], user_algorithm, record, create_info):
            matched.append(user_algorithm)
    return matched


def _store_matched_post(db, record, create_info, matched):
    """Create or reuse the ``Post`` for ``create_info['uri']`` and link it to ``matched``."""
    post_dict = {
        'uri': create_info['uri'],
        'cid': create_info['cid'],
        'author': create_info['author'],
        'reply_parent': record.reply.parent.uri if record.reply else None,
        'reply_root': record.reply.root.uri if record.reply else None,
    }
    match_ids = [user_algorithm.id for user_algorithm in matched]
    logger.info("Post dict is %s, matches are %s", post_dict, match_ids)
    post = db.query(Post).filter_by(uri=post_dict['uri']).first()
    if not post:
        post = Post(**post_dict)
        db.add(post)
    for user_algorithm in matched:
        # A reprocessed record (retry or duplicate event) would otherwise
        # insert the same post/algorithm association a second time.
        if user_algorithm not in post.algorithms:
            post.algorithms.append(user_algorithm)
    # Commit the post and its links together, so a failure cannot leave
    # a post with no algorithm associations behind.
    db.commit()
    logger.info(
        "Post created with URI %s, associated with algorithms %s",
        post.uri,
        match_ids,
    )


def match_algo(record_data):
    """
    Process a JSON object record through the algorithms.

    The record is parsed once and evaluated against every ``UserAlgorithm``.
    If at least one matches, a ``Post`` row is created (or the existing row
    with the same URI is reused) and linked to each matching algorithm.
    Existing links are not duplicated.

    Args:
        record_data (dict): JSON object to process. Read keys: ``"record"``
            (raw record) and ``"create_info"`` (``"uri"``, ``"cid"``,
            ``"author"``).

    Returns:
        dict: ``{"matches": [...]}`` holding the ids of matching
        ``UserAlgorithm`` rows, in query order.
    """
    logger.info(record_data)
    # Context manager guarantees the session is closed.
    with SessionLocal() as db:
        user_algorithms = db.query(UserAlgorithm).all()
        if not user_algorithms:
            return {"matches": []}
        # Parse once: the parsed record does not depend on the algorithm.
        record = models.get_or_create(record_data["record"], strict=False)
        create_info = record_data["create_info"]
        matched = _find_matching_algorithms(db, user_algorithms, record, create_info)
        match_ids = [user_algorithm.id for user_algorithm in matched]
        if matched:
            _store_matched_post(db, record, create_info, matched)
    return {"matches": match_ids}
def delete_record(record_data):
    """Acknowledge a delete event for a record.

    NOTE(review): despite the name, this is currently a no-op stub. It does
    not touch the database or run any algorithm, ignores ``record_data``, and
    always reports success. Removing the matching ``Post`` from the feeds
    still needs to be implemented.

    Args:
        record_data (dict): JSON object describing the deleted record
            (currently unused).

    Returns:
        bool: Always ``True``.
    """
    del record_data  # unused until deletion is implemented
    handled = True
    return handled
if __name__ == "__main__":
    # Consume the very Queue object defined at module level, rather than
    # repeating its name as a literal that could drift out of sync.
    # ``with redis_conn`` closes the Redis client when the worker exits.
    with redis_conn:
        worker = Worker([queue], connection=redis_conn)
        worker.work()