-
Notifications
You must be signed in to change notification settings - Fork 597
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
examples: change mavftp class to be a python3 library with a simple API.
The library can be called directly and it is a mavftp application the application contains documented argparse arguments Add a example that uses the library
- Loading branch information
1 parent
6827605
commit 6205678
Showing
2 changed files
with
1,135 additions
and
297 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,214 @@ | ||
#!/usr/bin/env python3 | ||
|
||
''' | ||
MAVLink File Transfer Protocol support example | ||
SPDX-FileCopyrightText: 2024 Amilcar Lucas | ||
SPDX-License-Identifier: GPL-3.0-or-later | ||
''' | ||
|
||
from argparse import ArgumentParser | ||
|
||
from logging import basicConfig as logging_basicConfig | ||
from logging import getLevelName as logging_getLevelName | ||
|
||
from logging import debug as logging_debug | ||
from logging import info as logging_info | ||
from logging import error as logging_error | ||
|
||
import os | ||
import sys | ||
import requests | ||
import time | ||
|
||
|
||
from pymavlink import mavutil | ||
from pymavlink import mavftp | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
def argument_parser(): | ||
""" | ||
Parses command-line arguments for the script. | ||
""" | ||
parser = ArgumentParser(description='This main is just an example, adapt it to your needs') | ||
parser.add_argument("--baudrate", type=int, default=115200, | ||
help="master port baud rate. Defaults to %(default)s") | ||
parser.add_argument("--device", type=str, default='', | ||
help="serial device. For windows use COMx where x is the port number. " | ||
"For Unix use /dev/ttyUSBx where x is the port number. Defaults to autodetection") | ||
parser.add_argument("--source-system", dest='SOURCE_SYSTEM', type=int, default=250, | ||
help='MAVLink source system for this GCS. Defaults to %(default)s') | ||
parser.add_argument("--loglevel", default="INFO", | ||
help="log level. Defaults to %(default)s") | ||
|
||
return parser.parse_args() | ||
|
||
|
||
def auto_detect_serial(): | ||
preferred_ports = [ | ||
'*FTDI*', | ||
"*3D*", | ||
"*USB_to_UART*", | ||
'*Ardu*', | ||
'*PX4*', | ||
'*Hex_*', | ||
'*Holybro_*', | ||
'*mRo*', | ||
'*FMU*', | ||
'*Swift-Flyer*', | ||
'*Serial*', | ||
'*CubePilot*', | ||
'*Qiotek*', | ||
] | ||
serial_list = mavutil.auto_detect_serial(preferred_list=preferred_ports) | ||
serial_list.sort(key=lambda x: x.device) | ||
|
||
# remove OTG2 ports for dual CDC | ||
if len(serial_list) == 2 and serial_list[0].device.startswith("/dev/serial/by-id"): | ||
if serial_list[0].device[:-1] == serial_list[1].device[0:-1]: | ||
serial_list.pop(1) | ||
|
||
return serial_list | ||
|
||
|
||
def auto_connect(device): | ||
comport = None | ||
if device: | ||
comport = mavutil.SerialPort(device=device, description=device) | ||
else: | ||
autodetect_serial = auto_detect_serial() | ||
if autodetect_serial: | ||
# Resolve the soft link if it's a Linux system | ||
if os.name == 'posix': | ||
try: | ||
dev = autodetect_serial[0].device | ||
logging_debug("Auto-detected device %s", dev) | ||
# Get the directory part of the soft link | ||
softlink_dir = os.path.dirname(dev) | ||
# Resolve the soft link and join it with the directory part | ||
resolved_path = os.path.abspath(os.path.join(softlink_dir, os.readlink(dev))) | ||
autodetect_serial[0].device = resolved_path | ||
logging_debug("Resolved soft link %s to %s", dev, resolved_path) | ||
except OSError: | ||
pass # Not a soft link, proceed with the original device path | ||
comport = autodetect_serial[0] | ||
else: | ||
logging_error("No serial ports found. Please connect a flight controller and try again.") | ||
sys.exit(1) | ||
return comport | ||
|
||
|
||
def wait_heartbeat(m): | ||
'''wait for a heartbeat so we know the target system IDs''' | ||
logging_info("Waiting for flight controller heartbeat") | ||
m.wait_heartbeat() | ||
logging_info("Got heartbeat from system %u, component %u", m.target_system, m.target_system) | ||
|
||
|
||
def delete_local_file_if_exists(filename): | ||
if os.path.exists(filename): | ||
os.remove(filename) | ||
|
||
|
||
def get_list_dir(mav_ftp, directory): | ||
mav_ftp.cmd_list([directory]) | ||
|
||
|
||
def get_file(mav_ftp, remote_filename, local_filename, timeout=5): | ||
logging_info("Will download %s to %s", remote_filename, local_filename) | ||
mav_ftp.cmd_get([remote_filename, local_filename], timeout=timeout) | ||
|
||
|
||
def get_last_log(mav_ftp): | ||
try: | ||
with open('LASTLOG.TXT', 'r', encoding='UTF-8') as file: | ||
file_contents = file.readline() | ||
remote_filenumber = int(file_contents.strip()) | ||
except FileNotFoundError: | ||
logging_error("File LASTLOG.TXT not found.") | ||
return | ||
except ValueError: | ||
logging_error("Could not extract last log file number from LASTLOG.TXT contants %s", file_contents) | ||
return | ||
remote_filenumber = remote_filenumber - 1 # we do not want the very last log | ||
remote_filename = f'/APM/LOGS/{remote_filenumber:08}.BIN' | ||
get_file(mav_ftp, remote_filename, 'LASTLOG.BIN', 0) | ||
time.sleep(1) # wait for the file to be written to disk and for the FC to catch it's breath | ||
|
||
|
||
def download_script(url, local_filename): | ||
# Download the script from the internet to the PC | ||
response = requests.get(url, timeout=5) | ||
|
||
if response.status_code == 200: | ||
with open(local_filename, "wb") as file: | ||
file.write(response.content) | ||
else: | ||
logging_error("Failed to download the file") | ||
|
||
|
||
def create_directory(mav_ftp, remote_directory): | ||
mav_ftp.cmd_mkdir([remote_directory]) | ||
|
||
|
||
def upload_script(mav_ftp, remote_directory, local_filename, timeout): | ||
# Upload it from the PC to the flight controller | ||
mav_ftp.cmd_put([local_filename, remote_directory], timeout=timeout) | ||
time.sleep(0.3) # wait for the FC to catch it's breath | ||
|
||
|
||
def main(): | ||
'''for testing/example purposes only''' | ||
args = argument_parser() | ||
|
||
logging_basicConfig(level=logging_getLevelName(args.loglevel), format='%(asctime)s - %(levelname)s - %(message)s') | ||
|
||
# create a mavlink serial instance | ||
comport = auto_connect(args.device) | ||
master = mavutil.mavlink_connection(comport.device, baud=args.baudrate, source_system=args.SOURCE_SYSTEM) | ||
|
||
# wait for the heartbeat msg to find the system ID | ||
wait_heartbeat(master) | ||
|
||
mav_ftp = mavftp.MAVFTP(master, | ||
target_system=master.target_system, | ||
target_component=master.target_component) | ||
|
||
delete_local_file_if_exists("params.param") | ||
delete_local_file_if_exists("defaults.param") | ||
mav_ftp.cmd_getparams(["params.param", "defaults.param"]) | ||
|
||
get_list_dir(mav_ftp, '/APM/LOGS/') | ||
|
||
delete_local_file_if_exists("LASTLOG.TXT") | ||
delete_local_file_if_exists("LASTLOG.BIN") | ||
get_file(mav_ftp, '/APM/LOGS/LASTLOG.TXT', 'LASTLOG.TXT') | ||
|
||
get_last_log(mav_ftp) | ||
|
||
remote_directory = '/APM/Scripts/' | ||
create_directory(mav_ftp, remote_directory) | ||
|
||
url = "https://discuss.ardupilot.org/uploads/short-url/4pyrl7PcfqiMEaRItUhljuAqLSs.lua" | ||
local_filename = "copter-magfit-helper.lua" | ||
|
||
if not os.path.exists(local_filename): | ||
download_script(url, local_filename) | ||
|
||
upload_script(mav_ftp, remote_directory, local_filename, 5) | ||
|
||
url = "https://raw.githubusercontent.com/ArduPilot/ardupilot/Copter-4.5/libraries/AP_Scripting/applets/VTOL-quicktune.lua" | ||
local_filename = "VTOL-quicktune.lua" | ||
|
||
if not os.path.exists(local_filename): | ||
download_script(url, local_filename) | ||
|
||
upload_script(mav_ftp, remote_directory, local_filename, 5) | ||
|
||
master.close() | ||
|
||
|
||
main() |
Oops, something went wrong.