script without GPS/capcode lookup #54

Open
spiralshapeturtle opened this issue Dec 28, 2024 · 3 comments

@spiralshapeturtle

Hello @cyberjunky Ron

I have been using your script for years now and it is extremely stable, thanks for that. I process the data via Node-RED, which creates push messages and TTS calls. I would like to speed up message processing. Would you be able to strip down the code so that it only delivers messages to MQTT / the HA webhook, without the db/plaatsnaam (place name)/geo lookups? I realize I am asking for quite a bit of your time, but this holiday season could be a fit ;-) I only need the P2000 text and the capcodes.

I tried to do it myself with AI, but that ended with the script receiving 0 messages ;(

https://github.com/cyberjunky/RTL-SDR-P2000Receiver-HA

@Dinges28
Contributor

In the config you can set the opencage/GPS lookup to false.
I don't know what kind of device you run this on, but I see a delay of at most 1-2 seconds before the message is accepted by Home Assistant or MQTT.
How much speed do you want?
P2000 messages are already delayed, because the transmit interval is not real time.
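
For illustration, a minimal sketch of how such a switch can gate a slow lookup. The section and option names (`[opencage]`, `enabled`) and the `geocode` helper are assumptions made up for this example, so check the project's own config.ini for the real keys.

```python
import configparser

# Hypothetical config text; the real section and option names may differ.
SAMPLE_CONFIG = """
[opencage]
enabled = False
"""


def geocode(text):
    """Placeholder for a slow external lookup (hypothetical helper)."""
    raise RuntimeError("lookup should be skipped when disabled")


def enrich(config, text):
    """Return the message, adding location data only when lookups are enabled."""
    result = {"message": text}
    if config.getboolean("opencage", "enabled", fallback=False):
        result["location"] = geocode(text)
    return result


config = configparser.ConfigParser()
config.read_string(SAMPLE_CONFIG)
print(enrich(config, "A1 test message"))
```

With the flag off, the message is passed through untouched and the lookup is never called.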

@spiralshapeturtle
Author

Good morning, I have a redundant receiver environment: one feed from pagermon and one feed via the cyberjunky script. The pagermon feed (webhook) is very fast. That made me realize that I actually wanted a bit more speed from the Python script. My wish is near real time.

That seems to have worked via claude.ai: the tool created a script that now forwards all decoded messages in real time. After that I asked it to add performance optimizations such as multithreading and an optimized MQTT handler. The tool also suggested adding performance benchmarks to the code, so that we can really see whether we are gaining speed.

The delay between pagermon and the original cyberjunky script was 2 seconds and 2 milliseconds. Right now the delay seems to be 0 when I compare the log timestamps.
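
A small sketch of that timestamp comparison, assuming both logs carry millisecond timestamps in the format below. The two timestamps are made up to reproduce the 2 s 2 ms gap; the script's own log format only has one-second resolution.

```python
from datetime import datetime

# Assumed log timestamp layout with fractional seconds
LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def feed_delay_seconds(first_seen: str, second_seen: str) -> float:
    """Seconds between the same message appearing in two logs."""
    first = datetime.strptime(first_seen, LOG_TIME_FORMAT)
    second = datetime.strptime(second_seen, LOG_TIME_FORMAT)
    return (second - first).total_seconds()


# Made-up timestamps for one message as seen by each feed
pagermon_seen = "2024-12-29 00:00:22.000"
script_seen = "2024-12-29 00:00:24.002"
print(f"delay: {feed_delay_seconds(pagermon_seen, script_seen):.3f} s")
```

Both machines need synchronized clocks (for example via NTP) for a comparison like this to mean anything.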

Older script already enhanced by Claude.ai

Performance Statistics:
2024-12-29 00:00:24 - INFO -
mqtt_post:
2024-12-29 00:00:24 - INFO -   Average time: 0.61ms
2024-12-29 00:00:24 - INFO -   Min time: 0.27ms
2024-12-29 00:00:24 - INFO -   Max time: 1.89ms
2024-12-29 00:00:24 - INFO -   Message count: 13
2024-12-29 00:00:24 - INFO -
message_processing:
2024-12-29 00:00:24 - INFO -   Average time: 0.96ms
2024-12-29 00:00:24 - INFO -   Min time: 0.47ms
2024-12-29 00:00:24 - INFO -   Max time: 2.09ms
2024-12-29 00:00:24 - INFO -   Message count: 13
2024-12-29 00:00:24 - INFO - Receiver stopped

Current running version

mqtt_post:
2024-12-29 00:12:33 - INFO -   Average time: 0.30ms
2024-12-29 00:12:33 - INFO -   Min time: 0.19ms
2024-12-29 00:12:33 - INFO -   Max time: 0.88ms
2024-12-29 00:12:33 - INFO -   Message count: 24
2024-12-29 00:12:33 - INFO -
message_processing:
2024-12-29 00:12:33 - INFO -   Average time: 0.60ms
2024-12-29 00:12:33 - INFO -   Min time: 0.40ms
2024-12-29 00:12:33 - INFO -   Max time: 1.31ms
2024-12-29 00:12:33 - INFO -   Message count: 24
2024-12-29 00:12:33 - INFO - Receiver stopped
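
To put the two runs side by side, a short sketch that computes the relative drop in the average times. The numbers are copied from the logs above; `reduction_pct` is a helper name introduced only for this example.

```python
def reduction_pct(before_ms: float, after_ms: float) -> float:
    """Percentage by which the average time dropped between two runs."""
    return (before_ms - after_ms) / before_ms * 100


# Average times in ms: (older script, current running version)
averages = {
    "mqtt_post": (0.61, 0.30),
    "message_processing": (0.96, 0.60),
}

for name, (before, after) in averages.items():
    print(
        f"{name}: {before:.2f} ms -> {after:.2f} ms "
        f"({reduction_pct(before, after):.1f}% lower)"
    )
```

That works out to roughly 51% lower for mqtt_post and 37.5% lower for message_processing. The samples are small (13 and 24 messages), and both figures only time steps inside the script, so they show a trend rather than the end-to-end delay.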



#!/usr/bin/env python3
import requests.adapters
import configparser
import json
import logging
import os
import re
import subprocess
import sys
import threading
import time
import queue
from collections import deque, defaultdict
from datetime import datetime
from threading import Event
from statistics import mean, median, stdev
from typing import Dict, List, Any

import paho.mqtt.client as mqtt
import requests

VERSION = "0.5.0"
CFGFILE = "config.ini"

# Simple pattern matching pipe-separated fields
FLEX_PATTERN = re.compile(
    r"^FLEX\|([^|]*)\|([^|]*)\|([^|]*)\|([^|]*)\|ALN\|([^|]*)"
)


class PerformanceMetrics:
    """Enhanced performance monitoring with detailed statistics."""

    def __init__(self):
        self.metrics: Dict[str, List[float]] = defaultdict(list)
        self.start_times: Dict[str, float] = {}
        self.message_latencies: List[float] = []
        self.processing_times: List[float] = []
        self.mqtt_delivery_times: List[float] = []

    def start(self, operation: str) -> None:
        """Start timing an operation."""
        self.start_times[operation] = time.perf_counter()

    def stop(self, operation: str) -> None:
        """Stop timing an operation and record duration."""
        if operation in self.start_times:
            duration = time.perf_counter() - self.start_times[operation]
            self.metrics[operation].append(duration)
            del self.start_times[operation]

    def record_latency(self, start_time: float) -> None:
        """Record message delivery latency."""
        latency = time.perf_counter() - start_time
        self.message_latencies.append(latency)

    def record_processing_time(self, duration: float) -> None:
        """Record message processing time."""
        self.processing_times.append(duration)

    def record_mqtt_delivery(self, duration: float) -> None:
        """Record MQTT delivery time."""
        self.mqtt_delivery_times.append(duration)

    def get_stats(self) -> Dict[str, Any]:
        """Return comprehensive performance statistics."""
        stats = {}

        # General operations stats
        for op, times in self.metrics.items():
            if times:
                stats[op] = {
                    "avg_ms": mean(times) * 1000,
                    "median_ms": median(times) * 1000,
                    "min_ms": min(times) * 1000,
                    "max_ms": max(times) * 1000,
                    "std_dev_ms": stdev(times) * 1000 if len(times) > 1 else 0,
                    "count": len(times),
                }

        # Message delivery latency stats
        if self.message_latencies:
            stats["message_latency"] = {
                "avg_ms": mean(self.message_latencies) * 1000,
                "median_ms": median(self.message_latencies) * 1000,
                "min_ms": min(self.message_latencies) * 1000,
                "max_ms": max(self.message_latencies) * 1000,
                "std_dev_ms": (
                    stdev(self.message_latencies) * 1000
                    if len(self.message_latencies) > 1
                    else 0
                ),
                "count": len(self.message_latencies),
            }

        # Processing time stats
        if self.processing_times:
            stats["processing_time"] = {
                "avg_ms": mean(self.processing_times) * 1000,
                "median_ms": median(self.processing_times) * 1000,
                "min_ms": min(self.processing_times) * 1000,
                "max_ms": max(self.processing_times) * 1000,
                "std_dev_ms": (
                    stdev(self.processing_times) * 1000
                    if len(self.processing_times) > 1
                    else 0
                ),
                "count": len(self.processing_times),
            }

        # MQTT delivery stats
        if self.mqtt_delivery_times:
            stats["mqtt_delivery"] = {
                "avg_ms": mean(self.mqtt_delivery_times) * 1000,
                "median_ms": median(self.mqtt_delivery_times) * 1000,
                "min_ms": min(self.mqtt_delivery_times) * 1000,
                "max_ms": max(self.mqtt_delivery_times) * 1000,
                "std_dev_ms": (
                    stdev(self.mqtt_delivery_times) * 1000
                    if len(self.mqtt_delivery_times) > 1
                    else 0
                ),
                "count": len(self.mqtt_delivery_times),
            }

        return stats


class ThreadManager:
    """Manages multiple worker threads and their lifecycle."""

    def __init__(self):
        self.threads = {}
        self.running = threading.Event()
        self.running.set()
        self.performance_metrics = PerformanceMetrics()

    def start_thread(self, name: str, target, daemon: bool = True) -> bool:
        """Start a new named thread."""
        if name in self.threads:
            logging.warning(f"Thread {name} already exists")
            return False

        thread = threading.Thread(
            target=self._thread_wrapper,
            args=(name, target),
            daemon=daemon,
            name=name,  # so stop_all() reports the registered name
        )
        self.threads[name] = thread
        thread.start()
        logging.info(f"Started thread: {name}")
        return True

    def _thread_wrapper(self, name: str, target) -> None:
        """Wrapper to handle thread exceptions and cleanup."""
        try:
            logging.info(f"Thread {name} starting")
            self.performance_metrics.start(f"thread_{name}")
            target()
        except Exception as e:
            logging.error(f"Error in thread {name}: {str(e)}")
            logging.exception("Thread error details:")
        finally:
            self.performance_metrics.stop(f"thread_{name}")
            logging.info(f"Thread {name} stopped")
            if name in self.threads:
                del self.threads[name]

    def stop_all(self, timeout: float = 5) -> None:
        """Stop all threads gracefully."""
        self.running.clear()
        start_time = time.time()

        # Wait for threads to finish
        active_threads = list(self.threads.values())
        for thread in active_threads:
            remaining_time = max(0, timeout - (time.time() - start_time))
            if remaining_time > 0 and thread.is_alive():
                thread.join(timeout=remaining_time)

        # Check for any threads that didn't stop
        still_running = [t.name for t in active_threads if t.is_alive()]
        if still_running:
            logging.warning(f"Threads still running: {still_running}")

        self.threads.clear()


class MessageQueue:
    """Thread-safe bounded FIFO message queue with performance monitoring."""

    def __init__(self, maxlen: int = 100):
        self.queue = deque(maxlen=maxlen)
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)
        self.metrics = PerformanceMetrics()

    def put(self, item, block: bool = True, timeout: float = None) -> bool:
        """Add an item to the queue with performance tracking."""
        self.metrics.start("queue_put")
        try:
            with self.lock:
                if block:
                    while len(self.queue) >= self.queue.maxlen:
                        if not self.not_full.wait(timeout):
                            raise queue.Full
                elif len(self.queue) >= self.queue.maxlen:
                    raise queue.Full

                self.queue.appendleft(item)
                self.not_empty.notify()
                return True
        finally:
            self.metrics.stop("queue_put")

    def get(self, block: bool = True, timeout: float = None):
        """Remove and return an item from the queue with performance tracking."""
        self.metrics.start("queue_get")
        try:
            with self.lock:
                if block:
                    while not self.queue:
                        if not self.not_empty.wait(timeout):
                            raise queue.Empty
                elif not self.queue:
                    raise queue.Empty

                item = self.queue.pop()
                self.not_full.notify()
                return item
        finally:
            self.metrics.stop("queue_get")

    def __len__(self) -> int:
        """Return the current size of the queue."""
        with self.lock:
            return len(self.queue)


class MQTTHandler:
    """Handles MQTT connection and message delivery with improved reliability and performance."""

    def __init__(self, config):
        self.config = config
        self.client = None
        self.connected = False
        self.reconnect_delay = 1  # Start with 1 second delay
        self.max_reconnect_delay = (
            60  # Maximum delay between reconnection attempts
        )
        self.pending_messages = deque(
            maxlen=1000
        )  # Store messages during disconnections
        self.metrics = PerformanceMetrics()
        self.setup_client()

    def setup_client(self):
        """Initialize MQTT client with all necessary callbacks."""
        try:
            self.client = mqtt.Client()

            # Set up authentication
            if self.config["user"] and self.config["password"]:
                self.client.username_pw_set(
                    self.config["user"], self.config["password"]
                )

            # Configure callbacks
            self.client.on_connect = self.on_connect
            self.client.on_disconnect = self.on_disconnect
            self.client.on_publish = self.on_publish

            # Configure QoS and other settings
            self.client.max_queued_messages_set(100)
            self.client.max_inflight_messages_set(20)

            # Enable built-in reconnect functionality
            self.client.reconnect_delay_set(min_delay=1, max_delay=60)

            self.metrics.start("mqtt_connect")
            self.client.connect(
                self.config["server"], self.config["port"], keepalive=60
            )
            self.client.loop_start()
            logging.info("MQTT client setup completed successfully")
        except Exception as e:
            logging.error(f"MQTT connection failed: {e}")
            self.connected = False
        finally:
            self.metrics.stop("mqtt_connect")

    def on_connect(self, client, userdata, flags, rc):
        """Handle connection events and retry sending pending messages."""
        if rc == 0:
            self.connected = True
            self.reconnect_delay = 1
            logging.info("Connected to MQTT broker")

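            # Flush the backlog once. publish() re-queues a message that fails,
            # so bound the loop to the current backlog size to avoid spinning
            # forever on a message that keeps failing.
            for _ in range(len(self.pending_messages)):
                if not self.connected:
                    break
                topic, payload, qos, start_time = (
                    self.pending_messages.popleft()
                )
                self.publish(topic, payload, qos, start_time)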
        else:
            self.connected = False
            logging.error(f"MQTT connection failed with code {rc}")

    def on_disconnect(self, client, userdata, rc):
        """Handle disconnection events with exponential backoff."""
        self.connected = False
        if rc != 0:
            logging.warning(f"MQTT disconnected unexpectedly with code {rc}")
            self.reconnect_delay = min(
                self.reconnect_delay * 2, self.max_reconnect_delay
            )
        else:
            logging.info("MQTT disconnected normally")

    def on_publish(self, client, userdata, mid):
        """Confirm successful message publication."""
        # No timer is ever started under a per-message key, so there is
        # nothing to stop here; just log the confirmation.
        logging.debug(f"Message {mid} published successfully")

    def publish(self, topic, payload, qos=1, start_time=None):
        """Publish message with QoS and performance tracking."""
        metric_name = "mqtt_publish"  # fixed key; avoids paho's private _client_id attribute
        self.metrics.start(metric_name)
        try:
            if not start_time:
                start_time = time.perf_counter()

            if not self.connected:
                self.pending_messages.append((topic, payload, qos, start_time))
                logging.warning("MQTT not connected, message queued")
                return False

            info = self.client.publish(topic, payload, qos=qos)

            if info.rc != mqtt.MQTT_ERR_SUCCESS:
                self.pending_messages.append((topic, payload, qos, start_time))
                logging.error(f"Failed to publish message: {info.rc}")
                return False

            self.metrics.record_mqtt_delivery(time.perf_counter() - start_time)
            return True
        except Exception as e:
            logging.error(f"MQTT Error: {e}")
            self.pending_messages.append((topic, payload, qos, start_time))
            return False
        finally:
            self.metrics.stop(metric_name)  # Ensure metric is always stopped

    def stop(self):
        """Clean shutdown of MQTT connection."""
        if self.client:
            try:
                self.client.loop_stop()
                self.client.disconnect()
            except Exception as e:
                logging.error(f"Error during MQTT shutdown: {e}")


class MessageItem:
    """Message container with performance tracking."""

    __slots__ = (
        "timestamp",
        "message_raw",
        "body",
        "capcodes",
        "receivers",
        "is_posted",
        "creation_time",
    )

    def __init__(self, message_raw, timestamp, body, capcodes):
        self.timestamp = timestamp
        self.message_raw = message_raw
        self.body = body
        self.capcodes = capcodes
        self.receivers = " ".join(capcodes)
        self.is_posted = False
        self.creation_time = time.perf_counter()


def load_config():
    """Load or create config file."""
    config = configparser.ConfigParser()
    if config.read(CFGFILE):
        return config

    config["main"] = {"debug": "true", "benchmark": "true"}
    config["rtl-sdr"] = {
        "cmd": "rtl_fm -f 169.65M -M fm -s 22050 | multimon-ng -a FLEX -t raw -"
    }
    config["home-assistant"] = {
        "enabled": "true",
        "baseurl": "http://homeassistant.local:8123",
        "token": "YOUR_TOKEN_HERE",
    }
    config["mqtt"] = {
        "enabled": "true",
        "mqtt_server": "192.168.1.100",
        "mqtt_port": "1883",
        "mqtt_user": "mqttuser",
        "mqtt_password": "password",
        "mqtt_topic": "p2000",
        "mqtt_qos": "1",
    }

    with open(CFGFILE, "w") as f:
        config.write(f)
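        # Use print(): logging is not configured yet at this point, and an
        # early logging call would install default handlers and turn the
        # later logging.basicConfig() into a no-op.
        print(f"Created new config file: {CFGFILE}")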
    return config


class P2000Receiver:
    def __init__(self):
        print(f"Initializing P2000 Receiver v{VERSION}...")
        self.thread_manager = ThreadManager()
        self.messages = MessageQueue(maxlen=100)
        self.config = load_config()
        self.mqtt_handler = None
        self.metrics = PerformanceMetrics()

        # Configure logging
        self.debug = self.config.getboolean("main", "debug")
        self.benchmark = self.config.getboolean(
            "main", "benchmark", fallback=True
        )

        log_level = logging.DEBUG if self.debug else logging.INFO
        logging.basicConfig(
            level=log_level,
            format="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        logging.info("Loading configuration...")

        # Load config settings
        self.rtlfm_cmd = self.config.get("rtl-sdr", "cmd")
        logging.info(f"RTL-SDR command: {self.rtlfm_cmd}")

        # Initialize Home Assistant
        self.use_hass = self.config.getboolean("home-assistant", "enabled")
        if self.use_hass:
            self.baseurl = self.config.get("home-assistant", "baseurl")
            self.token = self.config.get("home-assistant", "token")
            self.setup_home_assistant()

        # Initialize MQTT
        self.use_mqtt = self.config.getboolean("mqtt", "enabled")
        if self.use_mqtt:
            self.setup_mqtt()

        logging.info("Initialization complete.")

    def setup_home_assistant(self):
        """Initialize Home Assistant connection."""
        try:
            logging.info("Setting up Home Assistant connection...")
            self.http_session = requests.Session()
            adapter = requests.adapters.HTTPAdapter(
                pool_connections=1,
                pool_maxsize=1,
                max_retries=3,
                pool_block=False,
            )
            self.http_session.mount("http://", adapter)
            self.http_session.mount("https://", adapter)
            self.http_session.headers.update(
                {
                    "Authorization": f"Bearer {self.token}",
                    "content-type": "application/json",
                    "Connection": "keep-alive",
                }
            )
            logging.info("Home Assistant connection setup complete")
        except Exception as e:
            logging.error(f"Error setting up Home Assistant: {e}")
            self.use_hass = False

    def setup_mqtt(self):
        """Initialize MQTT connection."""
        try:
            logging.info("Setting up MQTT connection...")
            self.mqtt_config = {
                "server": self.config.get("mqtt", "mqtt_server"),
                "port": self.config.getint("mqtt", "mqtt_port"),
                "user": self.config.get("mqtt", "mqtt_user"),
                "password": self.config.get("mqtt", "mqtt_password"),
                "topic": self.config.get("mqtt", "mqtt_topic"),
                "qos": self.config.getint("mqtt", "mqtt_qos", fallback=1),
            }
            self.mqtt_handler = MQTTHandler(self.mqtt_config)
            logging.info("MQTT connection setup complete")
        except Exception as e:
            logging.error(f"Error setting up MQTT: {e}")
            self.use_mqtt = False

    def post_message(self, msg):
        """Post message to configured endpoints with performance tracking."""
        self.metrics.start("message_processing")
        processing_start = time.perf_counter()

        try:
            data = {
                "state": msg.body,
                "attributes": {
                    "time_received": msg.timestamp,
                    "receivers": msg.receivers,
                    "capcodes": msg.capcodes,
                    "raw_message": msg.message_raw,
                },
            }

            json_str = json.dumps(data)
            logging.debug(f"Prepared message data: {json_str}")

            # Post to Home Assistant
            if self.use_hass:
                self.metrics.start("ha_post")
                try:
                    response = self.http_session.post(
                        f"{self.baseurl}/api/states/sensor.p2000",
                        data=json_str,
                        timeout=5,
                    )
                    response.raise_for_status()
                except Exception as e:
                    logging.error(f"HA Error: {e}")
                finally:
                    self.metrics.stop("ha_post")

            # Post to MQTT
            if self.use_mqtt and self.mqtt_handler:
                try:
                    topic = f"{self.mqtt_config['topic']}/sensor/p2000"
                    self.mqtt_handler.publish(
                        topic,
                        json_str,
                        qos=self.mqtt_config["qos"],
                        start_time=msg.creation_time,
                    )
                except Exception as e:
                    logging.error(f"MQTT Error: {e}")

            # Record end-to-end processing time
            self.metrics.record_processing_time(
                time.perf_counter() - processing_start
            )

        except Exception as e:
            logging.error(f"Error in post_message: {e}")
        finally:
            self.metrics.stop("message_processing")

    def process_messages(self):
        """Process messages from queue."""
        logging.info("Message processing thread started")
        while self.thread_manager.running.is_set():
            try:
                msg = self.messages.get(block=True, timeout=1.0)
                if not msg.is_posted:
                    self.metrics.record_latency(msg.creation_time)
                    self.post_message(msg)
                    msg.is_posted = True
            except queue.Empty:
                continue
            except Exception as e:
                logging.error(f"Error processing message: {e}")
                time.sleep(1)

    def read_rtl_sdr(self):
        """Read and process RTL-SDR data."""
        logging.info("Starting RTL-SDR process...")
        process = None
        try:
            process = subprocess.Popen(
                self.rtlfm_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=True,
                bufsize=1,
                universal_newlines=True,
            )

            logging.info("RTL-SDR process started")
            print("Listening for P2000 messages...")

            while self.thread_manager.running.is_set():
                if process.poll() is not None:
                    raise Exception("RTL-SDR process terminated unexpectedly")

                line = process.stdout.readline()
                if not line:
                    stderr_line = process.stderr.readline()
                    if stderr_line:
                        logging.error(f"RTL-SDR Error: {stderr_line.strip()}")
                    continue

                if "FLEX" in line and "ALN" in line:
                    self.metrics.start("message_parse")
                    try:
                        match = FLEX_PATTERN.search(line)
                        if match:
                            self.handle_message(line, match)
                    finally:
                        self.metrics.stop("message_parse")

        except Exception as e:
            logging.error(f"Error in read_rtl_sdr: {e}")
            print(f"Error: {str(e)}")
        finally:
            if process:
                process.kill()
                process.wait()

    def handle_message(self, line, match):
        """Process and queue a new message."""
        # Separate metric name: read_rtl_sdr() already times "message_parse"
        # around this call, and reusing the key would overwrite its start time.
        self.metrics.start("message_handle")
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            capcodes = match.group(4).strip().split()
            message = match.group(5).strip()

            if message and capcodes:
                msg = MessageItem(
                    message_raw=line.strip(),
                    timestamp=timestamp,
                    body=message,
                    capcodes=capcodes,
                )
                self.messages.put(msg, block=False)
                print(f"[{timestamp}] New message: {message}")
        except queue.Full:
            # Non-blocking put: the queue is at capacity, so this message is lost
            logging.warning("Message queue full, dropping message")
        except Exception as e:
            logging.error(f"Error handling message: {e}")
        finally:
            self.metrics.stop("message_handle")

    def print_performance_stats(self):
        """Print detailed performance statistics."""
        if not self.benchmark:
            return

        stats = {
            "receiver": self.metrics.get_stats(),
            "queue": self.messages.metrics.get_stats(),
            "mqtt": (
                self.mqtt_handler.metrics.get_stats()
                if self.mqtt_handler
                else {}
            ),
            "threads": self.thread_manager.performance_metrics.get_stats(),
        }

        logging.info("\nPerformance Statistics:")
        for component, metrics in stats.items():
            if metrics:
                logging.info(f"\n{component.upper()} METRICS:")
                for op, values in metrics.items():
                    logging.info(f"\n{op}:")
                    # Log each statistic recorded for this operation
                    for metric, value in values.items():
                        if isinstance(value, float):
                            logging.info(f"  {metric}: {value:.2f}ms")
                        else:
                            logging.info(f"  {metric}: {value}")

    def run(self):
        """Start the receiver with performance monitoring."""
        logging.info(f"P2000 Receiver v{VERSION} starting...")
        if self.benchmark:
            logging.info("Performance monitoring enabled")
            self.metrics.start("total_runtime")

        try:
            # Start message processing thread
            self.thread_manager.start_thread("processor", self.process_messages)

            # Start RTL-SDR reading in separate thread
            self.thread_manager.start_thread("rtl-sdr", self.read_rtl_sdr)

            # Wait for shutdown signal
            try:
                while self.thread_manager.running.is_set():
                    time.sleep(1)
            except KeyboardInterrupt:
                logging.info("Shutdown signal received")

        finally:
            # Cleanup and stop all threads
            self.thread_manager.stop_all(timeout=5)
            if self.mqtt_handler:
                self.mqtt_handler.stop()
            if self.benchmark:
                self.metrics.stop("total_runtime")
                self.print_performance_stats()
            logging.info("Receiver stopped")


def main():
    """Main entry point with error handling."""
    receiver = None  # defined up front so signal_handler never sees an unbound name
    try:
        # Set up signal handlers
        def signal_handler(signum, frame):
            logging.info(f"Received signal {signum}")
            if receiver:
                receiver.thread_manager.running.clear()

        import signal

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Create and run receiver
        receiver = P2000Receiver()
        receiver.run()

    except KeyboardInterrupt:
        print("\nShutting down...")
    except Exception as e:
        print(f"Fatal error: {str(e)}")
        logging.exception("Fatal error occurred")
        sys.exit(1)


if __name__ == "__main__":
    main()
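
For anyone adapting the script, a quick sketch of what FLEX_PATTERN pulls out of a line. The sample line is made up to fit the pipe-separated layout the pattern expects and is not captured receiver output.

```python
import re

# Same pattern as in the script above
FLEX_PATTERN = re.compile(
    r"^FLEX\|([^|]*)\|([^|]*)\|([^|]*)\|([^|]*)\|ALN\|([^|]*)"
)

# Made-up example line (assumed field layout)
sample = (
    "FLEX|2024-12-29 00:00:24|1600/2/K/A|03.053|"
    "002029568 001420059|ALN|A1 Test melding Rotterdam"
)

match = FLEX_PATTERN.search(sample)
capcodes = match.group(4).strip().split()  # fourth field: space-separated capcodes
body = match.group(5).strip()              # field after ALN: the message text

print(capcodes)
print(body)
```

Because the last group is `[^|]*`, a message body that itself contains a `|` would be cut off at that character.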

@spiralshapeturtle
Author

This is a stable version of the new code. I have the feeling it will hold up under the New Year's Eve load on the P2000 network.

#!/usr/bin/env python3
import requests.adapters
import configparser
import json
import logging
import os
import re
import subprocess
import sys
import threading
import time
import queue
from collections import deque
from datetime import datetime
from threading import Event

import paho.mqtt.client as mqtt
import requests

VERSION = "0.7.1"
CFGFILE = "config.ini"

# Simple pattern matching pipe-separated fields
FLEX_PATTERN = re.compile(
    r"^FLEX\|([^|]*)\|([^|]*)\|([^|]*)\|([^|]*)\|ALN\|([^|]*)"
)

class ThreadManager:
    """Manages multiple worker threads and their lifecycle."""

    def __init__(self):
        self.threads = {}
        self.running = threading.Event()
        self.running.set()

    def start_thread(self, name: str, target, daemon: bool = True) -> bool:
        """Start a new named thread."""
        if name in self.threads:
            logging.warning(f"Thread {name} already exists")
            return False

        thread = threading.Thread(
            target=self._thread_wrapper,
            args=(name, target),
            daemon=daemon,
            name=name,  # so stop_all() reports the registered name
        )
        self.threads[name] = thread
        thread.start()
        logging.info(f"Started thread: {name}")
        return True

    def _thread_wrapper(self, name: str, target) -> None:
        """Wrapper to handle thread exceptions and cleanup."""
        try:
            logging.info(f"Thread {name} starting")
            target()
        except Exception as e:
            logging.error(f"Error in thread {name}: {str(e)}")
            logging.exception("Thread error details:")
        finally:
            logging.info(f"Thread {name} stopped")
            if name in self.threads:
                del self.threads[name]

    def stop_all(self, timeout: float = 5) -> None:
        """Stop all threads gracefully."""
        self.running.clear()
        start_time = time.time()

        # Wait for threads to finish
        active_threads = list(self.threads.values())
        for thread in active_threads:
            remaining_time = max(0, timeout - (time.time() - start_time))
            if remaining_time > 0 and thread.is_alive():
                thread.join(timeout=remaining_time)

        # Check for any threads that didn't stop
        still_running = [t.name for t in active_threads if t.is_alive()]
        if still_running:
            logging.warning(f"Threads still running: {still_running}")

        self.threads.clear()

class MessageQueue:
    """Thread-safe bounded FIFO queue with a short polling timeout during startup."""

    def __init__(self, maxlen: int = 100):
        self.queue = deque(maxlen=maxlen)
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)
        self.startup_mode = True
        self.startup_timeout = 0.05
        self._startup_time = time.time()

    def put(self, item, block: bool = True, timeout: float = None) -> bool:
        """Add an item to the queue with optimized startup handling."""
        with self.lock:
            if block:
                while len(self.queue) >= self.queue.maxlen:
                    if not self.not_full.wait(timeout):
                        raise queue.Full
            elif len(self.queue) >= self.queue.maxlen:
                raise queue.Full

            self.queue.appendleft(item)
            self.not_empty.notify()
            return True

    def get(self, block: bool = True, timeout: float = None):
        """Remove and return an item from the queue with startup optimization."""
        with self.lock:
            if block:
                current_timeout = self.startup_timeout if self.startup_mode else timeout
                while not self.queue:
                    if not self.not_empty.wait(current_timeout):
                        if self.startup_mode and time.time() - self._startup_time > 30:
                            self.startup_mode = False
                        raise queue.Empty
            elif not self.queue:
                raise queue.Empty

            item = self.queue.pop()
            self.not_full.notify()
            return item

    def exit_startup_mode(self):
        """Explicitly exit startup mode."""
        with self.lock:
            self.startup_mode = False

    def __len__(self) -> int:
        """Return the current size of the queue."""
        with self.lock:
            return len(self.queue)

class MQTTHandler:
    """Handles MQTT connection and message delivery with improved reliability and logging."""

    def __init__(self, config):
        self.config = config
        self.client = None
        self.connected = False
        self.reconnect_delay = 1
        self.max_reconnect_delay = 60
        self.connection_attempts = 0
        self.last_connection_time = None
        self.pending_messages = deque(maxlen=1000)
        self.setup_client()

    def setup_client(self):
        """Initialize MQTT client with all necessary callbacks."""
        try:
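            client_id = f"p2000_receiver_{int(time.time())}"
            if hasattr(mqtt, "CallbackAPIVersion"):
                # paho-mqtt 2.x requires an explicit callback API version;
                # VERSION1 keeps the 1.x callback signatures used below.
                self.client = mqtt.Client(
                    mqtt.CallbackAPIVersion.VERSION1, client_id=client_id
                )
            else:
                self.client = mqtt.Client(client_id=client_id)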

            if self.config["user"] and self.config["password"]:
                self.client.username_pw_set(
                    self.config["user"], self.config["password"]
                )
                logging.info(f"MQTT authentication configured for user: {self.config['user']}")
            else:
                logging.warning("MQTT running without authentication")

            self.client.on_connect = self.on_connect
            self.client.on_disconnect = self.on_disconnect
            self.client.on_publish = self.on_publish
            self.client.on_socket_open = self.on_socket_open
            self.client.on_socket_close = self.on_socket_close
            self.client.on_socket_error = self.on_socket_error

            self.client.max_queued_messages_set(100)
            self.client.max_inflight_messages_set(20)
            self.client.reconnect_delay_set(min_delay=1, max_delay=60)

            logging.info(f"Attempting MQTT connection to {self.config['server']}:{self.config['port']}")
            self.client.connect(
                self.config["server"],
                self.config["port"],
                keepalive=60
            )
            self.client.loop_start()
            logging.info("MQTT client setup completed successfully")

        except ConnectionRefusedError:
            logging.error(f"MQTT connection refused by {self.config['server']}:{self.config['port']}")
            self.connected = False
        except TimeoutError:
            logging.error(f"MQTT connection timeout to {self.config['server']}:{self.config['port']}")
            self.connected = False
        except Exception as e:
            logging.error(f"MQTT connection failed: {str(e)}")
            logging.exception("Full MQTT connection error details:")
            self.connected = False

    def on_socket_open(self, client, userdata, sock):
        """Log when socket is opened."""
        logging.info(f"MQTT socket opened to {self.config['server']}:{self.config['port']}")

    def on_socket_close(self, client, userdata, sock):
        """Log when socket is closed."""
        logging.info("MQTT socket closed")

    def on_socket_error(self, client, userdata, exc):
        """Handle socket errors."""
        # No exception is being handled here, so logging.exception() would
        # only append "NoneType: None"; log the passed-in error instead.
        logging.error(f"MQTT socket error occurred: {exc!r}")

    def on_connect(self, client, userdata, flags, rc):
        """Handle connection events and retry sending pending messages."""
        self.connection_attempts += 1
        self.last_connection_time = time.time()

        rc_codes = {
            0: "Connection successful",
            1: "Connection refused - incorrect protocol version",
            2: "Connection refused - invalid client identifier",
            3: "Connection refused - server unavailable",
            4: "Connection refused - bad username or password",
            5: "Connection refused - not authorized"
        }

        if rc == 0:
            self.connected = True
            self.reconnect_delay = 1
            pending_count = len(self.pending_messages)
            logging.info(f"Connected to MQTT broker after {self.connection_attempts} attempts")
            if pending_count > 0:
                logging.info(f"Attempting to send {pending_count} pending messages")

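            # Flush a bounded number of messages and stop at the first failure:
            # publish() re-queues failed messages, so an open-ended loop could
            # spin forever inside this callback.
            for _ in range(pending_count):
                if not self.pending_messages:
                    break
                topic, payload, qos = self.pending_messages.popleft()
                if not self.publish(topic, payload, qos):
                    break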
        else:
            self.connected = False
            error_msg = rc_codes.get(rc, f"Unknown error code: {rc}")
            logging.error(f"MQTT connection failed: {error_msg}")
            if rc == 4:
                logging.error(f"Authentication failed for user: {self.config['user']}")

    def on_disconnect(self, client, userdata, rc):
        """Log disconnections; the paho loop thread performs the actual reconnect."""
        self.connected = False
        disconnect_time = time.time()
        session_duration = (
            round(disconnect_time - self.last_connection_time)
            if self.last_connection_time
            else 0
        )

        if rc != 0:
            logging.error(
                f"MQTT disconnected unexpectedly with code {rc} "
                f"after {session_duration} seconds"
            )
            self.reconnect_delay = min(
                self.reconnect_delay * 2, self.max_reconnect_delay
            )
            logging.info(f"Will attempt reconnection in {self.reconnect_delay} seconds")
        else:
            logging.info(
                f"MQTT disconnected normally after {session_duration} seconds"
            )

    def on_publish(self, client, userdata, mid):
        """Confirm successful message publication."""
        logging.debug(f"Message {mid} published successfully")

    def publish(self, topic, payload, qos=1):
        """Publish message with QoS and enhanced error handling."""
        try:
            if not self.connected:
                self.pending_messages.append((topic, payload, qos))
                pending_count = len(self.pending_messages)
                logging.warning(
                    f"MQTT not connected, message queued (pending: {pending_count})"
                )
                return False

            info = self.client.publish(topic, payload, qos=qos)

            if info.rc != mqtt.MQTT_ERR_SUCCESS:
                self.pending_messages.append((topic, payload, qos))
                logging.error(
                    f"Failed to publish message: {info.rc}, "
                    f"message queued (pending: {len(self.pending_messages)})"
                )
                return False

            if qos > 0:
                logging.debug(
                    f"Message queued for publishing with QoS {qos} "
                    f"(mid: {info.mid})"
                )
            return True

        except Exception as e:
            logging.error(f"MQTT Publish Error: {str(e)}")
            logging.exception("Full publish error details:")
            self.pending_messages.append((topic, payload, qos))
            return False

    def stop(self):
        """Clean shutdown of MQTT connection."""
        if self.client:
            try:
                pending_count = len(self.pending_messages)
                if pending_count > 0:
                    logging.warning(
                        f"Shutting down with {pending_count} pending messages"
                    )
                self.client.loop_stop()
                self.client.disconnect()
                logging.info("MQTT client shutdown complete")
            except Exception as e:
                logging.error(f"Error during MQTT shutdown: {str(e)}")
                logging.exception("Full shutdown error details:")

class MessageItem:
    """Message container."""

    __slots__ = (
        "timestamp",
        "message_raw",
        "body",
        "capcodes",
        "receivers",
        "is_posted"
    )

    def __init__(self, message_raw, timestamp, body, capcodes):
        self.timestamp = timestamp
        self.message_raw = message_raw
        self.body = body
        self.capcodes = capcodes
        self.receivers = " ".join(capcodes)
        self.is_posted = False

def load_config():
    """Load or create config file."""
    config = configparser.ConfigParser()
    if config.read(CFGFILE):
        return config

    config["main"] = {"debug": "true"}
    config["rtl-sdr"] = {
        "cmd": "rtl_fm -f 169.65M -M fm -s 22050 | multimon-ng -a FLEX -t raw -"
    }
    config["home-assistant"] = {
        "enabled": "true",
        "baseurl": "http://homeassistant.local:8123",
        "token": "YOUR_TOKEN_HERE",
    }
    config["mqtt"] = {
        "enabled": "true",
        "mqtt_server": "192.168.1.100",
        "mqtt_port": "1883",
        "mqtt_user": "mqttuser",
        "mqtt_password": "password",
        "mqtt_topic": "p2000",
        "mqtt_qos": "1",
    }

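    with open(CFGFILE, "w") as f:
        config.write(f)
    # Use print(): calling logging.info() before logging.basicConfig() would
    # install a default handler and turn the later basicConfig() into a no-op.
    print(f"Created new config file: {CFGFILE}")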
    return config

class P2000Receiver:
    def __init__(self):
        print(f"Initializing P2000 Receiver v{VERSION}...")
        self.thread_manager = ThreadManager()
        self.messages = MessageQueue(maxlen=100)
        self.config = load_config()
        self.mqtt_handler = None
        self.device_ready = threading.Event()

        # Configure logging
        self.debug = self.config.getboolean("main", "debug")
        log_level = logging.DEBUG if self.debug else logging.INFO
        logging.basicConfig(
            level=log_level,
            format="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        logging.info("Loading configuration...")

        # Load config settings
        self.rtlfm_cmd = self.config.get("rtl-sdr", "cmd")
        logging.info(f"RTL-SDR command: {self.rtlfm_cmd}")

        # Initialize Home Assistant
        self.use_hass = self.config.getboolean("home-assistant", "enabled")
        if self.use_hass:
            self.baseurl = self.config.get("home-assistant", "baseurl")
            self.token = self.config.get("home-assistant", "token")
            self.setup_home_assistant()

        # Initialize MQTT
        self.use_mqtt = self.config.getboolean("mqtt", "enabled")
        if self.use_mqtt:
            self.setup_mqtt()

        logging.info("Initialization complete.")

    def setup_home_assistant(self):
        """Initialize Home Assistant connection."""
        try:
            logging.info("Setting up Home Assistant connection...")
            self.http_session = requests.Session()
            adapter = requests.adapters.HTTPAdapter(
                pool_connections=1,
                pool_maxsize=1,
                max_retries=3,
                pool_block=False,
            )
            self.http_session.mount("http://", adapter)
            self.http_session.mount("https://", adapter)
            self.http_session.headers.update(
                {
                    "Authorization": f"Bearer {self.token}",
                    "content-type": "application/json",
                    "Connection": "keep-alive",
                }
            )
            logging.info("Home Assistant connection setup complete")
        except Exception as e:
            logging.error(f"Error setting up Home Assistant: {e}")
            self.use_hass = False

    def setup_mqtt(self):
        """Initialize MQTT connection."""
        try:
            logging.info("Setting up MQTT connection...")
            self.mqtt_config = {
                "server": self.config.get("mqtt", "mqtt_server"),
                "port": self.config.getint("mqtt", "mqtt_port"),
                "user": self.config.get("mqtt", "mqtt_user"),
                "password": self.config.get("mqtt", "mqtt_password"),
                "topic": self.config.get("mqtt", "mqtt_topic"),
                "qos": self.config.getint("mqtt", "mqtt_qos", fallback=1),
            }
            self.mqtt_handler = MQTTHandler(self.mqtt_config)
            logging.info("MQTT connection setup complete")
        except Exception as e:
            logging.error(f"Error setting up MQTT: {e}")
            self.use_mqtt = False

    def post_message(self, msg):
        """Post message to configured endpoints."""
        try:
            data = {
                "state": msg.body,
                "attributes": {
                    "time_received": msg.timestamp,
                    "receivers": msg.receivers,
                    "capcodes": msg.capcodes,
                    "raw_message": msg.message_raw,
                },
            }

            json_str = json.dumps(data)
            logging.debug(f"Prepared message data: {json_str}")

            # Post to Home Assistant
            if self.use_hass:
                try:
                    response = self.http_session.post(
                        f"{self.baseurl}/api/states/sensor.p2000",
                        data=json_str,
                        timeout=5,
                    )
                    response.raise_for_status()
                except Exception as e:
                    logging.error(f"HA Error: {e}")

            # Post to MQTT
            if self.use_mqtt and self.mqtt_handler:
                try:
                    topic = f"{self.mqtt_config['topic']}/sensor/p2000"
                    self.mqtt_handler.publish(
                        topic,
                        json_str,
                        qos=self.mqtt_config["qos"]
                    )
                except Exception as e:
                    logging.error(f"MQTT Error: {e}")

        except Exception as e:
            logging.error(f"Error in post_message: {e}")

    def process_messages(self):
        """Process messages from queue."""
        logging.info("Message processing thread started")
        startup_time = time.time()

        while self.thread_manager.running.is_set():
            try:
                msg = self.messages.get(block=True, timeout=0.1)
                if not msg.is_posted:
                    self.post_message(msg)
                    msg.is_posted = True

                    # Exit startup mode after first successful message
                    if self.messages.startup_mode:
                        self.messages.exit_startup_mode()

            except queue.Empty:
                # Check if we should exit startup mode
                if self.messages.startup_mode and time.time() - startup_time > 30:
                    self.messages.exit_startup_mode()
                continue
            except Exception as e:
                logging.error(f"Error processing message: {e}")
                time.sleep(1)

    def read_rtl_sdr(self):
        """Read and process RTL-SDR data with automatic restart capability."""
        logging.info("Starting RTL-SDR process...")

        base_delay = 1
        max_delay = 60
        current_delay = base_delay
        consecutive_failures = 0
        max_consecutive_failures = 5

        def is_usb_error(error_text):
            """Check if error indicates USB/hardware issues."""
            # All entries must be lowercase: they are compared against
            # error_text.lower() below.
            usb_errors = [
                "cb transfer status",
                "canceling",
                "device not found",
                "no compatible devices found",
                "usb error",
                "failed to open rtlsdr device",
                "unable to claim usb interface"
            ]
            return any(err in error_text.lower() for err in usb_errors)

        def is_startup_message(error_text):
            """Identify normal startup messages."""
            startup_msgs = [
                'multimon-ng',
                'available demodulators:',
                'found',
                'using device',
                'tuner',
                'tuned to',
                'sampling at',
                'sample rate',
                'buffer size',
                'oversampling',
                'allocating',
                'copyright',
                'output at',
                '(c)',
                'realtek',
                'rtl2838',
                'sn:',
                'by tom sailer',
                'by elias oenal'
            ]
            return any(msg in error_text.lower() for msg in startup_msgs)

        def is_device_ready(error_text):
            """Check if device is ready to receive messages."""
            ready_indicators = [
                'tuned to',
                'sampling at',
                'output at'
            ]
            return any(indicator in error_text.lower() for indicator in ready_indicators)

        while self.thread_manager.running.is_set():
            process = None
            start_time = time.time()
            usb_error_detected = False
            self.device_ready.clear()

            try:
                # Check if rtl_fm binary exists and is executable
                rtl_fm_path = self.rtlfm_cmd.split()[0]
                if not os.path.exists(rtl_fm_path) and rtl_fm_path != 'rtl_fm':
                    raise FileNotFoundError(f"RTL-SDR software not found: {rtl_fm_path}")

                # Start the RTL-SDR process
                process = subprocess.Popen(
                    self.rtlfm_cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    shell=True,
                    bufsize=1,
                    universal_newlines=True,
                )

                logging.info("RTL-SDR process started, waiting for device ready...")

                import select  # imported once, outside the polling loop

                # Main processing loop
                while self.thread_manager.running.is_set():
                    if process.poll() is not None:
                        raise Exception(f"RTL-SDR process terminated with code {process.poll()}")

                    reads = [process.stdout.fileno(), process.stderr.fileno()]
                    timeout = 0.1  # Reduced timeout for faster startup response

                    try:
                        readable, _, _ = select.select(reads, [], [], timeout)
                    except (select.error, ValueError) as e:
                        logging.error(f"Select error (possible USB disconnect): {e}")
                        raise Exception("USB device may have been disconnected")

                    for fd in readable:
                        if fd == process.stdout.fileno():
                            line = process.stdout.readline()
                            if line:
                                if "FLEX" in line and "ALN" in line:
                                    try:
                                        match = FLEX_PATTERN.search(line)
                                        if match:
                                            if not self.device_ready.is_set():
                                                self.device_ready.set()
                                                logging.info("First message received, device is ready")
                                            self.handle_message(line, match)
                                    except Exception as e:
                                        logging.error(f"Error parsing message: {e}")
                        elif fd == process.stderr.fileno():
                            error = process.stderr.readline()
                            if error:
                                error_text = error.strip()
                                if not error_text:
                                    continue

                                # Check for device ready state
                                if not self.device_ready.is_set() and is_device_ready(error_text):
                                    self.device_ready.set()
                                    logging.info("Device ready to receive messages")

                                if is_usb_error(error_text):
                                    logging.error(f"USB Error detected: {error_text}")
                                    usb_error_detected = True
                                    raise Exception("USB device error detected")
                                elif is_startup_message(error_text):
                                    logging.info(f"RTL-SDR: {error_text}")
                                else:
                                    logging.warning(f"RTL-SDR Error: {error_text}")

            except Exception as e:
                logging.error(f"Error in RTL-SDR process: {e}")
                consecutive_failures += 1

                # More aggressive retry for USB errors
                if usb_error_detected:
                    logging.info("USB error detected - attempting quick recovery")
                    current_delay = base_delay  # Reset delay for USB errors
                    time.sleep(2)  # Short delay for USB reconnection
                else:
                    if consecutive_failures >= max_consecutive_failures:
                        logging.warning(
                            f"RTL-SDR failed {consecutive_failures} times in rapid succession. "
                            "There might be a hardware or system issue."
                        )

                    if time.time() - start_time < 10:
                        current_delay = min(current_delay * 2, max_delay)

                    logging.info(f"Waiting {current_delay} seconds before restarting RTL-SDR...")

                    # Sleep but respect shutdown signal
                    sleep_start = time.time()
                    while (time.time() - sleep_start < current_delay and
                           self.thread_manager.running.is_set()):
                        time.sleep(1)

            finally:
                if process:
                    try:
                        process.kill()
                        process.wait(timeout=5)
                    except Exception as e:
                        logging.error(f"Error cleaning up RTL-SDR process: {e}")

                if self.thread_manager.running.is_set():
                    logging.info("RTL-SDR process ended, preparing for restart...")

    def handle_message(self, line, match):
        """Process and queue a new message."""
        try:
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            capcodes = match.group(4).strip().split()
            message = match.group(5).strip()

            if message and capcodes:
                msg = MessageItem(
                    message_raw=line.strip(),
                    timestamp=timestamp,
                    body=message,
                    capcodes=capcodes,
                )
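                self.messages.put(msg, block=False)
                print(f"[{timestamp}] New message: {message}")
        except queue.Full:
            # queue.Full carries no message text, so log something meaningful
            logging.warning("Message queue is full, dropping message")
        except Exception as e:
            logging.error(f"Error handling message: {e}")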

    def run(self):
        """Start the receiver."""
        logging.info(f"P2000 Receiver v{VERSION} starting...")

        try:
            # Start message processing thread
            self.thread_manager.start_thread("processor", self.process_messages)

            # Start RTL-SDR reading in separate thread
            self.thread_manager.start_thread("rtl-sdr", self.read_rtl_sdr)

            # Wait for shutdown signal
            try:
                while self.thread_manager.running.is_set():
                    time.sleep(1)
            except KeyboardInterrupt:
                logging.info("Shutdown signal received")

        finally:
            # Cleanup and stop all threads
            self.thread_manager.stop_all(timeout=5)
            if self.mqtt_handler:
                self.mqtt_handler.stop()
            logging.info("Receiver stopped")

def main():
    """Main entry point with error handling."""
    try:
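        # Bind the name first so the handler cannot hit an unbound variable
        # if a signal arrives while P2000Receiver() is still initializing.
        receiver = None

        # Set up signal handlers
        def signal_handler(signum, frame):
            logging.info(f"Received signal {signum}")
            if receiver:
                receiver.thread_manager.running.clear()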

        import signal

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Create and run receiver
        receiver = P2000Receiver()
        receiver.run()

    except KeyboardInterrupt:
        print("\nShutting down...")
    except Exception as e:
        print(f"Fatal error: {str(e)}")
        logging.exception("Fatal error occurred")
        sys.exit(1)

if __name__ == "__main__":
    main()
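
To sanity-check the parsing without an RTL-SDR attached, something like the sketch below may help. It reuses `FLEX_PATTERN` from the script and mirrors what `handle_message` does with groups 4 and 5. The helper name `parse_flex_line` and the sample line are hypothetical: the sample is made up for illustration, and its field layout is only my assumption about what multimon-ng emits, so compare it with real output before relying on it.

```python
import re

# Same pattern as in the script above.
FLEX_PATTERN = re.compile(
    r"^FLEX\|([^|]*)\|([^|]*)\|([^|]*)\|([^|]*)\|ALN\|([^|]*)"
)


def parse_flex_line(line):
    """Return (capcodes, body) for a FLEX ALN line, or None if it does not match.

    Hypothetical helper, not part of the script; it only mirrors the
    group handling in handle_message().
    """
    match = FLEX_PATTERN.search(line)
    if not match:
        return None
    capcodes = match.group(4).strip().split()
    body = match.group(5).strip()
    if not (capcodes and body):
        return None
    return capcodes, body


# Made-up sample line (assumed layout), for illustration only.
SAMPLE = (
    "FLEX|2024-12-28 10:15:30|1600/2/K/A|05.123|"
    "001234567 002345678|ALN|TEST MESSAGE\n"
)

if __name__ == "__main__":
    print(parse_flex_line(SAMPLE))
```

Note that the last group is `[^|]*`, so a message body that itself contains a `|` would be cut off at that character.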
