-
Notifications
You must be signed in to change notification settings - Fork 4
/
obs_websocket.py
150 lines (133 loc) · 4.77 KB
/
obs_websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import json
from os import path
import time
import obsws_python as obs
from ls_logging import logger
from queue import Queue
from PyQt6.QtCore import QThread
from storage import fetch_data
def open_obs_websocket(server_info):
    """Open a websocket request client to OBS and verify it responds.

    Args:
        server_info: Mapping with the keys ``"ip"``, ``"port"`` and
            ``"password"`` describing the OBS websocket server.

    Returns:
        The connected ``obs.ReqClient``, or ``None`` if the connection or the
        initial version request failed (the error is logged as a warning).
    """
    cl = None
    try:
        cl = obs.ReqClient(
            host=server_info["ip"],
            port=server_info["port"],
            password=server_info["password"],
            timeout=10,  # presumably seconds -- confirm against obsws_python
        )
        # Round-trip a request so a dead or misconfigured server is detected now.
        resp = cl.get_version()
        logger.info(f"OBS Version: {resp.obs_version}")
        return cl
    except Exception as e:
        # `warning` replaces the deprecated `warn` alias.
        logger.warning(f"Error: {e}")
        if cl is not None:
            # The client connected but the version request failed: close the
            # socket (same call disconnect_obs_websocket uses) so it is not leaked.
            try:
                cl.base_client.ws.close()
            except Exception as close_error:
                logger.warning(f"Error: {close_error}")
        return None
def open_obs_websocket_from_settings():
    """Open an OBS websocket connection using the stored settings.

    Reads the settings via ``fetch_data("settings.json", "settings", {})`` and
    falls back to ``localhost`` / ``"4455"`` / an empty password for any key
    that is missing.

    Returns:
        Whatever ``open_obs_websocket`` returns for those connection details.
    """
    stored = fetch_data("settings.json", "settings", {})
    server_info = {
        "ip": stored.get("obs_host", "localhost"),
        "port": stored.get("obs_port", "4455"),
        "password": stored.get("obs_password", ""),
    }
    return open_obs_websocket(server_info)
def disconnect_obs_websocket(obs_client: obs.ReqClient):
    """Close the websocket underlying an OBS request client.

    Best-effort: a failure while closing is logged as a warning and not raised.

    Args:
        obs_client: The client to disconnect. ``None`` (which
            ``open_obs_websocket`` returns on failure) is accepted and ignored.
    """
    if obs_client is None:
        # Nothing was ever connected; avoid logging a spurious AttributeError.
        return
    try:
        obs_client.base_client.ws.close()
    except Exception as e:
        # `warning` replaces the deprecated `warn` alias.
        logger.warning(f"Error: {e}")
def get_all_sources(obs_client: obs.ReqClient):
    """Collect the scene items of every scene known to OBS.

    Each returned item is the entry from the scene-item-list response with an
    extra ``"sceneName"`` key naming the scene it was found in.

    Returns:
        A list of scene items, or ``None`` if any request failed (the
        exception is logged).
    """
    try:
        collected = []
        for scene in obs_client.get_scene_list().scenes:
            scene_name = scene["sceneName"]
            scene_items = obs_client.get_scene_item_list(scene_name).scene_items
            for item in scene_items:
                # Tag the item with its owning scene before collecting it.
                item["sceneName"] = scene_name
                collected.append(item)
        return collected
    except Exception:
        logger.exception("Error: unable to get all sources")
        return None
def get_all_text_sources(obs_client: obs.ReqClient):
    """Return the scene items that are text inputs with a ``"text"`` setting.

    A scene item qualifies when its ``"inputKind"`` starts with ``"text_"``
    and its input settings contain a ``"text"`` key.

    Returns:
        A list of matching scene items (as produced by ``get_all_sources``),
        or ``None`` if the sources could not be listed or a settings request
        failed (the exception is logged).
    """
    sources = get_all_sources(obs_client)
    if sources is None:
        return None
    text_sources = []
    try:
        for source in sources:
            # str() keeps a missing/None kind from breaking startswith().
            if not str(source["inputKind"]).startswith("text_"):
                continue
            source_settings = obs_client.get_input_settings(
                source["sourceName"]
            ).input_settings
            # Only keep sources that actually carry text.
            if "text" in source_settings:
                text_sources.append(source)
    except Exception:
        # Match the sibling helpers: log and signal failure with None rather
        # than letting a request error propagate to the caller.
        logger.exception("Error: unable to get all text sources")
        return None
    return text_sources
def get_source_by_name(obs_client: obs.ReqClient, source_name):
    """Find the first scene item whose ``"sourceName"`` equals *source_name*.

    Scenes are searched in the order OBS reports them, and the search stops at
    the first match so no further scene-item requests are issued.

    Args:
        obs_client: Connected OBS request client.
        source_name: Name of the source to look for.

    Returns:
        The matching scene item with an added ``"sceneName"`` key, or ``None``
        if no scene contains it or a request failed (the exception is logged).
    """
    try:
        for scene in obs_client.get_scene_list().scenes:
            scene_name = scene["sceneName"]
            resp = obs_client.get_scene_item_list(scene_name)
            for source in resp.scene_items:
                if source["sourceName"] == source_name:
                    # Tag the item with the scene it was found in.
                    source["sceneName"] = scene_name
                    return source
        return None
    except Exception:
        logger.exception("Error: unable to get source by name")
        return None
class OBSPoller(QThread):
    """Thread that polls an OBS source's text and queues it when it changes."""

    def __init__(
        self,
        obs_client: obs.ReqClient,
        obs_source_name: str,
        queue: Queue,
        polling_freq=1000,
    ):
        """Store the polling configuration; the thread is not started here.

        Args:
            obs_client: Connected OBS request client used for every poll.
            obs_source_name: Name of the OBS source whose text is watched.
            queue: Queue that receives each new, non-empty text value.
            polling_freq: Delay between polls, in milliseconds.
        """
        super().__init__()
        self.obs_client = obs_client
        self.obs_source_name = obs_source_name
        self.queue = queue
        self.polling_freq = polling_freq
        self.running = False
        # Last value pushed to the queue; used to suppress duplicates.
        self.last_content = None

    def stop(self):
        """Ask the polling loop to exit; it stops after the current iteration."""
        self.running = False

    def run(self):
        """Poll the source until stopped or until the source cannot be found.

        Errors raised during a poll are logged and polling continues after the
        usual delay. ``self.running`` is always reset to ``False`` on exit.
        """
        logger.info("OBS polling thread started")
        self.running = True
        try:
            while self.running:
                try:
                    # Look the source up on every poll so a removed source is noticed.
                    source = get_source_by_name(
                        self.obs_client, self.obs_source_name
                    )
                    if source is None:
                        logger.error(f"Source {self.obs_source_name} not found")
                        break
                    source_settings = self.obs_client.get_input_settings(
                        source["sourceName"]
                    ).input_settings
                    source_content = (
                        source_settings["text"] if "text" in source_settings else None
                    )
                    # Only forward non-empty text that differs from the last value sent.
                    if source_content and source_content != self.last_content:
                        self.queue.put_nowait(source_content)
                        self.last_content = source_content
                except Exception as e:
                    logger.exception(f"Error: {e}")
                # polling_freq is in milliseconds; time.sleep() takes seconds.
                time.sleep(self.polling_freq / 1000)
        finally:
            # Keep the flag truthful when the loop ends via `break` rather
            # than through stop().
            self.running = False
            logger.info("OBS polling thread stopped")