forked from openWB/core
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Modbus-Steuerung pytest fix flake8 pytest topic add update reg fix fix improve tester flake8 fix typo fix folder structure start modbus server only in secondary mode automate modbus test * Apply suggestions from code review Co-authored-by: benderl <[email protected]> * fixes * fix pytest --------- Co-authored-by: benderl <[email protected]>
- Loading branch information
Showing
22 changed files
with
429 additions
and
42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ jobs: | |
run: | | ||
python -m pip install --upgrade pip | ||
pip install flake8 pytest paho-mqtt requests-mock jq pyjwt==2.6.0 bs4 pkce typing_extensions python-dateutil==2.8.2 | ||
pip install umodbus | ||
- name: Flake8 with annotations in packages folder | ||
uses: TrueBrain/[email protected] | ||
with: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
#!/usr/bin/env python | ||
import logging | ||
from socketserver import TCPServer | ||
from collections import defaultdict | ||
import struct | ||
from umodbus import conf | ||
from umodbus.server.tcp import RequestHandler, get_server | ||
from umodbus.utils import log_to_stream | ||
|
||
from helpermodules import timecheck | ||
from helpermodules.hardware_configuration import get_serial_number | ||
from helpermodules.pub import Pub | ||
from helpermodules.subdata import SubData | ||
|
||
|
||
log = logging.getLogger(__name__) | ||
|
||
try: | ||
log_to_stream(level=logging.DEBUG) | ||
data_store = defaultdict(int) | ||
conf.SIGNED_VALUES = True | ||
TCPServer.allow_reuse_address = True | ||
app = get_server(TCPServer, ('0.0.0.0', 1502), RequestHandler) | ||
|
||
serial_number = get_serial_number().replace("snnumber=", "") | ||
except (Exception, OSError): | ||
log.exception("Fehler im Modbus-Server") | ||
|
||
|
||
def _form_int32(value, startreg): | ||
secondreg = startreg + 1 | ||
try: | ||
binary32 = struct.pack('>l', int(value)) | ||
high_byte, low_byte = struct.unpack('>hh', binary32) | ||
data_store[startreg] = high_byte | ||
data_store[secondreg] = low_byte | ||
except Exception: | ||
log.exception("Fehler beim Füllen der Register") | ||
data_store[startreg] = -1 | ||
data_store[secondreg] = -1 | ||
|
||
|
||
def _form_int16(value, startreg): | ||
try: | ||
value = int(value) | ||
if (value > 32767 or value < -32768): | ||
raise Exception("Number to big") | ||
data_store[startreg] = value | ||
except Exception: | ||
log.exception("Fehler beim Füllen der Register") | ||
data_store[startreg] = -1 | ||
|
||
|
||
def _form_str(value: str, startreg): | ||
bytes = value.encode("utf-8") | ||
length = len(bytes) | ||
if length > 20: | ||
raise ValueError("String darf max 20 Zeichen enthalten.") | ||
register_offset = 0 | ||
for i in range(0, length, 2): | ||
try: | ||
if i < length-1: | ||
stream_two_bytes = struct.pack(">bb", bytes[i], bytes[i+1]) | ||
stream_one_word = struct.unpack(">h", stream_two_bytes)[0] | ||
else: | ||
stream_two_bytes = struct.pack(">bb", bytes[i], 0) | ||
stream_one_word = struct.unpack(">h", stream_two_bytes)[0] | ||
data_store[startreg+register_offset] = stream_one_word | ||
except Exception: | ||
data_store[startreg+register_offset] = -1 | ||
finally: | ||
register_offset += 1 | ||
|
||
|
||
def _get_pos(number, n): | ||
return number // 10**n % 10 - 1 | ||
|
||
|
||
try: | ||
@app.route(slave_ids=[1], function_codes=[3, 4], addresses=list(range(0, 32000))) | ||
def read_data_store(slave_id, function_code, address): | ||
"""" Return value of address. """ | ||
if address > 10099: | ||
Pub().pub("openWB/set/internal_chargepoint/global_data", | ||
{"heartbeat": timecheck.create_timestamp_unix(), "parent_ip": None}) | ||
chargepoint = SubData.internal_chargepoint_data[f"cp{_get_pos(address, 2)}"] | ||
askedvalue = int(str(address)[-2:]) | ||
if askedvalue == 00: | ||
_form_int32(chargepoint.get.power, address) | ||
elif askedvalue == 2: | ||
_form_int32(chargepoint.get.imported, address) | ||
elif 4 <= askedvalue <= 6: | ||
_form_int16(chargepoint.get.voltages[askedvalue-4]*100, address) | ||
elif 7 <= askedvalue <= 9: | ||
_form_int16(chargepoint.get.currents[askedvalue-7]*100, address) | ||
elif askedvalue == 14: | ||
_form_int16(chargepoint.get.plug_state, address) | ||
elif askedvalue == 15: | ||
_form_int16(chargepoint.get.charge_state, address) | ||
elif askedvalue == 16: | ||
_form_int16(chargepoint.get.evse_current, address) | ||
elif 30 <= askedvalue <= 32: | ||
_form_int16(chargepoint.get.powers[askedvalue-30], address) | ||
elif askedvalue == 41: | ||
_form_int32(chargepoint.get.exported, address) | ||
elif askedvalue == 43: | ||
_form_int16(1, address) | ||
elif askedvalue == 50: | ||
_form_str(serial_number, address) | ||
elif askedvalue == 60: | ||
_form_str(chargepoint.get.rfid, address) | ||
|
||
return data_store[address] | ||
except Exception: | ||
log.exception("Fehler im Modbus-Server") | ||
|
||
try: | ||
@app.route(slave_ids=[1], function_codes=[6, 16], addresses=list(range(0, 32000))) | ||
def write_data_store(slave_id, function_code, address, value): | ||
"""" Set value for address. """ | ||
if 10170 < address: | ||
cp_topic = f"openWB/set/internal_chargepoint/{_get_pos(address, 2)}/data/" | ||
askedvalue = int(str(address)[-2:]) | ||
if askedvalue == 71: | ||
Pub().pub(f"{cp_topic}set_current", value/100) | ||
elif askedvalue == 80: | ||
Pub().pub(f"{cp_topic}phases_to_use", value) | ||
elif askedvalue == 81: | ||
Pub().pub(f"{cp_topic}trigger_phase_switch", value) | ||
elif askedvalue == 98: | ||
Pub().pub(f"{cp_topic}cp_interruption_duration", value) | ||
elif askedvalue == 99: | ||
Pub().pub("openWB/set/command/modbus_server/todo", {"command": "systemUpdate", "data": {}}) | ||
except Exception: | ||
log.exception("Fehler im Modbus-Server") | ||
|
||
|
||
def start_modbus_server(event_modbus_server): | ||
try: | ||
# Wenn start_modbus_server aus SubData aufegrufen wird, wenn das Topic gesetzt wird, führt das zu einem | ||
# circular Import. | ||
event_modbus_server.wait() | ||
event_modbus_server.clear() | ||
log.debug("Starte Modbus-Server") | ||
app.serve_forever() | ||
finally: | ||
try: | ||
app.shutdown() | ||
app.server_close() | ||
except Exception: | ||
log.exception("Fehler im Modbus-Server") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.