From be770f8f816a657c3816af89c784b848fa4d6a43 Mon Sep 17 00:00:00 2001 From: Fran Quinto <1702904+fquinto@users.noreply.github.com> Date: Wed, 8 Nov 2023 19:01:39 +0100 Subject: [PATCH] not finished, only for save on Github --- ha_config/ha_api.py | 543 +++++++++++++++++ ha_config/ha_config.ini | 13 + ha_config/ha_control.py | 1230 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 1786 insertions(+) create mode 100644 ha_config/ha_api.py create mode 100644 ha_config/ha_config.ini create mode 100644 ha_config/ha_control.py diff --git a/ha_config/ha_api.py b/ha_config/ha_api.py new file mode 100644 index 0000000..1212e95 --- /dev/null +++ b/ha_config/ha_api.py @@ -0,0 +1,543 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Author Version Date Comments +# FQuinto 0.0.1 2023-09-28 First version + +"""Home Assistant API.""" + + +__version__ = '0.0.1' + +import time +import socket +import logging +import os +import json +import configparser +import base64 +from logging.handlers import RotatingFileHandler +# Better is using lxml but difficult to install +from xml.etree import ElementTree +# from lxml import html, etree +from flask import Flask, jsonify, send_from_directory + + +class HomeAssistantAPI: + """Home Assistant API for Bticino.""" + def __init__(self): + self.app = Flask(__name__) + self.setuplogging() + # Define routes + self.app.add_url_rule('/', 'main_menu', self.main_menu) + self.app.add_url_rule('/load', 'load', self.load) + self.app.add_url_rule('/unlock', 'unlock', self.unlock) + self.app.add_url_rule('/reboot', 'reboot', self.reboot) + self.app.add_url_rule('/fwupgrade', 'fwupgrade', + self.read_xml_fwupgrade) + self.app.add_url_rule('/fwversion', 'fwversion', self.fwversion) + self.app.add_url_rule('/leds', 'leds', self.leds) + self.app.add_url_rule('/messages', 'messages', + self.get_message_numbers) + self.app.add_url_rule('/messages_html', 'messages_html', + self.get_messages_html) + self.app.add_url_rule('/messages/', 'get_message', + self.get_message) + self.app.add_url_rule('/messages//video', + 'get_videomessage', + self.get_videomessage) + self.app.add_url_rule('/messages//image', + 'get_imagemessage', + self.get_imagemessage) + # conf.xml file or stack_open.xml file + self.app.add_url_rule('/conf/', 'get_conf', + self.get_conf) + self.app.add_url_rule('/conf//download', + 'get_conf_download', + self.get_conf_download) + + def setuplogging(self): + """Setup logging.""" + self.loggingLEVEL = None + self.readINIfile() + # Setup LOGGING + switcher = { + 'error': logging.ERROR, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'critical': logging.CRITICAL, + 'debug': logging.DEBUG + } + LOGGER_LEVEL = switcher.get(self.loggingLEVEL) + f = ('%(asctime)s - %(name)s - [%(levelname)s] ' + + '- %(funcName)s - %(message)s') + logging.basicConfig(level=LOGGER_LEVEL, format=f) + self.logger = logging.getLogger(__name__) + self.logger.setLevel(LOGGER_LEVEL) + + # Añadiendo logging rotativo + logpath = '/var/log/ha_api.log' + handler = RotatingFileHandler(logpath, maxBytes=10485760, + backupCount=3) + formatter = logging.Formatter(f) + # Añadiendo el formato al handler + handler.setFormatter(formatter) + # Añadiendo el handler al logger + self.logger.addHandler(handler) + message = "Log saving in: " + logpath + self.logger.info(message + ' version: ' + __version__) + + def readINIfile(self): + """Read INI file.""" + config = configparser.ConfigParser() + script_dir = os.path.dirname(__file__) + rel_path = "./ha_config.ini" + abs_file_path = os.path.join(script_dir, rel_path) + config.read(abs_file_path) + self.loggingLEVEL = config['DEFAULT']['loggingLEVEL'] + self.localfolder = config['DEFAULT']['localfolder'] + + def leds(self): + """Read leds status.""" + + # Define the directory path + leds_dir = '/sys/class/leds' + + # Initialize a dictionary to store the LED data + led_data = {} + + # List all LED directories in the directory + led_dirs = [d for d in os.listdir(leds_dir) if d.startswith('led_')] + + # Iterate through the LED directories + for led_directory in led_dirs: + brightness_f = os.path.join(leds_dir, led_directory, 'brightness') + try: + with open(brightness_f, 'r') as file: + brightness_value = file.read().strip() + led_data[led_directory] = brightness_value + except Exception as e: + self.logger.error("An error occurred: " + str(e)) + + # Convert the dictionary to JSON + led_data_json = json.dumps(led_data, indent=4) + return led_data_json + + def get_conf(self, conf_file, download=False): + """Get conf.xml file (show).""" + folder = '/var/tmp/' + conf_files = ['stack_open.xml', 'conf.xml'] + if conf_file not in conf_files: + self.logger.error("File not found: stack_open.xml or conf.xml " + "but " + conf_file + " found") + return jsonify({'error': ( + 'File not found: ' + 'stack_open.xml or conf.xml')}), 404 + conf_file_dir = os.path.join(folder, conf_file) + if not os.path.exists(conf_file_dir): + return jsonify({'error': 'File not found'}), 404 + if download: + return send_from_directory(folder, conf_file, as_attachment=True) + else: + return self.read_file_content(conf_file_dir) + + def get_conf_download(self, conf_file): + """Get conf.xml file (download).""" + return self.get_conf(conf_file, download=True) + + def get_message_numbers(self): + """Read video messages.""" + m_dir = '/home/bticino/cfg/extra/47/messages' + file_list = os.listdir(m_dir) + # Sort the file list by modification timestamp (most recent first) + file_list.sort(key=lambda x: os.path.getmtime(os.path.join(m_dir, x)), + reverse=True) + m_numbers = [d for d in file_list if d.startswith('message_')] + # add url image for every message + for i in range(len(m_numbers)): + num_message = m_numbers[i].split('_')[1] + unixtime_message = self.get_message_info_param( + num_message, 'unixtime') + dt_unixtime = time.strftime( + '%Y-%m-%d %H:%M:%S', time.localtime(int(unixtime_message))) + m_folder = os.path.join(m_dir, m_numbers[i]) + image_file = os.path.join(m_folder, 'aswm.jpg') + if os.path.exists(image_file): + m_numbers[i] = { + 'number': str(i), + 'image': '/messages/' + num_message + '/image', + 'detail': '/messages/' + num_message, + 'date': dt_unixtime, + 'unixtime': unixtime_message + } + else: + m_numbers[i] = { + 'number': str(i), + 'image': '', + 'detail': '/messages/' + num_message, + 'date': dt_unixtime, + 'unixtime': unixtime_message + } + video_file = os.path.join(m_folder, 'aswm.avi') + if os.path.exists(video_file): + m_numbers[i]['video'] = ( + '/messages/' + num_message + '/video') + else: + m_numbers[i]['video'] = '' + # order m_numbers by unixtime + m_numbers = self.sort_messages(m_numbers) + return jsonify(m_numbers) + + def get_messages_html(self): + """Get messages in HTML format.""" + m_dir = '/home/bticino/cfg/extra/47/messages' + file_list = os.listdir(m_dir) + # Sort the file list by modification timestamp (most recent first) + # Sort the file list by modification timestamp (most recent first) + file_list.sort(key=lambda x: os.path.getmtime(os.path.join(m_dir, x)), + reverse=True) + m_numbers = [d for d in file_list if d.startswith('message_')] + # add url image for every message + for i in range(len(m_numbers)): + num_message = m_numbers[i].split('_')[1] + r_message = self.get_message_info_param(num_message, 'read') + # r_message = 0 -> unread + # r_message = 1 -> read + if r_message == '0': + read_status = 'Unread' + else: + read_status = 'Read' + unixtime_message = self.get_message_info_param( + num_message, 'unixtime') + dt_unixtime = time.strftime( + '%Y-%m-%d %H:%M:%S', time.localtime(int(unixtime_message))) + m_folder = os.path.join(m_dir, m_numbers[i]) + image_file = os.path.join(m_folder, 'aswm.jpg') + if os.path.exists(image_file): + with open(image_file, "rb") as ifile: + e_string = base64.b64encode( + ifile.read()).decode('utf-8') + m_numbers[i] = { + 'number': num_message, + 'image': '/messages/' + num_message + '/image', + 'detail': '/messages/' + num_message, + 'base64': e_string, + 'date': dt_unixtime, + 'unixtime': unixtime_message, + 'read_status': read_status + } + else: + m_numbers[i] = { + 'number': num_message, + 'image': '', + 'detail': '/messages/' + num_message, + 'base64': '', + 'date': dt_unixtime, + 'unixtime': unixtime_message, + 'read_status': read_status + } + video_file = os.path.join(m_folder, 'aswm.avi') + if os.path.exists(video_file): + m_numbers[i]['video'] = ( + '/messages/' + num_message + '/video') + else: + m_numbers[i]['video'] = '' + # Prepare response in HTML format + response_html = ( + "" + "Messages" + "

