Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support set_attributes_all (service 0x02) #116

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions server/enip/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def parse_context( sender_context ):
#

def int_validate( x, lo, hi ):
res = int( x )
res = int( x, base = 0)
assert lo <= res <= hi, "Invalid %d; not in range (%d,%d)" % ( res, lo, hi)
return res

Expand Down Expand Up @@ -857,6 +857,32 @@ def get_attributes_all( self, path,
sender_context=sender_context, **kwds )
return req

def set_attributes_all( self, path, data, elements=1, tag_type=None,
route_path=None, send_path=None, timeout=None, send=True,
sender_context=b'', **kwds):

if elements is None:
elements = len( data )
else:
assert elements == len( data ), \
"Inconsistent elements: %d doesn't match data length: %d" % ( elements, len( data ))

req = dotdict()
req.path = { 'segment': [
dotdict( d ) for d in device.parse_path( path )
]}
req.set_attributes_all= {
'data': data,
'elements': elements,
}

if send:
self.req_send(
request=req, route_path=route_path, send_path=send_path, timeout=timeout,
sender_context=sender_context, **kwds )

return req

def get_attribute_single( self, path,
route_path=None, send_path=None, timeout=None, send=True,
data_size=None, elements=None, tag_type=None, # for response data_size estimation
Expand Down Expand Up @@ -1373,6 +1399,11 @@ def issue( self, operations, index=0, fragment=False, multiple=0, timeout=None )
tag_type=op.get( 'tag_type' ) or parser.DINT.tag_type, size=op.get( 'elements', 1 ))
else:
rpyest = multiple
elif method == "set_attributes_all":
descr += "S_A_A"
req = self.set_attributes_all( timeout=timeout, send=not multiple, **op )
reqest = 8 + parser.typed_data.datasize( parser.USINT.tag_type, size=len( op['data'] ))
rpyest = 4
elif method == "service_code":
req = self.service_code( timeout=timeout, send=not multiple, **op )
reqest = 1 + len( req.input ) # We've rendered the Service Request payload
Expand Down Expand Up @@ -1504,7 +1535,7 @@ def collect( self, timeout=None ):
elif reply.status in (0x00,0x06) and 'get_attributes_all' in reply:
val = reply.get_attributes_all.data
elif reply.status in (0x00,):
# eg. 'set_attribute_single', 'write_{tag,frag}', 'service_code', etc...
# eg. 'set_attribute_single', 'set_attributes_all', 'write_{tag,frag}', 'service_code', etc...
val = True
else: # Failure; val is Falsey
if 'status_ext' in reply and reply.status_ext.size:
Expand Down
75 changes: 75 additions & 0 deletions server/enip/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,11 @@ def register_service_parser( cls, number, name, short, machine ):
GA_ALL_REQ = 0x01
GA_ALL_RPY = GA_ALL_REQ | 0x80

SA_ALL_NAM = "Set Attributes All"
SA_ALL_CTX = "set_attributes_all"
SA_ALL_REQ = 0x02
SA_ALL_RPY = SA_ALL_REQ | 0x80

GA_LST_NAM = "Get Attribute List"
GA_LST_CTX = "get_attribute_list"
GA_LST_REQ = 0x03
Expand Down Expand Up @@ -1062,6 +1067,9 @@ def request( self, data, addr=None ):
elif ( data.get( 'service' ) == self.GA_ALL_REQ
or self.GA_ALL_CTX in data and data.setdefault( 'service', self.GA_ALL_REQ ) == self.GA_ALL_REQ ):
pass
elif ( data.get( 'service' ) == self.SA_ALL_REQ
or self.SA_ALL_CTX in data and data.setdefault( 'service', self.SA_ALL_REQ ) == self.SA_ALL_REQ ):
pass
elif ( data.get( 'service' ) == self.SA_SNG_REQ
or self.SA_SNG_CTX in data and data.setdefault( 'service', self.SA_SNG_REQ ) == self.SA_SNG_REQ ):
pass
Expand All @@ -1086,6 +1094,38 @@ def request( self, data, addr=None ):
data.get_attributes_all = dotdict()
data.get_attributes_all.data = [
b if type( b ) is int else ord( b ) for b in result ]
elif data.service == self.SA_ALL_RPY:
# Set Attributes All. Convert unsigned ints to bytes, parse appropriate
# elements using the Attribute's .parser, and assign. Must produce exactly the
# correct number of bytes to fully populate the Attributes.

