forked from alfredkrohmer/profinet
-
Notifications
You must be signed in to change notification settings - Fork 1
/
dcp.py
144 lines (103 loc) · 4.88 KB
/
dcp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import argparse, time
from util import *
from protocol import *
params = {
"name": PNDCPBlock.NAME_OF_STATION,
"ip": PNDCPBlock.IP_ADDRESS
}
class DCPDeviceDescription:
def __init__(self, mac, blocks):
self.mac = mac2s(mac)
self.name = blocks[PNDCPBlock.NAME_OF_STATION].decode()
self.ip = s2ip(blocks[PNDCPBlock.IP_ADDRESS][0:4])
self.netmask = s2ip(blocks[PNDCPBlock.IP_ADDRESS][4:8])
self.gateway = s2ip(blocks[PNDCPBlock.IP_ADDRESS][8:12])
self.vendorHigh, self.vendorLow, self.devHigh, self.devLow = unpack(">BBBB", blocks[PNDCPBlock.DEVICE_ID][0:4])
def get_param(s, src, target, param):
dst = s2mac(target)
if param not in params.keys():
return
param = params[param]
block = PNDCPBlockRequest(param[0], param[1], 0, bytes())
dcp = PNDCPHeader(0xfefd, PNDCPHeader.GET, PNDCPHeader.REQUEST, 0x012345, 192, 2, block)
eth = EthernetVLANHeader(dst, src, 0x8100, 0, PNDCPHeader.ETHER_TYPE, payload=ensure_min_packet_size(dcp))
s.send(bytes(eth))
return list(read_response(s, src, once=True).values())[0][param]
def set_param(s, src, target, param, value):
dst = s2mac(target)
if param == 'ip':
# ToDo: prase from value
value = s2ip_bytes('192.168.201.51') + s2ip_bytes('255.255.255.0') + s2ip_bytes('0.0.0.0')
else:
value = bytes(value, encoding='ascii')
if param not in params.keys():
return
param = params[param]
block = PNDCPBlockRequest(param[0], param[1], len(value) + 2, bytes([0x00, 0x00]) + value)
dcp = PNDCPHeader(0xfefd, PNDCPHeader.SET, PNDCPHeader.REQUEST, 0x012345, 0, len(value) + 6 + (1 if len(value) % 2 == 1 else 0), block)
eth = EthernetVLANHeader(dst, src, 0x8100, 0, PNDCPHeader.ETHER_TYPE, payload=ensure_min_packet_size(dcp))
s.send(bytes(eth))
# ignore response
s.recv(1522)
time.sleep(2)
def send_discover(s, src):
block = PNDCPBlockRequest(0xFF, 0xFF, 0, bytes())
dcp = PNDCPHeader(0xfefe, PNDCPHeader.IDENTIFY, PNDCPHeader.REQUEST, 0x012345, 192, len(block), payload=block)
eth = EthernetVLANHeader(s2mac("01:0e:cf:00:00:00"), src, 0x8100, 0, PNDCPHeader.ETHER_TYPE, payload=ensure_min_packet_size(dcp))
s.send(bytes(eth))
def send_request(s, src, t, value):
block = PNDCPBlockRequest(t[0], t[1], len(value), bytes(value))
dcp = PNDCPHeader(0xfefe, PNDCPHeader.IDENTIFY, PNDCPHeader.REQUEST, 0x012345, 192, len(block), block)
eth = EthernetVLANHeader(s2mac("01:0e:cf:00:00:00"), src, 0x8100, 0, PNDCPHeader.ETHER_TYPE, payload=ensure_min_packet_size(dcp))
s.send(bytes(eth))
def read_response(s, my_mac, to=20, once=False, debug=False):
ret = {}
found = []
s.settimeout(2)
try:
with max_timeout(to) as t:
while True:
if t.timed_out:
break
try:
data = s.recv(1522)
except timeout:
continue
# nur Ethernet Pakete an uns und vom Ethertype Profinet
eth = EthernetHeader(data)
if eth.dst != my_mac or eth.type != PNDCPHeader.ETHER_TYPE:
continue
debug and print("MAC address:", mac2s(eth.src))
# nur DCP Identify Responses
pro = PNDCPHeader(eth.payload)
if not (pro.service_type == PNDCPHeader.RESPONSE):
continue
# Blöcke der Response parsen
blocks = pro.payload
length = pro.length
parsed = {}
while length > 6:
block = PNDCPBlock(blocks)
blockoption = (block.option, block.suboption)
parsed[blockoption] = block.payload
block_len = block.length
if blockoption == PNDCPBlock.NAME_OF_STATION:
debug and print("Name of Station: %s" % block.payload)
parsed["name"] = block.payload
elif blockoption == PNDCPBlock.IP_ADDRESS:
debug and print(str(block.parse_ip()))
parsed["ip"] = s2ip(block.payload[0:4])
elif blockoption == PNDCPBlock.DEVICE_ID:
parsed["devId"] = block.payload
# Padding:
if block_len % 2 == 1:
block_len += 1
# geparsten Block entfernen
blocks = blocks[block_len+4:]
length -= 4 + block_len
ret[eth.src] = parsed
if once:
break
except TimeoutError:
pass
return ret