This repository has been archived by the owner on Feb 9, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
mongo.py
94 lines (82 loc) · 3.06 KB
/
mongo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from typing import List
from datetime import datetime
import paho.mqtt.client as mqtt
import pymongo
import pymongo.database
import pymongo.collection
import pymongo.errors
import threading
import os
# Connection settings. Each value falls back to the built-in default shown
# here unless an environment variable of the same name is set.

# Accepted URI forms: mongodb://user:pass@ip:port || mongodb://ip:port
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://127.0.0.1:27017")
MONGO_DB = os.environ.get("MONGO_DB", "domotics")
MONGO_COLLECTION = os.environ.get("MONGO_COLLECTION", "mqtt")
# Time in seconds; always a float, whether it comes from the default or the
# environment.
MONGO_TIMEOUT = float(os.environ.get("MONGO_TIMEOUT", 1))
# strftime pattern used for the human-readable "datetime" field.
MONGO_DATETIME_FORMAT = os.environ.get("MONGO_DATETIME_FORMAT", "%d/%m/%Y %H:%M:%S")
class Mongo:
    """Store MQTT messages as documents in a MongoDB collection.

    Messages that could not be stored (no usable connection, or an insert
    that failed or was not acknowledged) are appended to ``self.queue``.
    Nothing drains that queue yet.
    """

    def __init__(self):
        self.client: pymongo.MongoClient = None
        self.database: pymongo.database.Database = None
        self.collection: pymongo.collection.Collection = None
        # Messages that still have to be written to Mongo.
        self.queue: List[mqtt.MQTTMessage] = []

    def connect(self) -> None:
        """Create the client and look up the configured database/collection."""
        print("Connecting Mongo")
        self.client = pymongo.MongoClient(
            MONGO_URI,
            # MONGO_TIMEOUT is in seconds, the driver option in milliseconds.
            serverSelectionTimeoutMS=MONGO_TIMEOUT * 1000.0,
        )
        self.database = self.client.get_database(MONGO_DB)
        self.collection = self.database.get_collection(MONGO_COLLECTION)

    def disconnect(self) -> None:
        """Close the client, if there is one, and forget it."""
        print("Disconnecting Mongo")
        if self.client:
            self.client.close()
            self.client = None

    def connected(self) -> bool:
        """Return True when a client exists and the server answers a command.

        Any ``PyMongoError`` raised by the probe is reported as "not
        connected" instead of being propagated.
        """
        if not self.client:
            return False
        try:
            # "ping" replaces the deprecated "ismaster" command as the probe.
            self.client.admin.command("ping")
        except pymongo.errors.PyMongoError:
            return False
        return True

    def _enqueue(self, msg: "mqtt.MQTTMessage") -> None:
        """Append ``msg`` to the in-memory queue of unsaved messages."""
        print("Enqueuing")
        self.queue.append(msg)
        # TODO process queue

    def __store_thread_f(self, msg: "mqtt.MQTTMessage") -> None:
        """Insert ``msg`` into the collection; body of the worker thread.

        A message whose insert raises or is not acknowledged is enqueued so
        it is not lost.  A message that cannot be turned into a document
        (for example a payload that does not decode) is logged and dropped,
        because retrying it could not succeed.
        """
        print("Storing")
        now = datetime.now()
        try:
            document = {
                "topic": msg.topic,
                "payload": msg.payload.decode(),
                # "retained": msg.retain,
                "qos": msg.qos,
                "timestamp": int(now.timestamp()),
                "datetime": now.strftime(MONGO_DATETIME_FORMAT),
                # TODO datetime must be fetched right when the message is received
                # It will be wrong when a queued message is stored
            }
        except Exception as ex:
            # Top of a worker thread: report the problem instead of dying.
            print(ex)
            return

        try:
            result = self.collection.insert_one(document)
        except Exception as ex:
            # Whatever went wrong, the message is not in Mongo: keep it.
            print(ex)
            self._enqueue(msg)
            return

        if not result.acknowledged:
            # Enqueue message if it was not saved properly
            self._enqueue(msg)
            return
        print("Saved in Mongo document ID", result.inserted_id)

    def _store(self, msg: "mqtt.MQTTMessage") -> None:
        """Run the insert for ``msg`` on a daemon thread."""
        th = threading.Thread(target=self.__store_thread_f, args=(msg,))
        th.daemon = True
        th.start()

    def save(self, msg: "mqtt.MQTTMessage") -> None:
        """Store ``msg`` if Mongo is reachable, otherwise enqueue it.

        Retained messages are skipped entirely.
        """
        print("Saving")
        if msg.retain:
            print("Skipping retained message")
            return
        if self.connected():
            self._store(msg)
        else:
            self._enqueue(msg)