-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathabc.txt
144 lines (121 loc) · 4.53 KB
/
abc.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.*;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Collections;
import java.util.HashSet;
@Component
class MjpegFrameProcessor {
private static final int BUFFER_SIZE = 8192;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final Set<SseEmitter> emitters = Collections.synchronizedSet(new HashSet<>());
private final ExecutorService executorService;
private Future<?> processorFuture;
public MjpegFrameProcessor() {
// Use a single thread for frame processing to avoid overwhelming the system
this.executorService = Executors.newSingleThreadExecutor();
}
public void startProcessing(String streamUrl) {
processorFuture = executorService.submit(() -> {
while (isRunning.get()) {
try {
byte[] frame = captureFrame(streamUrl);
if (frame != null) {
broadcast(frame);
}
} catch (IOException e) {
// Log error and possibly implement retry logic
e.printStackTrace();
}
}
});
}
private byte[] captureFrame(String streamUrl) throws IOException {
URL url = new URL(streamUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
try (InputStream is = new BufferedInputStream(conn.getInputStream())) {
// Read MJPEG frame boundary
String boundary = findBoundary(is);
if (boundary == null) return null;
// Read frame content
return readFrameContent(is, boundary);
}
}
private String findBoundary(InputStream is) throws IOException {
// Implementation to find MJPEG boundary marker
// This is a simplified version - you'll need to implement proper boundary detection
return "--myboundary";
}
private byte[] readFrameContent(InputStream is, String boundary) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
baos.write(buffer, 0, bytesRead);
// Check if we've reached the boundary
if (new String(baos.toByteArray()).contains(boundary)) {
break;
}
}
return baos.toByteArray();
}
public SseEmitter addEmitter() {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitters.add(emitter);
emitter.onCompletion(() -> emitters.remove(emitter));
emitter.onTimeout(() -> emitters.remove(emitter));
emitter.onError(e -> emitters.remove(emitter));
return emitter;
}
private void broadcast(byte[] frame) {
Set<SseEmitter> deadEmitters = new HashSet<>();
emitters.forEach(emitter -> {
try {
emitter.send(frame, MediaType.IMAGE_JPEG);
} catch (IOException e) {
deadEmitters.add(emitter);
}
});
emitters.removeAll(deadEmitters);
}
@PreDestroy
public void shutdown() {
isRunning.set(false);
if (processorFuture != null) {
processorFuture.cancel(true);
}
executorService.shutdown();
}
}
@RestController
@RequestMapping("/api/stream")
class StreamController {
private final MjpegFrameProcessor frameProcessor;
public StreamController(MjpegFrameProcessor frameProcessor) {
this.frameProcessor = frameProcessor;
}
@GetMapping("/video")
public SseEmitter streamVideo() {
return frameProcessor.addEmitter();
}
}
@Service
class StreamService {
private final MjpegFrameProcessor frameProcessor;
public StreamService(MjpegFrameProcessor frameProcessor) {
this.frameProcessor = frameProcessor;
}
public void initializeStream(String streamUrl) {
frameProcessor.startProcessing(streamUrl);
}
}