a_id = 1
sum_siz = 0
while str(a_id) in self.attribute:

att = self.attribute[str(a_id)]
att_siz = att.parser.struct_calcsize * len(att)
sum_siz += att_siz
a_id += 1

assert 'set_attributes_all.data' in data and len(data.set_attributes_all.data) == sum_siz, \
"Expected %d total bytes in .set_attributes_all.data" % (sum_siz,)

a_id = 1
data_ptr = 0
while str(a_id) in self.attribute:

att = self.attribute[str(a_id)]
siz = att.parser.struct_calcsize
att_siz = siz * len(att)
fmt = att.parser.struct_format
buf = bytearray( data.set_attributes_all.data[data_ptr:data_ptr + att_siz] )
val = [struct.unpack( fmt, buf[i:i+siz] )[0]
for i in range(0, len(buf), siz) ]
att[:] = val
a_id += 1
data_ptr += att_siz

elif data.service == self.GA_LST_RPY:
# Get Attribute List. Collect up the bytes representing the attributes. Converts a
# placehold .get_attribute_list = [<attribute>,...] list of attribute numbers with
Expand Down Expand Up @@ -1217,6 +1257,12 @@ def produce( cls, data ):
# Get Attributes All
result += USINT.produce( data.service )
result += EPATH.produce( data.path )
elif cls.SA_ALL_CTX in data and data.setdefault( 'service', cls.SA_ALL_REQ ) == cls.SA_ALL_REQ:
# Set Attributes All
result += USINT.produce( data.service )
result += EPATH.produce( data.path )
result += typed_data.produce( data.set_attributes_all,
tag_type=USINT.tag_type )
elif cls.GA_SNG_CTX in data and data.setdefault( 'service', cls.GA_SNG_REQ ) == cls.GA_SNG_REQ:
# Get Attribute Single
result += USINT.produce( data.service )
Expand All @@ -1242,6 +1288,12 @@ def produce( cls, data ):
if data.status == 0x00:
result += typed_data.produce( data.get_attributes_all,
tag_type=USINT.tag_type )
elif data.get( 'service' ) == cls.SA_ALL_RPY:
# Set Attributes All Reply.
result += USINT.produce( data.service )
result += b'\x00' # reserved
result += status.produce( data )

elif data.get( 'service' ) == cls.GA_LST_RPY:
# Get Attribute List Reply
result += USINT.produce( data.service )
Expand Down Expand Up @@ -1328,6 +1380,29 @@ def __get_attributes_all_reply():
Object.register_service_parser( number=Object.GA_ALL_RPY, name=Object.GA_ALL_NAM + " Reply",
short=Object.GA_ALL_CTX, machine=__get_attributes_all_reply() )

def __set_attributes_all( ):
srvc = USINT( context='service' )
srvc[True] = path = EPATH( context='path')
path[True] = typed_data( context=Object.SA_ALL_CTX,
tag_type=USINT.tag_type,
terminal=True )
return srvc

Object.register_service_parser( number=Object.SA_ALL_REQ, name=Object.SA_ALL_NAM,
short=Object.SA_ALL_CTX, machine=__set_attributes_all() )

def __set_attributes_all_reply():
srvc = USINT( context='service' )
srvc[True] = rsvd = octets_drop( 'reserved', repeat=1 )
rsvd[True] = stts = status()
stts[None] = mark = octets_noop( context=Object.SA_ALL_CTX,
terminal=True )
mark.initial[None] = move_if( 'mark', initializer=True )
return srvc

Object.register_service_parser( number=Object.SA_ALL_RPY, name=Object.SA_ALL_NAM + " Reply",
short=Object.SA_ALL_CTX, machine=__set_attributes_all_reply() )

def __get_attribute_list():
srvc = USINT( context='service' )
srvc[True] = path = EPATH( context='path')
Expand Down
3 changes: 1 addition & 2 deletions server/enip/get_attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ def attribute_operations( paths, int_type=None, **kwds ):
for op in client.parse_operations( paths, int_type=int_type or 'SINT', **kwds ):
path_end = op['path'][-1]
if 'instance' in path_end:
op['method'] = 'get_attributes_all'
assert 'data' not in op, "All Attributes cannot be operated on using Set Attribute services"
op['method'] = 'set_attributes_all' if 'data' in op else 'get_attributes_all'
elif 'symbolic' in path_end or 'attribute' in path_end or 'element' in path_end:
op['method'] = 'set_attribute_single' if 'data' in op else 'get_attribute_single'
else:
Expand Down