Messages

" + "
    ") + # order m_numbers by unixtime + m_numbers = self.sort_messages(m_numbers) + for m in m_numbers: + response_html += ( + "
  • Message " + m['number'] + " - " + + m['read_status'] + " - " + "Detail") + if m['image'] != '': + response_html += ( + " - " + "" + "") + else: + response_html += ' - No image' + if m['video'] != '': + response_html += ( + " - Video
  • ") + else: + response_html += ' - No video' + response_html += ( + "
" + "") + return response_html + + def sort_messages(self, m_numbers): + """Sort messages.""" + # sort using unixtime from date + m_numbers.sort(key=lambda x: x['unixtime'], reverse=True) + return m_numbers + + def get_message_info(self, m_num): + """Read info message.""" + message_info = {} + e_string = '' + m_dir = '/home/bticino/cfg/extra/47/messages' + m_folder = os.path.join(m_dir, 'message_' + str(m_num)) + if not os.path.exists(m_folder): + return (message_info, e_string) + info_file = os.path.join(m_folder, 'msg_info.ini') + if os.path.exists(info_file): + config = configparser.ConfigParser() + config.read(info_file) + if 'Message Information' in config: + message_info = dict(config['Message Information']) + # Assuming the image file is named aswm.jpg + image_file = os.path.join(m_folder, 'aswm.jpg') + if not os.path.exists(image_file): + return (message_info, e_string) + # convert image file to data base64 + with open(image_file, "rb") as ifile: + e_string = base64.b64encode(ifile.read()).decode('utf-8') + # check video file + video_file = os.path.join(m_folder, 'aswm.avi') + if not os.path.exists(video_file): + message_info['video'] = '' + else: + message_info['video'] = '/messages/' + str(m_num) + '/video' + return (message_info, e_string) + + def get_message_info_param(self, m_num, param): + """Get message info param.""" + (message_info, e_string) = self.get_message_info(m_num) + aux = 'N/A' + if param in message_info: + aux = message_info.get(param) + return aux + + def get_message(self, m_num): + """View video message html.""" + (message_info, e_string) = self.get_message_info(m_num) + # Prepare response in HTML format + # video_file_api = '/messages/' + str(m_num) + '/video' + video_file_api = message_info.get('video', '') + response_html = ( + "" + "Message " + str(m_num) + "" + "

Message " + str(m_num) + "

") + if e_string != '': + response_html += ( + "") + else: + response_html += 'No image' + response_html += ( + "

Message Information

" + "
    " + "
  • Date: " + message_info.get('date', 'N/A') + "
  • " + "
  • MediaType: " + message_info.get('mediatype', 'N/A') + "
  • " + "
  • EuAddr: " + message_info.get('euaddr', 'N/A') + "
  • " + "
  • Cause: " + message_info.get('cause', 'N/A') + "
  • " + "
  • Status: " + message_info.get('status', 'N/A') + "
  • " + "
  • UnixTime: " + message_info.get('unixtime', 'N/A') + "
  • " + "
  • Read: " + message_info.get('read', 'N/A') + "
  • " + "
  • Duration: " + message_info.get('duration', 'N/A') + "
  • ") + if video_file_api != '': + response_html += ( + "
  • Video: Download
  • ") + else: + response_html += "
  • Video not found
  • " + response_html += ( + "
" + "") + return response_html + + def get_videomessage(self, m_num): + """Get video message.""" + m_dir = '/home/bticino/cfg/extra/47/messages' + m_folder = os.path.join(m_dir, 'message_' + str(m_num)) + if not os.path.exists(m_folder): + return jsonify({'error': 'Message not found'}), 404 + # Assuming the video file is named aswm.avi + video_file = os.path.join(m_folder, 'aswm.avi') + if not os.path.exists(video_file): + return jsonify({'error': 'Video not found'}), 404 + return send_from_directory(m_folder, 'aswm.avi', as_attachment=True) + + def get_imagemessage(self, m_num): + """Get image message.""" + m_dir = '/home/bticino/cfg/extra/47/messages' + m_folder = os.path.join(m_dir, 'message_' + str(m_num)) + if not os.path.exists(m_folder): + return jsonify({'error': 'Message not found'}), 404 + # Assuming the image file is named aswm.jpg + image_file = os.path.join(m_folder, 'aswm.jpg') + if not os.path.exists(image_file): + return jsonify({'error': 'Image not found'}), 404 + return send_from_directory(m_folder, 'aswm.jpg', as_attachment=True) + + def read_xml_fwupgrade(self): + """Read XML file.""" + file = '/home/bticino/cfg/extra/FW/meta.xml' + xml_content = ElementTree.parse(file) + root = xml_content.getroot() + # Access the data + portal_version_fe = root.find(".//front_end").text + portal_version_scheduler = root.find(".//scheduler").text + binary = root.find(".//binary").text + url = root.find(".//url").text + md5 = root.find(".//checksum").text + ref = root.find(".//ref").text + brand = root.find(".//brand").text + platform = root.find(".//platform").text + label = root.find(".//label").text + description = root.find(".//description").text + # return in json format + json_result = { + 'portal_version_front_end': portal_version_fe, + 'portal_version_scheduler': portal_version_scheduler, + 'binary': binary, + 'url': url, + 'md5': md5, + 'ref': ref, + 'brand': brand, + 'platform': platform, + 'label': label, + 'description': description + } + return json_result + + def fwversion(self): + """Read actual firmware version.""" + fn = '/home/bticino/cfg/extra/.license_ver' + with open(fn, 'r') as f: + version = f.read() + json_result = { + 'version': version.rstrip('\n') + } + return json_result + + def read_file_content(self, file_path): + """Read file content.""" + try: + with open(file_path, 'r') as file: + return file.read() + except Exception as e: + return str(e) + + def get_cpu_temperature(self): + """Get CPU temperature.""" + temp_file_path = "/sys/class/thermal/thermal_zone0/temp" + try: + temp = self.read_file_content(temp_file_path) + temp_c = int(temp) / 1000 + return temp_c + except ValueError: + return None + + def get_load_average(self): + """Get load average.""" + load_file_path = "/proc/loadavg" + x = self.read_file_content(load_file_path) + return x.rstrip('\n') + + def main_menu(self): + """Main menu.""" + # Show the main menu options API + response = ( + "

Home Assistant API

" + "Version: " + __version__ + "

" + "Load
" + "Unlock
" + "Reboot
" + "Firmware Upgrade
" + "Firmware Version
" + "Leds
" + "JSON Messages - " + "HTML Messages
" + "

Configuration files

" + "View conf.xml" + " - Download conf.xml
" + "View stack_open.xml" + " - Download " + "stack_open.xml
" + ) + return response + + def load(self): + """Read load average and CPU temperature.""" + temp_c = self.get_cpu_temperature() + load = self.get_load_average() + # Prepare response in JSON format + response = "{\n" + response += "\"cpu_temperature\": " + str(temp_c) + ",\n" + response += "\"load\": \"" + load + "\"\n" + response += "}" + return response + + def unlock(self): + """Open door.""" + json_result = None + ok = '*#*1##' + # nok = '*#*0##' + data1 = "*8*19*20##" + r = self.send_data(data1) + if r == ok: + time.sleep(1) + data2 = "*8*20*20##" + r2 = self.send_data(data2) + if r2 == ok: + self.logger.info("Door opened") + json_result = { + 'result': 'ok' + } + else: + self.logger.error("Door not opened: " + str(r2)) + json_result = { + 'result': 'nok' + } + else: + self.logger.error("Door not opened: " + str(r)) + json_result = { + 'result': 'nok' + } + return json_result + + def send_data(self, data): + """Sent data.""" + host = '127.0.0.1' + port = 30006 + try: + # Create a socket object + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # Connect to the server + sock.connect((host, port)) + # Send the data + sock.sendall(data.encode()) + # Receive data from the socket (adjust the buffer size as needed) + response = sock.recv(1024).decode() + # Sleep for 1 second + time.sleep(1) + # Close the socket + sock.close() + return response # Return the received response + except Exception as e: + self.logger.error("An error occurred: " + str(e)) + return None # Return None if an error occurs + + def reboot(self): + """Reboot device.""" + self.logger.info("Rebooting device") + os.system("/sbin/shutdown -r now") + + def run(self): + """Run API.""" + self.app.run(host='0.0.0.0', port=5000) + + +if __name__ == '__main__': + app_instance = HomeAssistantAPI() + app_instance.run() + +# flask --app hello run diff --git a/ha_config/ha_config.ini b/ha_config/ha_config.ini new file mode 100644 index 0000000..59c9f23 --- /dev/null +++ b/ha_config/ha_config.ini @@ -0,0 +1,13 @@ +[DEFAULT] +loggingLEVEL = info +localfolder = /opt/ha_config/ + +[MQTT] +enableTLS = True +host = broker +port = 8883 +username = +password = +ca_cert = /etc/ssl/certs/m2mqtt_ca.crt +client_cert = /opt/ha_config/m2mqtt_srv_bticino.crt +client_key = /opt/ha_config/m2mqtt_srv_bticino.key diff --git a/ha_config/ha_control.py b/ha_config/ha_control.py new file mode 100644 index 0000000..640224f --- /dev/null +++ b/ha_config/ha_control.py @@ -0,0 +1,1230 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Author Version Date Comments +# FQuinto 0.0.1 2023-09-28 First version for Python 3.5 + +"""Home Assistant control.""" + +import time +import datetime +import os +import sys +import pyinotify +import glob +try: + import paho.mqtt.client as mqtt +except Exception as e: + print('Install "sudo -H pip3 install paho-mqtt"\n' + str(e)) + sys.exit(1) +import socket +import logging +from logging.handlers import RotatingFileHandler +import uuid +import json +import ipaddress +import configparser +import threading +import signal + +# Better is using lxml but difficult to install +from xml.etree import ElementTree +# from lxml import html, etree + + +# try: +# import schedule +# except Exception as e: +# print('Install "pip install --upgrade schedule"\n' + str(e)) +# sys.exit(1) + +__version__ = "0.0.1" + + +class EventHandler(pyinotify.ProcessEvent): + """Class of GPIO and LEDs detection event handler.""" + def __init__(self, logger, client): + super().__init__() + self.logger = logger + self.client = client + + def process_default(self, event): + """Process default.""" + value = None + if event.pathname.startswith('/sys/class/leds'): + topic = 'video_intercom/leds/state' + # For changes in symbolic links pointing to LED brightness + print("Change detected in symbolic link: %s", event.pathname) + # Read and print the content of the brightness file + with open(event.pathname, 'r') as f: + value = f.read().strip() + print("Brightness value: %s", value) + elif event.pathname.startswith('/sys/class/gpio'): + topic = 'video_intercom/gpio/state' + # For changes in GPIO value files + print("Change detected in GPIO value file: %s", event.pathname) + # Read and print the content of the value file + with open(event.pathname, 'r') as f: + value = f.read().strip() + print("Value: %s", value) + # Sent MQTT + message = json.dumps({"event_pathname": event.pathname, "value": value}) + self.client.publish(topic, message) + self.logger.info('GPIO or LED: ' + event.pathname + ' = ' + value) + + +class GPIOLEDsDetectionThread(threading.Thread): + """Class of GPIO and LEDs detection thread.""" + def __init__(self, logger, client, event): + super().__init__() + self.logger = logger + self.client = client + self.stop_event = event + + def stop(self): + """Stop thread.""" + self.stop_event.set() + self.logger.info('Thread keydetection stopped') + + def run(self): + """Thread for key detection.""" + self.logger.info('Thread keydetection started') + led_symlink_pattern = '/sys/class/leds/*/brightness' + gpio_file_pattern = '/sys/class/gpio/*/value' + + # Initialize an inotify watcher + wm = pyinotify.WatchManager() + + # Define the events you want to watch for (symbolic link modification) + mask = pyinotify.IN_MODIFY + + # Initialize the notifier + notifier = pyinotify.Notifier(wm, EventHandler(self.logger, self.client)) + + # Add watches for symbolic links and files matching the patterns + for symlink_path in glob.glob(led_symlink_pattern): + wdd = wm.add_watch(symlink_path, mask) + for gpio_file_path in glob.glob(gpio_file_pattern): + wdd = wm.add_watch(gpio_file_path, mask) + if wdd: + # Start the notifier loop to watch for changes + notifier.loop() + + +class KeyDetectionThread(threading.Thread): + """Class of key detection thread.""" + def __init__(self, logger, selector, client, event): + super().__init__() + self.logger = logger + self.selector = selector + self.client = client + self.stop_event = event + + def stop(self): + """Stop thread.""" + self.stop_event.set() + self.logger.info('Thread keydetection stopped') + + def run(self): + """Thread for key detection.""" + self.logger.info('Thread keydetection started') + selector = self.selector + client = self.client + while not self.stop_event.is_set(): + key_used = None + selectors = selector.select(timeout=0.1) + for key, mask in selectors: + device = key.fileobj + if self.stop_event.is_set(): + break + events = device.read() + for event in events: + if self.stop_event.is_set(): + break + etype = event.type + ecode = event.code + evalue = event.value + if etype == 1 and ecode == 2: + if evalue == 1: + # self.logger.info('Key1 pressed (key)') + key_used = 'key_PRESS' + else: + # self.logger.info('Key1 released (key)') + key_used = 'key_RELEASE' + elif etype == 1 and ecode == 3: + if evalue == 1: + # self.logger.info('Key2 pressed (star)') + key_used = 'star_PRESS' + else: + # self.logger.info('Key2 released (star)') + key_used = 'star_RELEASE' + elif etype == 1 and ecode == 4: + if evalue == 1: + # self.logger.info('Key3 pressed (eye)') + key_used = 'eye_PRESS' + else: + # self.logger.info('Key3 released (eye)') + key_used = 'eye_RELEASE' + elif etype == 1 and ecode == 5: + if evalue == 1: + # self.logger.info('Key4 pressed (phone)') + key_used = 'phone_PRESS' + else: + # self.logger.info('Key4 released (phone)') + key_used = 'phone_RELEASE' + # else: + # self.logger.info('Event: ' + str(event)) + # self.logger.info('Event type: ' + str(etype)) + # self.logger.info('Event code: ' + str(ecode)) + # self.logger.info('Event value: ' + str(evalue)) + # print(event) + # print(evdev.categorize(event)) + if key_used: + topic = 'video_intercom/keypad/state' + message = json.dumps({"event_type": key_used}) + client.publish(topic, message) + self.logger.info('Sent key: ' + topic + ' ' + message) + time.sleep(0.1) + if self.stop_event.is_set(): + break + self.logger.info('Thread keydetection finished') + + +class Control: + """Class of mqtt control.""" + + def __init__(self): + """Start the class.""" + # vars + self.stop_event = threading.Event() + self.child_thread = None + self.stop_main_thread = False + # self.detect_execution() + self.setuplogging() + self.create_vars() + self.check_certs_exist() + self.normal_execution() + + def detect_execution(self): + """Detect execution and continue or finnish.""" + cmd = 'pgrep -f python -a' + pid = os.popen(cmd).read() + # num_pid_python = pid.count('\n') + process = pid.split('\n') + found = 0 + for p in process: + if 'mqtt_control.py' in p: + found += 1 + if found > 2: + print('Exit without any execution') + exit() + + def setuplogging(self): + """Setup logging.""" + self.loggingLEVEL = None + self.readINIfile() + # Setup LOGGING + switcher = { + 'error': logging.ERROR, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'critical': logging.CRITICAL, + 'debug': logging.DEBUG + } + LOGGER_LEVEL = switcher.get(self.loggingLEVEL) + f = ('%(asctime)s - %(name)s - [%(levelname)s] ' + + '- %(funcName)s - %(message)s') + logging.basicConfig(level=LOGGER_LEVEL, format=f) + self.logger = logging.getLogger(__name__) + self.logger.setLevel(LOGGER_LEVEL) + + # Añadiendo logging rotativo + logpath = '/var/log/ha_control.log' + handler = RotatingFileHandler(logpath, maxBytes=10485760, + backupCount=3) + formatter = logging.Formatter(f) + # Añadiendo el formato al handler + handler.setFormatter(formatter) + # Añadiendo el handler al logger + self.logger.addHandler(handler) + message = "Log saving in: " + logpath + self.logger.info(message + ' version: ' + __version__) + + def create_vars(self): + """First creation of vars.""" + self.r = str(socket.gethostname()) + # Sample: 'C3X-00-03-50-00-00-00-9999999' + # MODEL-MAC_ADRESS-SERIAL_NUMBER + mac = int(uuid.getnode()) + self.mac_address = (':'.join(("%012X" % mac)[i:i + 2] + for i in range(0, 12, 2))) + mac_extract = self.r[4:21].upper().replace('-', ':') + self.serial_number = self.r[22:29] + self.model = self.r[0:3] + + if self.mac_address == mac_extract: + self.logger.info( + 'MAC address is correct in network and hostname ' + 'is : ' + self.mac_address) + else: + self.logger.warning( + 'MAC address is different: ' + + self.mac_address + + ' and ' + mac_extract) + # Sample: 'C3X-9999999' + self.id = self.model + '-' + self.serial_number + + def setupkeydetection(self): + """Setup key detection.""" + from evdev import InputDevice + from selectors import DefaultSelector, EVENT_READ + selector = DefaultSelector() + gpios = InputDevice('/dev/input/event0') + selector.register(gpios, EVENT_READ) + return selector + + def multicast_listener(self): + """Multicast listener.""" + # mls = multicast listener socket + self.mls = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.mls.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + # Bind the socket to the desired address and port + self.mls.bind(("0.0.0.0", 7667)) + + # Join the multicast group + multicast_group = "239.255.76.67" + self.mls.setsockopt( + socket.IPPROTO_IP, + socket.IP_ADD_MEMBERSHIP, + socket.inet_aton(multicast_group) + socket.inet_aton("0.0.0.0")) + + # Set up event handling + self.mls.settimeout(1.0) + self.logger.info("Multicast listener socket created") + + def rawsocket(self): + """Listener raw socket.""" + # rs = raw socket + self.rs = socket.socket( + socket.AF_PACKET, + socket.SOCK_RAW, socket.ntohs(0x0003)) + # Set up event handling + self.rs.settimeout(1.0) + self.logger.info("Raw socket created") + + def parse_packet2(self, packet): + """Parse packet.""" + result = None + hex_data = packet.hex() + + # Find the starting and ending indices of the desired portion + start_index = hex_data.find('2a') # 2a = '*' character + # 2323 = '##' characters, include them in the extracted data + end_index = hex_data.find('2323') + 4 + + # Extract the desired portion of the hexadecimal data + desired_hex_data = hex_data[start_index:end_index] + # check if extracted data is even-length + if len(desired_hex_data) % 2 == 0: + try: + # Decoding the extracted hexadecimal data + bytes_data = bytes.fromhex(desired_hex_data) + utf8_data = bytes_data.decode('utf-8', errors='ignore') + result = utf8_data + except Exception as e: + self.logger.error( + "Error message: " + str(e) + + " desired_hex_data: " + desired_hex_data) + return result + + def parse_cmd(self, cmd): + """Parse cmd.""" + result = None + # states = sensors = switches + if cmd == '*7*73#1#100*##': + result = 'display ON' + elif cmd == '*7*73#1#10*##': + result = 'display OFF' + elif cmd == '*#8**33*0##': + result = 'bell OFF' + elif cmd == '*#8**#33*0##': + result = 'bell OFF event?' + elif cmd == '*#8**33*1##': + result = 'bell ON' + elif cmd == '*#8**#33*1##': + result = 'bell ON event?' + elif cmd == '*8*92##': + result = 'voicemail OFF' # vde_aswm_disabled + elif cmd == '*#8**40*0*0*9815*1*25##': + result = 'voicemail OFF using App' + elif cmd == '*8*91##': + result = 'voicemail ON' # vde_aswm_enabled + elif cmd == '*#8**40*1*0*9815*1*25##': + result = 'voicemail ON using App' + # events = triggers + elif cmd == '*8*19*20##': + result = 'door open button press' + elif cmd == '*8*20*20##': + result = 'door open button release' + elif cmd == '*8*21*16##': + result = 'light ON button press' + elif cmd == '*8*22*16##': + result = 'light ON button release' + # others + elif cmd == '*#*1##': + result = 'ACK' + elif cmd == '*#*0##': + result = 'NACK' + elif cmd == '*8*3#6#2*416##': + result = '(no idea) what is this? ' + cmd + elif cmd == '*8*80#6#2*16##': + result = '(no idea) what is this? ' + cmd + elif cmd == '*#130**1##': + result = 'status request' + return result + + def parse_packet(self, packet): + """Parse packet.""" + result = None + doorbell = '*8*1#1#4#21*16##' + bin_doorbell = doorbell.encode().hex() + open_press = '*8*19*20##' + bin_open_press = open_press.encode().hex() + open_release = '*8*20*20##' + bin_open_release = open_release.encode().hex() + # Extract packet data and source address + p_data, address = packet + pdathx = p_data.hex() + + # Check if it's a TCP packet + if p_data[23] == 6: # 6 corresponds to TCP in IP header + # if p_data[23] != 17: # 17 corresponds to UDP in IP header + # Check if it's not ICMP + if p_data[20] != 1: # 1 corresponds to ICMP in IP header + # Extract destination port from TCP header + dst_port = (p_data[36] << 8) + p_data[37] + org_port = (p_data[34] << 8) + p_data[35] + + # Check if it's not one of the excluded ports + aports = {5007, 5060, 20000, 30006} + bports = {5007, 5060, 20000} + if dst_port not in aports and org_port not in bports: + # Print the packet data (hexadecimal representation) + if bin_doorbell in pdathx: + self.logger.debug( + "Packet doorbell from {}: {}".format( + address, pdathx) + + " Puerto de origen: {}".format(org_port) + + " Puerto de destino: {}".format(dst_port)) + result = 'DOORBELL' + elif bin_open_press in pdathx: + self.logger.debug( + "Packet press from {}: {}".format(address, pdathx) + + " Puerto de origen: {}".format(org_port) + + " Puerto de destino: {}".format(dst_port)) + result = 'PRESS' + elif bin_open_release in pdathx: + self.logger.debug( + "Packet release from {}: {}".format( + address, pdathx) + + " Puerto de origen: {}".format(org_port) + + " Puerto de destino: {}".format(dst_port)) + result = 'RELEASE' + else: + pass + return result + + def check_certs_exist(self): + """Check if certs exist.""" + if self.enableTLS: + if not os.path.isfile(self.ca_cert): + self.logger.error('ca_cert not found: ' + self.ca_cert) + sys.exit(1) + if not os.path.isfile(self.certfile): + self.logger.error('client_cert not found: ' + self.certfile) + sys.exit(1) + if not os.path.isfile(self.keyfile): + self.logger.error('client_key not found: ' + self.keyfile) + sys.exit(1) + + def signal_handler(self, signum, frame): + """Signal handler.""" + # def handler(signum, frame): + self.logger.info('Signal handler called with signal ' + str(signum)) + if self.child_thread: + self.child_thread.stop() + self.stop_main_thread = True + self.stop_event.set() + # return handler + + def normal_execution(self): + """Execute normal main.""" + # Signal + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) + # MQTT connection + random_id = str(uuid.uuid4()) + client = mqtt.Client(random_id) + client.on_connect = self.on_connect + client.on_message = self.on_message + client.on_disconnect = self.on_disconnect + client.on_publish = self.on_publish + client.on_subscribe = self.on_subscribe + if self.enableTLS: + ca_certs = self.ca_cert + certfile = self.certfile + keyfile = self.keyfile + client.tls_set(ca_certs, certfile, keyfile, + cert_reqs=mqtt.ssl.CERT_REQUIRED, + tls_version=mqtt.ssl.PROTOCOL_TLSv1_2, + ciphers=None) + if self.is_valid_host(self.host_mqtt): + self.logger.info('Host is valid') + else: + self.logger.error('Host is not valid, change it') + sys.exit(1) + if self.u_mqtt != '': + self.logger.info('User is not empty') + client.username_pw_set(self.u_mqtt, self.p_mqtt) + client.connect(self.host_mqtt, self.port_mqtt, 60) + self.logger.info('I\'m ' + self.r) + self.now = datetime.datetime.now() + + # Init CONFIG discovery messages: use retain=True + jsondata = self.readXMLfileversion() + # Set config lock + (t, m) = self.sent_mqtt_config_lock(jsondata) + client.publish(t, m, retain=True) + # Set config display (sensor) + (t, m) = self.sent_mqtt_config_display(jsondata) + client.publish(t, m, retain=True) + # Set config voicemail (switch) + (t, m) = self.sent_mqtt_config_voicemail(jsondata) + client.publish(t, m, retain=True) + # Set config doorbell sound (switch) + (t, m) = self.sent_mqtt_config_doorbell_sound(jsondata) + client.publish(t, m, retain=True) + # Set config doorbell (trigger) + (t, m) = self.sent_mqtt_config_doorbell_trigger(jsondata) + client.publish(t, m, retain=True) + # Set config keypad (trigger) + (t, m) = self.sent_mqtt_config_keypad(jsondata) + client.publish(t, m, retain=True) + # Init multicast listener + self.multicast_listener() + # Init raw socket + self.rawsocket() + + selector = self.setupkeydetection() + # Create new thread for key detection + # self.child_thread = threading.Thread( + # target=self.keydetection, args=(selector, client)) + self.child_thread = KeyDetectionThread( + self.logger, selector, client, self.stop_event) + # self.child_thread.run(selector, client) + self.child_thread.start() + # GPIO and LEDs + self.child_thread_2 = GPIOLEDsDetectionThread( + self.logger, client, self.stop_event) + self.child_thread_2.start() + + r_cmd = None + last_rcmd = None + while True: + # multicast listener socket + try: + # Adjust the buffer size as needed + data, addr = self.mls.recvfrom(1024) + # data + r_cmd = self.parse_packet2(data) + if r_cmd: + if r_cmd == last_rcmd: + pass + else: + msg_data = self.parse_cmd(r_cmd) + if msg_data: + self.logger.info( + "Received (mls): " + msg_data + + " from " + str(addr)) + if msg_data == 'display ON': + topic = 'video_intercom/display/state' + message = json.dumps({"display": "ON"}) + client.publish(topic, message) + elif msg_data == 'display OFF': + topic = 'video_intercom/display/state' + message = json.dumps({"display": "OFF"}) + client.publish(topic, message) + elif 'voicemail ON' in msg_data: + topic = 'video_intercom/voicemail/state' + message = "ON" + client.publish(topic, message) + elif 'voicemail OFF' in msg_data: + topic = 'video_intercom/voicemail/state' + message = "OFF" + client.publish(topic, message) + elif msg_data == 'bell ON': + topic = 'video_intercom/doorbellsound/state' + message = "ON" + client.publish(topic, message) + elif msg_data == 'bell OFF': + topic = 'video_intercom/doorbellsound/state' + message = "OFF" + client.publish(topic, message) + else: + pass + else: + self.logger.info( + "Received (mls): " + r_cmd + + " from " + str(addr)) + except socket.timeout: + pass + except Exception as e: + self.logger.error("Error message (mls): " + str(e)) + last_rcmd = r_cmd + + # raw socket + try: + # Adjust the buffer size as needed + packet = self.rs.recvfrom(65535) + # packet trigger + trigger = self.parse_packet(packet) + if trigger: + self.logger.info("Received (rs): " + str(trigger)) + if trigger == 'DOORBELL': + topic = 'video_intercom/doorbell/state' + message = trigger + client.publish(topic, message) + elif trigger == 'PRESS': + topic = 'video_intercom/lock/state' + message = 'UNLOCKING' + client.publish(topic, message) + time.sleep(1) + message = 'UNLOCKED' + client.publish(topic, message) + elif trigger == 'RELEASE': + topic = 'video_intercom/lock/state' + message = 'LOCKING' + client.publish(topic, message) + time.sleep(1) + message = 'LOCKED' + client.publish(topic, message) + else: + pass + except socket.timeout: + pass + except Exception as e: + self.logger.error("Error message (rs): " + str(e)) + + try: + client.loop_start() + except KeyboardInterrupt: + client.loop_stop() + self.logger.info('Salida debido a CTRL+C') + break + except Exception as e: + self.logger.error(str(e)) + if self.stop_main_thread: + break + time.sleep(0.1) + + # End while + self.logger.info('Normal exit') + # Wait for child thread to terminate + if self.child_thread: + self.child_thread.join() + client.disconnect() + client.loop_stop() + self.mls.close() + self.rs.close() + + def sent_mqtt_config_lock(self, jsondata): + """Sent mqtt config lock.""" + # Get data + version = jsondata['version'] + model = jsondata['model'] + self.model = model + availability_topic = 'video_intercom/state' + # Lock config + # https://www.home-assistant.io/integrations/lock.mqtt/ + t_config = 'homeassistant/lock/intercom/door/config' + m_config_json = { + "name": "Lock", + "unique_id": self.r + "_intercom_lock", + "command_topic": "video_intercom/lock/set", + "state_topic": "video_intercom/lock/state", + "availability_topic": availability_topic, + "payload_available": "online", + "payload_not_available": "offline", + "state_locked": "LOCKED", + "state_unlocked": "UNLOCKED", + "state_locking": "LOCKING", + "state_unlocking": "UNLOCKING", + "payload_lock": "LOCK", + "payload_unlock": "UNLOCK", + "device_class": "lock", + "icon": "mdi:lock", + "device": { + "identifiers": [self.id], + "manufacturer": "Bticino-Legrand", + "name": "Intercom " + self.id, + "model": model, + "sw_version": version, + "configuration_url": "http://" + self.get_local_ip(), + }, + "json_attributes_topic": "video_intercom/lock/attributes", + "json_attributes_template": "{{ value_json }}", + "availability_template": "{{ value_json.availability | to_json }}", + "platform": "mqtt" + } + # payload must be a string, bytearray, int, float or None. + m_config = json.dumps(m_config_json) + return (t_config, m_config) + + def sent_mqtt_config_display(self, jsondata): + """Display config sensor.""" + # Get data + version = jsondata['version'] + model = jsondata['model'] + self.model = model + availability_topic = 'video_intercom/state' + # Display config = sensor + # https://www.home-assistant.io/integrations/sensor.mqtt/ + t_config = 'homeassistant/sensor/intercom/display/config' + m_config_json = { + "name": "Display", + "unique_id": self.r + "_intercom_display", + "state_topic": "video_intercom/display/state", + "availability_topic": availability_topic, + "payload_available": "online", + "payload_not_available": "offline", + "icon": "mdi:tablet", + "device": { + "identifiers": [self.id], + "manufacturer": "Bticino-Legrand", + "name": "Intercom " + self.id, + "model": model, + "sw_version": version, + "configuration_url": "http://" + self.get_local_ip(), + }, + "value_template": "{{ value_json.display }}", + "json_attributes_topic": "video_intercom/display/attributes", + "json_attributes_template": "{{ value_json }}", + "availability_template": "{{ value_json.availability | to_json }}", + "entity_category": "diagnostic", + "platform": "mqtt" + } + m_config = json.dumps(m_config_json) + return (t_config, m_config) + + def sent_mqtt_config_voicemail(self, jsondata): + """Send mqtt config answer machine (voicemail) switch.""" + # Get data + version = jsondata['version'] + model = jsondata['model'] + self.model = model + availability_topic = 'video_intercom/state' + # Voicemail config = switch + # https://www.home-assistant.io/integrations/switch.mqtt/ + t_config = 'homeassistant/switch/intercom/voicemail/config' + m_config_json = { + "name": "Voicemail", + "unique_id": self.r + "_intercom_voicemail", + "command_topic": "video_intercom/voicemail/set", + "state_topic": "video_intercom/voicemail/state", + "availability_topic": availability_topic, + "payload_available": "online", + "payload_not_available": "offline", + "state_on": "ON", + "state_off": "OFF", + "payload_on": "ON", + "payload_off": "OFF", + "icon": "mdi:voicemail", + "device": { + "identifiers": [self.id], + "manufacturer": "Bticino-Legrand", + "name": "Intercom " + self.id, + "model": model, + "sw_version": version, + "configuration_url": "http://" + self.get_local_ip(), + }, + "json_attributes_topic": "video_intercom/voicemail/attributes", + "json_attributes_template": "{{ value_json }}", + "availability_template": "{{ value_json.availability | to_json }}", + "platform": "mqtt" + } + m_config = json.dumps(m_config_json) + return (t_config, m_config) + + def sent_mqtt_config_doorbell_sound(self, jsondata): + """Send mqtt config doorbell sound or not.""" + # Get data + version = jsondata['version'] + model = jsondata['model'] + self.model = model + availability_topic = 'video_intercom/state' + # Doorbellsound config = switch + # https://www.home-assistant.io/integrations/switch.mqtt/ + t_config = 'homeassistant/switch/intercom/doorbellsound/config' + m_config_json = { + "name": "Doorbellsound", + "unique_id": self.r + "_intercom_doorbellsound", + "command_topic": "video_intercom/doorbellsound/set", + "state_topic": "video_intercom/doorbellsound/state", + "availability_topic": availability_topic, + "payload_available": "online", + "payload_not_available": "offline", + "state_on": "ON", + "state_off": "OFF", + "payload_on": "ON", + "payload_off": "OFF", + "icon": "mdi:bell", + "device": { + "identifiers": [self.id], + "manufacturer": "Bticino-Legrand", + "name": "Intercom " + self.id, + "model": model, + "sw_version": version, + "configuration_url": "http://" + self.get_local_ip(), + }, + "json_attributes_topic": "video_intercom/doorbellsound/attributes", + "json_attributes_template": "{{ value_json }}", + "availability_template": "{{ value_json.availability | to_json }}", + "platform": "mqtt" + } + m_config = json.dumps(m_config_json) + return (t_config, m_config) + + def sent_mqtt_config_doorbell_trigger(self, jsondata): + """Sent mqtt config doorbell trigger.""" + # Get data + version = jsondata['version'] + model = jsondata['model'] + self.model = model + availability_topic = 'video_intercom/state' + # Doorbell config = trigger + # https://www.home-assistant.io/integrations/device_trigger.mqtt/ + t_config = 'homeassistant/device_automation/intercom/doorbell/config' + m_config_json = { + "name": "Doorbell", + "unique_id": self.r + "_intercom_doorbell", + "automation_type": "trigger", + "type": "action", + "subtype": "doorbell", + "state_topic": "video_intercom/doorbell/state", + "availability_topic": availability_topic, + "payload_available": "online", + "payload_not_available": "offline", + "icon": "mdi:doorbell", + "topic": "video_intercom/doorbell/state", + "payload": "DOORBELL", + "device": { + "identifiers": [self.id], + "manufacturer": "Bticino-Legrand", + "name": "Intercom " + self.id, + "model": model, + "sw_version": version, + "configuration_url": "http://" + self.get_local_ip(), + }, + "json_attributes_topic": "video_intercom/doorbell/attributes", + "json_attributes_template": "{{ value_json }}", + "availability_template": "{{ value_json.availability | to_json }}", + "platform": "mqtt" + } + m_config = json.dumps(m_config_json) + return (t_config, m_config) + + def sent_mqtt_config_keypad(self, jsondata): + """Sent mqtt config keypad.""" + # Get data + version = jsondata['version'] + model = jsondata['model'] + self.model = model + availability_topic = 'video_intercom/state' + # keypad config = trigger + # https://www.home-assistant.io/integrations/trigger.mqtt/ + t_config = 'homeassistant/event/intercom/keypad/config' + m_config_json = { + "name": "Keypad", + "unique_id": self.r + "_intercom_keypad", + "automation_type": "trigger", + "type": "action", + "subtype": "keypad", + "state_topic": "video_intercom/keypad/state", + "availability_topic": availability_topic, + "payload_available": "online", + "payload_not_available": "offline", + "icon": "mdi:dialpad", + "topic": "video_intercom/keypad/state", + "device": { + "identifiers": [self.id], + "manufacturer": "Bticino-Legrand", + "name": "Intercom " + self.id, + "model": model, + "sw_version": version, + "configuration_url": "http://" + self.get_local_ip(), + }, + "json_attributes_topic": "video_intercom/keypad/attributes", + "json_attributes_template": "{{ value_json }}", + "availability_template": "{{ value_json.availability | to_json }}", + "platform": "mqtt" + } + m_config = json.dumps(m_config_json) + return (t_config, m_config) + + def is_valid_ip(self, ip): + """Check if IP is valid.""" + try: + ipaddress.ip_address(ip) + return True + except ValueError: + return False + + def is_valid_host(self, host): + """Check if host is valid.""" + try: + socket.gethostbyname(host) + return True + except socket.gaierror: + return False + + def get_local_ip(self): + """Get local IP.""" + try: + # Create a socket object to get local IP address + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + # Connect to a remote server (doesn't send data) + s.connect(("8.8.8.8", 80)) + + # Get the local IP address + local_ip = s.getsockname()[0] + + return local_ip + except Exception as e: + self.logger.error("An error occurred: " + str(e)) + return None + + def get_router_ip(self): + """Get router IP.""" + try: + local_ip = self.get_local_ip() + # Get the router IP address + router_ip = local_ip[:local_ip.rfind(".")] + ".1" + return router_ip + except Exception as e: + self.logger.error("An error occurred: " + str(e)) + return None + + def readXMLfileversion(self): + """Read XML file.""" + file = '/home/bticino/sp/dbfiles_ws.xml' + xml_content = ElementTree.parse(file) + root = xml_content.getroot() + # Access the data + date_fw = root.find(".//date").text + model_fw = root.find(".//webserver_type").text + version_fw = root.find(".//ver_webserver").text + file_fw = root.find(".//latest_sp").text + # return in json format + json_result = { + 'date': date_fw, + 'model': model_fw, + 'version': version_fw, + 'file': file_fw + } + return json_result + + def unlock(self): + """Open door.""" + ok = '*#*1##' + # nok = '*#*0##' + data1 = "*8*19*20##" + r = self.send_data(data1) + if r == ok: + time.sleep(1) + data2 = "*8*20*20##" + r2 = self.send_data(data2) + if r2 == ok: + self.logger.info("Door opened") + else: + self.logger.error("Door not opened: " + str(r2)) + else: + self.logger.error("Door not opened: " + str(r)) + + def voicemail(self, state): + """Set voicemail state to ON or OFF.""" + ok = '*#*1##' + # nok = '*#*0##' + data0 = "*7*73#1#100*##" + if state == 'ON': + data = "*8*91##" + data2 = "*#8**40*1*0*9815*1*25##" + data3 = "*8*91*##" + elif state == 'OFF': + data = "*8*92##" + data2 = "*#8**40*0*0*9815*1*25##" + data3 = "*8*92*##" + else: + self.logger.error("Wrong state: " + state) + r0 = self.send_data(data0) + time.sleep(0.31) + r1 = self.send_data(data) + time.sleep(0.31) # 0.31 seconds (not change this value) + r2 = self.send_data(data2) + time.sleep(0.31) # 0.31 seconds (not change this value) + r3 = self.send_data(data3) + if r0 == ok: + r0_txt = 'OK' + else: + r0_txt = 'NOK' + if r1 == ok: + r1_txt = 'OK' + else: + r1_txt = 'NOK' + if r3 == ok: + r3_txt = 'OK' + else: + r3_txt = 'NOK' + if r2 == ok: + self.logger.info( + "Voicemail " + state + ": r3 " + r3_txt + + " r1 " + r1_txt + + " r0 " + r0_txt) + return True + else: + self.logger.error( + "Voicemail not " + state + ": r3 " + r3_txt + + " r1 " + r1_txt + + " r0 " + r0_txt) + return False + + def doorbell_sound(self, state): + """Set doorbell sound state to ON or OFF.""" + ok = '*#*1##' + # nok = '*#*0##' + if state == 'ON': + data = "*#8**33*1##" + elif state == 'OFF': + data = "*#8**33*0##" + else: + self.logger.error("Wrong state: " + state) + r = self.send_data(data) + if r == ok: + self.logger.info("Doorbell sound " + state) + return True + else: + self.logger.error("Doorbell sound not " + state + ": " + str(r)) + return False + + def send_data(self, data): + """Sent data.""" + host = '127.0.0.1' + port = 30006 + try: + # Create a socket object + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + # Connect to the server + sock.connect((host, port)) + # Send the data + sock.sendall(data.encode()) + # Receive data from the socket (adjust the buffer size as needed) + response = sock.recv(1024).decode() + # Sleep for 1 second + time.sleep(1) + # Close the socket + sock.close() + return response # Return the received response + except Exception as e: + self.logger.error("Error occurred: " + str(e)) + return None # Return None if an error occurs + + def readINIfile(self): + """Read INI file.""" + config = configparser.ConfigParser() + script_dir = os.path.dirname(__file__) + rel_path = "./ha_config.ini" + abs_file_path = os.path.join(script_dir, rel_path) + config.read(abs_file_path) + self.loggingLEVEL = config['DEFAULT']['loggingLEVEL'] + self.localfolder = config['DEFAULT']['localfolder'] + self.enableTLS = config['MQTT']['enableTLS'] + self.host_mqtt = config['MQTT']['host'] + self.port_mqtt = int(config['MQTT']['port']) + self.u_mqtt = config['MQTT']['username'] + self.p_mqtt = config['MQTT']['password'] + if self.enableTLS == 'True': + self.ca_cert = config['MQTT']['ca_cert'] + self.certfile = config['MQTT']['client_cert'] + self.keyfile = config['MQTT']['client_key'] + print('TLS enabled: ' + self.enableTLS) + print('ca_cert: ' + self.ca_cert + + ' client_cert: ' + self.certfile + + ' client_key: ' + self.keyfile) + else: + self.ca_cert = None + self.certfile = None + self.keyfile = None + + def on_connect(self, client, userdata, flags, rc): + """MQTT when connect.""" + self.logger.info("Connected with result code " + str(rc)) + + # Subscribe to topics + topic = "video_intercom/lock/set" + client.subscribe(topic) + self.logger.info("Subscribed to " + topic) + topic = "video_intercom/voicemail/set" + client.subscribe(topic) + self.logger.info("Subscribed to " + topic) + topic = "video_intercom/doorbellsound/set" + client.subscribe(topic) + self.logger.info("Subscribed to " + topic) + + # State online when connect + availability_topic = 'video_intercom/state' + topic = availability_topic + # message = json.dumps({"availability": "online"}) + message = "online" + client.publish(topic, message) + self.logger.info('Sent: ' + topic + ' ' + message) + + # get_router_ip = self.get_router_ip() + + state_topic_lock = 'video_intercom/lock/state' + topic = state_topic_lock + message = 'LOCKED' + client.publish(topic, message) + + topic = 'video_intercom/display/state' + message = json.dumps({"display": "OFF"}) + client.publish(topic, message) + + topic = 'video_intercom/doorbell/state' + message = json.dumps({"event_type": "RELEASE"}) + client.publish(topic, message) + + # state_topic_keypad = 'video_intercom/keypad/state' + # topic = state_topic_keypad + + def on_disconnect(self, client, userdata, rc): + """MQTT when disconnect.""" + if rc != 0: + self.logger.error("Unexpected disconnection. R. Code " + str(rc)) + while not client.is_connected(): + try: + self.logger.info("Attempting to reconnect...") + client.reconnect() + time.sleep(10) + except Exception as e: + self.logger.error("Reconnection failed: " + str(e)) + else: + self.logger.warning("Disconected with result code " + str(rc)) + + def on_subscribe(self, client, userdata, mid, granted_qos): + """MQTT when subcribe.""" + self.logger.debug("Subscription from " + str(mid)) + + def on_publish(self, client, userdata, mid): + """MQTT when publish.""" + self.logger.debug("Publish from " + str(mid) + " message published") + # View user data + # self.logger.info("User data: " + str(userdata)) + + def on_message(self, client, userdata, msg): + """MQTT when message is received.""" + self.logger.info('New msg. Topic: ' + str(msg.topic) + + ' ' + str(msg.payload)) + + msg_payload = (msg.payload).decode("utf-8") + + if (msg.topic == 'video_intercom/voicemail/set'): + message = None + if msg_payload == 'ON': + r = self.voicemail('ON') + if r: + message = "ON" + else: + message = "OFF" + elif msg_payload == 'OFF': + r = self.voicemail('OFF') + if r: + message = "OFF" + else: + message = "ON" + else: + self.logger.error('Wrong payload: ' + msg_payload) + if message: + topic = 'video_intercom/voicemail/state' + client.publish(topic, message) + elif (msg.topic == 'video_intercom/doorbellsound/set'): + message = None + if msg_payload == 'ON': + r = self.doorbell_sound('ON') + if r: + message = "ON" + else: + message = "OFF" + elif msg_payload == 'OFF': + r = self.doorbell_sound('OFF') + if r: + message = "OFF" + else: + message = "ON" + else: + self.logger.error('Wrong payload: ' + msg_payload) + if message: + topic = 'video_intercom/doorbellsound/state' + client.publish(topic, message) + elif (msg.topic == 'video_intercom/lock/set'): + topic = 'video_intercom/lock/state' + if msg_payload == 'UNLOCK': + # sent unlocking + message = 'UNLOCKING' + client.publish(topic, message) + self.unlock() + # sent unlocked + message = 'UNLOCKED' + client.publish(topic, message) + time.sleep(1) + # sent locked + message = 'LOCKED' + client.publish(topic, message) + elif msg_payload == 'LOCK': + # Sent locking + message = 'LOCKING' + client.publish(topic, message) + time.sleep(1) + # Sent locked + message = 'LOCKED' + client.publish(topic, message) + else: + self.logger.error('Wrong payload: ' + msg_payload) + + def is_host_available(self, host): + import subprocess + try: + # Run the ping command and capture the output + output = subprocess.check_output(["ping", "-c", "1", host]) + + # Check if the output contains a successful ping + if b"1 received" in output: + return True + else: + return False + except subprocess.CalledProcessError: + return False + + def get_availability_device(self, ip_device): + """Get availability of device.""" + result = self.is_host_available(ip_device) + if result: + r = "online" + else: + r = "offline" + self.logger.info("Availability are: " + r) + return r + + +if __name__ == "__main__": + Control()