Skip to content

Commit

Permalink
changes, mostly ondeck
Browse files Browse the repository at this point in the history
  • Loading branch information
efultz committed Oct 10, 2023
1 parent cc91eaa commit 5f88e68
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 16 deletions.
11 changes: 8 additions & 3 deletions archive_reencode.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def next_videos(session: Session):
l = list(results)
if len(l):
return l


def run_reencode(output_dir: Path, sessionmaker: SessionMaker):

Expand Down Expand Up @@ -123,11 +123,16 @@ def run_reencode(output_dir: Path, sessionmaker: SessionMaker):
@click.option('--dbuser', default=flaskconfig.get('DBUSER'))
@click.option('--output_dir', default=flaskconfig.get('VIDEO_OUTPUT_DIR'))
@click.option('--print_queue', is_flag=True)
def main(dbname, dbuser, output_dir, print_queue):
@click.option('--input_file', default=None)
def main(dbname, dbuser, output_dir, print_queue, input_file: str):

output_dir = Path(output_dir)


if input_file != None:
i = Path(input_file)
with i.open() as f:
for l in f.readlines():
vids.append(l.strip())

sa_engine = sa.create_engine("postgresql+psycopg2://%s@/%s"%(dbuser, dbname), echo=True)
sessionmaker = SessionMaker(sa_engine)
Expand Down
1 change: 1 addition & 0 deletions edge_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
admin.add_view(ModelView(FishAiData, db.session))
admin.add_view(ModelView(OndeckData, db.session))
admin.add_view(InternetDataView(db.session))
admin.add_view(ModelView(DeckhandEventRaw, db.session))
admin.add_view(ModelView(DeckhandEventView, db.session))

admin.add_view(ModelView(BoatSchedule, db.session))
Expand Down
25 changes: 25 additions & 0 deletions migrations/versions/f48359cf7456_ondeckdata_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""ondeckdata_status
Revision ID: f48359cf7456
Revises: 8304966281aa
Create Date: 2023-10-09 17:35:01.581320
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = 'f48359cf7456'
down_revision = '8304966281aa'
branch_labels = None
depends_on = None


def upgrade() -> None:

op.add_column('ondeckdata', sa.Column('status', sa.String(), nullable=True))


def downgrade() -> None:
op.drop_column('ondeckdata', 'status')
7 changes: 7 additions & 0 deletions model/ondeckdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@ class OndeckData(Base):
overallcount = Column(Integer)
overallruntimems = Column(REAL)
tracked_confidence = Column(REAL)
status = Column(String)

def __str__(self) -> str:
return 'OndeckData(' + ', '.join(
[n + '='+ str(self.__getattribute__(n)) for n in [
'id',
'video_uri',
'video_file',
'cocoannotations_uri',
'datetime',
'overallcount',
'overallruntimems',
'tracked_confidence',
'status',

]]) + ')'

154 changes: 141 additions & 13 deletions run_ondeck.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@

from datetime import datetime, timezone
import click
import json
import os
from pathlib import Path
import re
import requests
from requests import Response
import schedule
import subprocess
from subprocess import CompletedProcess
Expand Down Expand Up @@ -55,6 +58,7 @@ def next_videos(session: Session, thalos_cam_name):
})
return list(results)

MAGIC_VALUE_5_MiB = 5 * 1024 * 1024

def run_ondeck(output_dir: Path, engine: Path, sessionmaker: SessionMaker, thalos_cam_name):

