forked from MxHbm/Management-Science
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscenario_reduction.py
225 lines (159 loc) · 8.95 KB
/
scenario_reduction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import itertools
import math
import pprint

import numpy as np
from sklearn.cluster import KMeans
class Scenario_Analyse:
    """Build all base scenarios from component-wise demand/supply levels and
    reduce them to K representative scenarios via K-means clustering.

    Pipeline (runs entirely in ``__init__``):
      1. Enumerate every combination of component levels -> base scenarios
         with joint probabilities.
      2. Expand each base scenario into dataset ``W``, repeated according to
         its expected frequency ``round(N * p)``.
      3. Cluster ``W`` with K-means; each denormalized cluster centre becomes
         one reduced scenario, weighted by its relative cluster size.
    """

    def __init__(self, demand_supply: list[list[int]], probabilities: list[list[float]], K: int = 9, epsilon: float = 0.0001, N: int = 10000) -> None:
        """Initialize and immediately compute the reduced scenario set.

        Args:
            demand_supply: One list of possible levels per scenario component
                (originally five components; any number is supported).
            probabilities: Per-component probabilities, same shape as
                ``demand_supply``; each row should sum to 1.
            K: Target number of reduced scenarios (K > 1).
            epsilon: Minimum expected frequency a scenario needs to enter the
                clustering dataset W.
            N: Sample size used to turn probabilities into integer frequencies.

        Raises:
            ValueError: propagated from sklearn when ``len(W) < K``.
        """
        # Target number of scenarios after reduction (K > 1).
        self._K = K
        # Minimum allowed expected frequency for a scenario to be kept in W.
        self._epsilon = epsilon
        self._N = N
        # Possible levels per component and their matching probabilities.
        self._demand_supply = np.array(demand_supply)
        self._p = np.array(probabilities)
        # Step 1: enumerate all scenario combinations with joint probabilities.
        self.__create_base_scenarios_and_p()
        # Step 2: expand scenarios into the frequency-weighted dataset W.
        self.__create_vector_W()
        # Step 3: cluster W; the cluster centres become the reduced scenarios.
        self.__apply_K_means_Clustering()

    def __str__(self) -> str:
        """Return a human-readable dump of base and reduced scenarios with
        their probabilities."""
        output = "Base Scenarios:\n"
        output += pprint.pformat(self._base_scenarios) + "\n"
        output += "Probabilities of Base Scenarios:\n"
        output += pprint.pformat(self._p_base_scenarios) + "\n"
        output += "Reduced Scenarios:\n"
        output += pprint.pformat(self._reduced_scenarios) + "\n"
        output += "Probabilities of Reduced Scenarios:\n"
        output += pprint.pformat(self._p_reduced_scenarios) + "\n"
        return output

    def __create_base_scenarios_and_p(self) -> None:
        """Enumerate the Cartesian product of all component levels.

        Generalized from five hard-coded nested loops: any number of
        components (rows in ``demand_supply``) is supported, producing the
        same scenarios in the same order as the original five-loop version.
        Assumes component outcomes are independent, so the joint probability
        is the product of the per-component probabilities.
        """
        self._base_scenarios = []
        self._p_base_scenarios = []
        # One index range per component row; product() iterates them in the
        # same nesting order as the original loops (last index fastest).
        index_ranges = [range(len(levels)) for levels in self._demand_supply]
        for combo in itertools.product(*index_ranges):
            scenario = [self._demand_supply[component][level]
                        for component, level in enumerate(combo)]
            self._base_scenarios.append(scenario)
            # Joint probability; rounded to 4 decimals so that N * p (with
            # the default N = 10000) yields clean integer frequencies.
            prob_scenario = round(math.prod(float(self._p[component][level])
                                            for component, level in enumerate(combo)), 4)
            self._p_base_scenarios.append(prob_scenario)

    def __create_vector_W(self) -> None:
        """Build dataset W: each base scenario repeated ``round(N * p)`` times.

        Scenarios whose rounded frequency falls below epsilon are skipped.
        NOTE: with the default epsilon = 0.0001 every scenario with a non-zero
        integer frequency passes the check, so epsilon has no practical
        influence on the result.
        """
        self.W = []
        for scenario, prob in zip(self._base_scenarios, self._p_base_scenarios):
            # Expected absolute frequency of this scenario in a sample of N.
            frequency = round(self._N * prob)
            if frequency >= self._epsilon:
                # Repeat the scenario according to its frequency.
                self.W.extend([scenario] * frequency)
        # Convert the dataset W to a NumPy array for clustering.
        self.W = np.array(self.W)

    def __normalize_dataset(self, data: np.ndarray) -> np.ndarray:
        """Z-score normalize ``data`` column-wise.

        Stores the column means and standard deviations on the instance so
        ``__denormalize_dataset`` can invert the transformation.

        Args:
            data: 2-D array of scenario vectors (rows = scenarios).

        Returns:
            np.ndarray: normalized copy of ``data``.
        """
        self.mean_data = np.mean(data, axis=0)
        self.std_data = np.std(data, axis=0)
        # Constant columns have std 0; treat them as unscaled to avoid
        # division by zero.
        self.std_data[self.std_data == 0] = 1
        return (data - self.mean_data) / self.std_data

    def __denormalize_dataset(self, data: np.ndarray) -> np.ndarray:
        """Invert ``__normalize_dataset`` using the stored mean and std.

        Args:
            data: array of normalized values.

        Returns:
            np.ndarray: values restored to the original scale
            (``value * std + mean``).
        """
        return data * self.std_data + self.mean_data

    def __apply_K_means_Clustering(self) -> None:
        """Cluster W into K groups; centres become the reduced scenarios.

        Sets ``_reduced_scenarios`` (rounded, denormalized centres),
        ``_sizes_reduced_scenarios`` (scenarios per cluster) and
        ``_p_reduced_scenarios`` (relative cluster sizes, rounded to
        4 decimals).

        Raises:
            ValueError: propagated from sklearn when ``len(W) < K``.
        """
        # Normalize so every component contributes equally to the distance.
        data = self.__normalize_dataset(self.W)
        # Fixed random_state keeps the reduction reproducible across runs.
        kmeans = KMeans(n_clusters=self._K, random_state=0, n_init='auto')
        kmeans.fit(data)
        # Denormalize centres back to the original scale and round them to
        # integer demand/supply levels.
        self._reduced_scenarios = np.round(self.__denormalize_dataset(np.array(kmeans.cluster_centers_)))
        # minlength guarantees exactly K counts even if a cluster were empty.
        self._sizes_reduced_scenarios = np.bincount(kmeans.labels_, minlength=self._K)
        # Relative cluster sizes are the probabilities of the reduced scenarios.
        self._p_reduced_scenarios = [round(cluster_size / len(self.W), 4)
                                     for cluster_size in self._sizes_reduced_scenarios]

    # PROPERTY FUNCTIONS TO GET VALUES
    @property
    def len_W(self) -> int:
        """Number of (repeated) scenario rows in dataset W."""
        return len(self.W)

    @property
    def len_base_scenarios(self) -> int:
        """Number of distinct base scenarios."""
        return len(self._base_scenarios)

    @property
    def K(self) -> int:
        """Target number of reduced scenarios."""
        return self._K

    @property
    def epsilon(self) -> float:
        """Minimum expected frequency threshold for dataset W."""
        return self._epsilon

    @property
    def N(self) -> int:
        """Sample size used to convert probabilities into frequencies."""
        return self._N

    @property
    def demand_supply(self) -> list[list[int]]:
        """Per-component demand/supply levels (as passed to the constructor)."""
        return self._demand_supply

    @property
    def probabilities(self) -> list[list[float]]:
        """Per-component level probabilities (as passed to the constructor)."""
        return self._p

    @property
    def base_scenarios(self) -> list[list[int]]:
        """All enumerated base scenarios."""
        return self._base_scenarios

    @property
    def base_scenario_probabilities(self) -> list[float]:
        """Joint probability of each base scenario."""
        return self._p_base_scenarios

    @property
    def reduced_scenarios(self) -> list[list[int]]:
        """Rounded, denormalized K-means cluster centres."""
        return self._reduced_scenarios

    @property
    def sizes_reduced_scenarios(self) -> list[int]:
        """Number of W rows assigned to each reduced scenario."""
        return self._sizes_reduced_scenarios

    @property
    def reduced_scenarios_probabilities(self) -> list[float]:
        """Probability (relative cluster size) of each reduced scenario."""
        return self._p_reduced_scenarios

    @property
    def len_reduced_scenarios(self) -> int:
        """Number of reduced scenarios produced by the clustering."""
        return len(self._reduced_scenarios)