-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFed2.py
84 lines (64 loc) · 3.18 KB
/
Fed2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import helics as h
import logging
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
# Module-level logger so we can follow what is going on during the co-simulation.
# Records are written to the console through a StreamHandler.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)  # Verbose on purpose: check the logs and config.json
logger.addHandler(logging.StreamHandler())
def destroy_federate(fed):
    """Destroy this federate once the co-simulation has finished.

    Hands ``fed`` to ``helicsFederateDestroy`` and then writes an INFO log
    record confirming the teardown.

    Args:
        fed: The HELICS federate handle to destroy.
    """
    h.helicsFederateDestroy(fed)
    logger.info("Federate finalized")
def _run_cosimulation(fed, total_seconds=24, time_interval_seconds=2):
    """Step the federate through the co-simulation and record what it received.

    On every step the first two inputs are read (sin, cos), doubled, and
    published on the first two publications; then the next time step is
    requested from HELICS.

    Args:
        fed: Value federate created from the JSON config, not yet in
            executing mode.
        total_seconds: Simulated time at which the loop stops.
        time_interval_seconds: Step added to the granted time for each
            time request.

    Returns:
        A dict with keys ``"time"``, ``"sin"`` and ``"cos"``; the three lists
        are aligned, one entry per loop iteration.

    Raises:
        RuntimeError: If the federate exposes fewer than two inputs or fewer
            than two publications.
    """
    ############## Pub/Sub messaging handles ################################
    inputs = [
        h.helicsFederateGetInputByIndex(fed, i)
        for i in range(h.helicsFederateGetInputCount(fed))
    ]
    publications = [
        h.helicsFederateGetPublicationByIndex(fed, i)
        for i in range(h.helicsFederateGetPublicationCount(fed))
    ]
    # Fail early with a clear message instead of a bare lookup error mid-loop.
    if len(inputs) < 2 or len(publications) < 2:
        raise RuntimeError(
            "Fed2 needs at least 2 inputs and 2 publications, but the "
            f"configuration provides {len(inputs)} input(s) and "
            f"{len(publications)} publication(s)"
        )
    sin_input, cos_input = inputs[0], inputs[1]
    sin_publication, cos_publication = publications[0], publications[1]

    ############## Entering Execution Mode ##################################
    h.helicsFederateEnterExecutingMode(fed)
    logger.debug("Entered HELICS execution mode")

    granted_time = 0
    # "time" holds the granted time at which each sample was read, so the
    # plot can use real simulation seconds on its x-axis.
    results = {"time": [], "sin": [], "cos": []}

    ########## Main co-simulation loop ######################################
    logger.debug(f"Entering main co-simulation loop at time {granted_time} seconds")
    while granted_time < total_seconds:
        # Get the values from the subscriptions (or inputs from other federates).
        # NOTE(review): the first pass reads the inputs at time 0, before any
        # time has been granted - presumably a default value unless the
        # publisher already sent one; confirm against Fed1.
        sin = h.helicsInputGetDouble(sin_input)
        cos = h.helicsInputGetDouble(cos_input)
        logger.debug(f"Sin value: {sin}, Cos value: {cos}")
        results["time"].append(granted_time)
        results["sin"].append(sin)
        results["cos"].append(cos)

        # Put your logic / model / calculations here
        amplified_sin_value = sin * 2
        amplified_cos_value = cos * 2

        # Publish the values to the publications (or outputs to other federates)
        h.helicsPublicationPublishDouble(sin_publication, amplified_sin_value)
        h.helicsPublicationPublishDouble(cos_publication, amplified_cos_value)

        # Time request for the next physical interval to be simulated
        requested_time_seconds = granted_time + time_interval_seconds
        granted_time = h.helicsFederateRequestTime(fed, requested_time_seconds)
        logger.debug(
            f"Granted time {granted_time} seconds while requested time {requested_time_seconds} seconds with time interval {time_interval_seconds} seconds"
        )
    return results


def _plot_results(results):
    """Plot the unamplified sin/cos samples against their simulation time.

    Args:
        results: Dict with aligned ``"time"``, ``"sin"`` and ``"cos"`` lists,
            as returned by ``_run_cosimulation``.
    """
    plt.figure()
    plt.title("Unamplified Sin and Cos from Fed1, plot generated by Fed2")
    # Plot against the recorded times; without explicit x values matplotlib
    # would use the sample index, which does not match the axis label.
    plt.plot(results["time"], results["sin"], label="Unamplified Sin", color="red")
    plt.plot(results["time"], results["cos"], label="Unamplified Cos", color="blue")
    plt.xlabel("Time (seconds)")
    plt.legend()
    ax = plt.gca()
    ax.xaxis.set_minor_locator(ticker.MultipleLocator(1))  # Set minor tick frequency
    plt.grid(True, which="both", axis="x", color="gray", alpha=0.2)  # Reduced opacity
    plt.show()


if __name__ == "__main__":
    ############## Creating Federate ########################################
    fed = h.helicsCreateValueFederateFromConfig("Fed2Config.json")
    try:
        results = _run_cosimulation(fed)
    finally:
        # Clean up HELICS even when the run fails, so this federate is always
        # destroyed rather than left registered with the federation.
        destroy_federate(fed)
    # Plotting the results (only reached when the co-simulation succeeded)
    _plot_results(results)