Expand All @@ -63,21 +67,30 @@ def run_ondeck(output_dir: Path, engine: Path, sessionmaker: SessionMaker, thalo
with sessionmaker() as session:
video_files = next_videos(session, thalos_cam_name)

# print(video_files)
# click.echo(video_files)
while len(video_files) > 0:
video_file: VideoFile = video_files.pop(0)
# print(video_file)
# click.echo(video_file)
decrypted_path = Path(video_file.decrypted_path)
last_dot_index: int = decrypted_path.name.index('.')
if last_dot_index < 0:
last_dot_index = None
json_out_file: Path = output_dir / Path(decrypted_path.name[0:last_dot_index] + "_ondeck.json")

ondeck_input = str(decrypted_path.absolute())
try:
reencoded_path: Path = Path(video_file.reencoded_path)
if reencoded_path.stat().st_size > MAGIC_VALUE_5_MiB:
ondeck_input = str(reencoded_path.absolute())
except:
pass

# sudo /usr/bin/docker run --rm -v /videos:/videos --runtime=nvidia --network none gcr.io/edge-gcr/edge-service-image:latest --output /videos --input /videos/21-07-2023-09-55.avi
cmd: str = "sudo /usr/bin/docker run --rm -v /videos:/videos --runtime=nvidia --network none \
gcr.io/edge-gcr/edge-service-image:latest \
--output %s --input %s"%(
str(json_out_file.absolute()),
str(decrypted_path.absolute())
ondeck_input
)
if engine:
cmd += " --model %s"%( str(engine.absolute()), )
Expand All @@ -86,7 +99,7 @@ def run_ondeck(output_dir: Path, engine: Path, sessionmaker: SessionMaker, thalo

parse_json(sessionmaker, decrypted_path, json_out_file)
else:
# print("ondeck model failure. stdout, stderr:", p.stdout, p.stderr)
# click.echo("ondeck model failure. stdout, stderr: {} {}".format( p.stdout, p.stderr))
with sessionmaker() as session:
session.execute(sa.text("insert into ondeckdata ( video_uri, cocoannotations_uri ) \
values ( :decrypted_path, :error_str ) ;"), {
Expand Down Expand Up @@ -149,6 +162,101 @@ def parse_json(sessionmaker, decrypted_path, json_out_file):
session.commit()


def v2_enqueue(output_dir: Path, sessionmaker: SessionMaker, thalos_cam_name: str):

video_files: list[VideoFile] = []

with sessionmaker() as session:
video_files = next_videos(session, thalos_cam_name)

# print(video_files)
while len(video_files) > 0:
video_file: VideoFile = video_files.pop(0)
# print(video_file)
decrypted_path = Path(video_file.decrypted_path)
last_dot_index: int = decrypted_path.name.index('.')
if last_dot_index < 0:
last_dot_index = None
json_out_file: Path = output_dir / Path(decrypted_path.name[0:last_dot_index] + "_ondeck.json")

ondeck_input = str(decrypted_path.absolute())
try:
reencoded_path: Path = Path(video_file.reencoded_path)
if reencoded_path.stat().st_size > MAGIC_VALUE_5_MiB:
ondeck_input = str(reencoded_path.absolute())
except:
pass

try:
r: Response = requests.post('http://127.0.0.1:5000/inference', json={
"input_path":ondeck_input,
"output_path":str(json_out_file.absolute()),
"current_timestamp": video_file.start_datetime.astimezone(timezone.utc).replace(tzinfo=None).isoformat() + ".00Z"
})

click.echo("resp: {} body: {}".format(repr(r), repr(r.json())))

with sessionmaker() as session:
session.execute(sa.text("""insert into ondeckdata ( video_uri, cocoannotations_uri, status )
values ( :ondeck_input, :ondeck_output, :status )
on conflict DO UPDATE SET status = :status ;"""), {
"ondeck_input": ondeck_input,
"ondeck_output": str(json_out_file.absolute()),
"status": "queued"
}
)
session.commit()
except requests.exceptions.RequestException as e:
click.echo("ondeck model request exception: {}".format(e))
return

MAGIC_VALUE_1_MINUTE = 60

def v2_parse(output_dir: Path, sessionmaker: SessionMaker):
# only pick files that end with _ondeck.json
a = filter(lambda x: x.is_file() and x.name.endswith('_ondeck.json'), output_dir.iterdir())

epoch_now = int(time.time())
# only pick files that haven't been modified in the last minute
b = filter(lambda x: x.stat().st_mtime + MAGIC_VALUE_1_MINUTE < epoch_now, a)

# get the filenames
c = map(lambda x: str(x.absolute()) , b)

found_ondeck_files = list(c)

with sessionmaker() as session:
results: Query[OndeckData] = session.query(OndeckData).where(OndeckData.status == 'queued')
for pending_ondeckdata in results:
if pending_ondeckdata.video_uri in found_ondeck_files:
pending_ondeckdata.status = "parsing"
session.commit()



def v2_errors(sessionmaker: SessionMaker):
try:
r: Response = requests.get('http://127.0.0.1:5000/errors')

click.echo("errors resp: {} body: {}".format(repr(r), repr(r.json())))

for error in r:
input_path = r.get('input_path')
error_message = r.get('error_message')

with sessionmaker() as session:
session.execute(sa.text("insert into ondeckdata ( video_uri, cocoannotations_uri ) \
values ( :decrypted_path, :error_str ) ;"), {
"decrypted_path": input_path,
"error_str": "ondeck model failure. stdout, stderr: " + error_message
}
)
session.commit()

except requests.exceptions.RequestException as e:
click.echo("ondeck model errors request exception: {}".format(e))
return

@click.command()
@click.option('--dbname', default=flaskconfig.get('DBNAME'))
@click.option('--dbuser', default=flaskconfig.get('DBUSER'))
Expand All @@ -158,7 +266,8 @@ def parse_json(sessionmaker, decrypted_path, json_out_file):
@click.option('--print_queue', is_flag=True)
@click.option('--parsetesta')
@click.option('--parsetestb')
def main(dbname, dbuser, output_dir, engine, thalos_cam_name, print_queue, parsetesta, parsetestb):
@click.option('--force_v2', is_flag=True)
def main(dbname, dbuser, output_dir, engine, thalos_cam_name, print_queue, parsetesta, parsetestb, force_v2: bool):

output_dir = Path(output_dir)

Expand All @@ -179,16 +288,35 @@ def main(dbname, dbuser, output_dir, engine, thalos_cam_name, print_queue, parse
with sessionmaker() as session:
video_files = next_videos(session, thalos_cam_name)
for v in video_files:
print(v.decrypted_path)
click.echo(v.decrypted_path)
return

def runonce(output_dir, engine, sessionmaker, thalos_cam_name):
run_ondeck(output_dir, engine, sessionmaker, thalos_cam_name)
return schedule.CancelJob

schedule.every(1).seconds.do(runonce, output_dir, engine, sessionmaker, thalos_cam_name)
use_v2 = False
try:
r: Response = requests.get('http://127.0.0.1:5000/queueSummary')
use_v2 = r.status_code == 200
click.echo("resp: {} body: {}".format(repr(r), repr(r.json())))
except requests.exceptions.RequestException as e:
click.echo("ondeck model request exception: {}".format(e))

if force_v2 or use_v2:

def runonce(output_dir, sessionmaker, thalos_cam_name):
v2_enqueue(output_dir, sessionmaker, thalos_cam_name)
return schedule.CancelJob

schedule.every(1).seconds.do(runonce, output_dir, sessionmaker, thalos_cam_name)

schedule.every(5).minutes.do(v2_enqueue, output_dir, sessionmaker, thalos_cam_name )
else:

def runonce(output_dir, engine, sessionmaker, thalos_cam_name):
run_ondeck(output_dir, engine, sessionmaker, thalos_cam_name)
return schedule.CancelJob

schedule.every(1).seconds.do(runonce, output_dir, engine, sessionmaker, thalos_cam_name)

schedule.every(5).minutes.do(run_ondeck, output_dir, engine, sessionmaker, thalos_cam_name )
schedule.every(5).minutes.do(run_ondeck, output_dir, engine, sessionmaker, thalos_cam_name )

while 1:
n = schedule.idle_seconds()
Expand All @@ -197,7 +325,7 @@ def runonce(output_dir, engine, sessionmaker, thalos_cam_name):
break
elif n > 0:
# sleep exactly the right amount of time
print("sleeping for:", n)
click.echo("sleeping for: {}".format(n))
time.sleep(n)
schedule.run_pending()

Expand Down
44 changes: 44 additions & 0 deletions scripts/system-install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ while (( "$#" )); do
--do-copy-numpy)
DO_COPY_PY_PANDAS_TO_VENV="y"
;;
--do-ondeck)
DO_ONDECK="y"
;;
*)
help
;;
Expand Down Expand Up @@ -726,3 +729,44 @@ elif ! sudo diff $TMP_FILE /etc/systemd/system/reencode_video_tnc.service >/dev/
fi
rm $TMP_FILE



if [ $DO_ONDECK ] ; then

TMP_FILE="$(mktemp)"
cat > $TMP_FILE << EOF
[Unit]
Description=Ondeck Model Container
After=docker.service
Requires=docker.service
StartLimitIntervalSec=0
[Service]
TimeoutStartSec=0
Restart=always
RestartSec=120
ExecStartPre=-/usr/bin/docker stop ondeck_model
ExecStartPre=-/usr/bin/docker rm ondeck_model
ExecStart=/usr/bin/docker run --rm --name ondeck_model -p 5000:5000 --runtime nvidia -v /videos:/videos -e APP_CONTAINER_DIR=/videos edge-service-image:latest
ExecStartPost=-/usr/bin/docker exec -it ondeck_model sed -i'' -e 's# format_str = "%Y-%m-%dT%H:%M:%S.%f%z"# format_str = "%Y-%m-%dT%H:%M:%S.%f%Z"#' /app/app/workers/worker.py
[Install]
WantedBy=default.target
EOF

if ! [ -e "/etc/systemd/system/ondeck_model.service" ] ; then
sudo cp $TMP_FILE /etc/systemd/system/ondeck_model.service

sudo systemctl daemon-reload
sudo systemctl enable "ondeck_model.service"
sudo systemctl start "ondeck_model.service"

elif ! sudo diff $TMP_FILE /etc/systemd/system/ondeck_model.service >/dev/null; then
sudo cp $TMP_FILE /etc/systemd/system/ondeck_model.service

sudo systemctl daemon-reload
sudo systemctl restart "ondeck_model.service"
fi
rm $TMP_FILE
fi

0 comments on commit 5f88e68

Please sign in to comment.