forked from win4r/VideoFinder-Llama3.2-vision-Ollama
# main.py
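"""FastAPI service that searches an uploaded video for a target object.

Frames are sampled once per second, contrast-enhanced with CLAHE, and sent to
the llama3.2-vision model via Ollama; per-frame results are streamed back to
the client as newline-delimited JSON.
"""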
from fastapi import FastAPI, UploadFile, File, Form, Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse, StreamingResponse
import shutil
import os
import cv2
import ollama
from pathlib import Path
import asyncio
import json

app = FastAPI()

# Create the required directories
UPLOAD_DIR = Path("uploads")
FRAMES_DIR = Path("frames")
UPLOAD_DIR.mkdir(exist_ok=True)
FRAMES_DIR.mkdir(exist_ok=True)

# Set up templates and static file serving for the upload/frame directories
templates = Jinja2Templates(directory="templates")
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
app.mount("/frames", StaticFiles(directory="frames"), name="frames")

async def analyze_image(image_path: str, object_str: str):
    """Async image analysis: ask the vision model whether the target object is present."""
    prompt_str = f"""Please analyze the image and answer the following questions:
1. Is there a {object_str} in the image?
2. If yes, describe its appearance and location in the image in detail.
3. If no, describe what you see in the image instead.
4. On a scale of 1-10, how confident are you in your answer?
Please structure your response as follows:
Answer: [YES/NO]
Description: [Your detailed description]
Confidence: [1-10]"""

    try:
        # Run the blocking Ollama call in a worker thread so the event loop stays free
        response = await asyncio.to_thread(
            ollama.chat,
            model='llama3.2-vision',
            messages=[{
                'role': 'user',
                'content': prompt_str,
                'images': [image_path]
            }]
        )

        response_text = response['message']['content']
        response_lines = response_text.strip().split('\n')

        # Parse the structured reply (Answer / Description / Confidence)
        answer = None
        description = None
        confidence = 10

        for line in response_lines:
            line = line.strip()
            if line.lower().startswith('answer:'):
                answer = line.split(':', 1)[1].strip().upper()
            elif any(line.lower().startswith(prefix) for prefix in
                     ['description:', 'reasoning:', 'alternative description:']):
                description = line.split(':', 1)[1].strip()
            elif line.lower().startswith('confidence:'):
                try:
                    confidence = int(line.split(':', 1)[1].strip())
                except ValueError:
                    confidence = 10

        # Treat the frame as a match only for a confident "YES"
        return answer == "YES" and confidence >= 7, description, confidence

    except Exception as e:
        print(f"Error during image analysis: {str(e)}")
        return False, "Error occurred", 0

def preprocess_image(image_path):
    """Image preprocessing: boost local contrast with CLAHE before analysis."""
    img = cv2.imread(image_path)
    if img is None:
        return False

    # Apply CLAHE to the lightness channel in LAB colour space
    lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    cl = clahe.apply(l)
    limg = cv2.merge((cl, a, b))
    final = cv2.cvtColor(limg, cv2.COLOR_LAB2BGR)

    cv2.imwrite(image_path, final, [cv2.IMWRITE_JPEG_QUALITY, 100])
    return True

@app.get("/")
async def home(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.post("/analyze")
async def analyze_video(
    video: UploadFile = File(...),
    object_str: str = Form(...)
):
    try:
        # Save the uploaded video
        video_path = UPLOAD_DIR / video.filename
        with open(video_path, "wb") as buffer:
            shutil.copyfileobj(video.file, buffer)

        # Create a dedicated frame directory for this task
        task_frames_dir = FRAMES_DIR / video.filename.split('.')[0]
        task_frames_dir.mkdir(exist_ok=True)

        # Generate analysis results asynchronously, one JSON line per analyzed frame
        async def generate_results():
            cap = cv2.VideoCapture(str(video_path))
            # Fall back to 30 FPS if the frame rate cannot be read from the video
            fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
            frame_count = 0

            try:
                while True:
                    success, frame = cap.read()
                    if not success:
                        break

                    if frame_count % fps == 0:  # Process one frame per second
                        current_second = frame_count // fps
                        frame_path = os.path.join(task_frames_dir, f"frame_{current_second}.jpg")
                        cv2.imwrite(frame_path, frame)

                        if preprocess_image(frame_path):
                            is_match, description, confidence = await analyze_image(frame_path, object_str)

                            result = {
                                "status": "success",
                                "frame": {
                                    "second": current_second,
                                    "is_match": is_match,
                                    "description": description,
                                    "confidence": confidence,
                                    "frame_path": f"/frames/{video.filename.split('.')[0]}/frame_{current_second}.jpg"
                                }
                            }
                            yield json.dumps(result) + "\n"

                    frame_count += 1
            finally:
                cap.release()

        return StreamingResponse(generate_results(), media_type="application/json")

    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"status": "error", "message": str(e)}
        )

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
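
# Example client (a sketch, not part of the original file): the /analyze endpoint
# streams newline-delimited JSON, one object per analyzed frame, so a client should
# read the response line by line rather than waiting for a single JSON body. The
# server URL, the sample file name, and the target object below are assumptions
# for illustration.
#
#   import json
#   import requests
#
#   with open("sample.mp4", "rb") as f:
#       resp = requests.post(
#           "http://localhost:8000/analyze",
#           files={"video": ("sample.mp4", f, "video/mp4")},
#           data={"object_str": "a red car"},
#           stream=True,
#       )
#       for line in resp.iter_lines():
#           if line:
#               print(json.loads(line))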