-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathverify_markov_chain_model.py
146 lines (114 loc) · 4.6 KB
/
verify_markov_chain_model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from freshpondsim import FreshPondSim
from utils import get_random_velocities_and_distances_func
import scipy.stats
from scipy.special import gamma
import matplotlib.pyplot as plt
from tqdm import tqdm
import numpy as np
from numpy import linalg as la
t0 = 0
t_end = 60*8
entrance_rate_constant = 1000
###### Constant Entry Rate
# def entrance_rate(t):
# if t < t0:
# return 0
# return entrance_rate_constant
# def entrance_rate_integral(t):
# if t < t0:
# return 0
# return entrance_rate_constant * (t - t0)
# def entrance_rate_integral_inverse(y):
# assert y >= 0
# return y / entrance_rate_constant + t0
###### Sinusoidal Varying Entry Rate
a = 0.9 * entrance_rate_constant
period = 80
freq = 1/period
omega = 2*np.pi * freq
def entrance_rate(t):
if t < t0:
return 0
return entrance_rate_constant + a * np.cos(omega * t)
def entrance_rate_integral(t):
if t < t0:
return 0
return entrance_rate_constant * t + a / omega * np.sin(omega * t)
entrance_rate_integral_inverse = None
########### Set up time distribution
scale = 42.59286560661815 # scale parameter of Weibull distribution
k = 1.5513080437971483 # shape parameter of weibull distribution
mean_duration = scale * gamma(1 + 1/k)
duration_dist = scipy.stats.weibull_min(k, scale=scale)
rand_veloc_dist_func = get_random_velocities_and_distances_func(
duration_dist.rvs, 1)
sim = FreshPondSim(distance=2.5,
start_time=t0,
end_time=t_end,
entrances=[0],
entrance_weights=[1],
rand_velocities_and_distances_func=rand_veloc_dist_func,
entrance_rate=entrance_rate,
entrance_rate_integral=entrance_rate_integral,
entrance_rate_integral_inverse=entrance_rate_integral_inverse,
interpolate_rate=False,
interpolate_rate_integral=False,
snap_exit=False)
t = 60*8-10
n_reps = 1
times = []
times.extend(t - p.start_time for p in sim.get_pedestrians_at_time(t))
for _ in tqdm(range(n_reps - 1)):
sim.refresh_pedestrians()
times.extend(t - p.start_time for p in sim.get_pedestrians_at_time(t))
times = np.array(times)
def get_truncated_above_sf(cdf, maxval):
cdf_maxval = cdf(maxval)
return lambda t: 1 - cdf(np.minimum(t, maxval)) / cdf_maxval
def get_truncated_above_pdf(cdf, pdf, maxval):
cdf_maxval = cdf(maxval)
return np.vectorize(lambda t: pdf(t) / cdf_maxval if t <= maxval else 0)
dt = 0.5 # time interval in minutes
# truncate the distribution to lie below max_time
max_time = duration_dist.ppf(0.999)
n_intervals = int(np.ceil(max_time / dt))
max_time = n_intervals * dt
# survival function of truncated duration distribution
trunc_sf = get_truncated_above_sf(duration_dist.cdf, max_time)
# create transition matrix for markov chain.
# column i of the matrix is the probabilities of going to each state if
# currently in state i.
# state 0 is for being outside the res, the other states are for being at
# different levels of progress through being in the res.
# state 1 is the first state you are at when you enter the res.
P = np.zeros((n_intervals + 1, n_intervals + 1))
P[1, 0] = 1
P[0, 0] = 0
t_samples = np.linspace(0, max_time, num=n_intervals + 1)
# k goes thru all k's except 0 and n_intervals
for k in range(1, n_intervals):
stay_prob = trunc_sf(t_samples[k]) / trunc_sf(t_samples[k-1])
P[k + 1, k] = stay_prob
P[0, k] = 1 - stay_prob
P[0, n_intervals] = 1
print(f'Finding eigenvalues and eigenvectors of the {n_intervals+1}x{n_intervals+1} transition matrix...')
vals, vecs = la.eig(P)
print('Done finding those eigens! :)')
eig1_index = np.argmin(abs(vals - 1))
assert np.isclose(vals[eig1_index].real, 1, rtol=0, atol=1e-10)
assert vals[eig1_index].imag == 0
assert np.all(vecs[:, eig1_index].imag == 0)
v = vecs[:, eig1_index].real
v /= v.sum()
assert np.allclose(P @ v, v)
t_probabilities = v[1:] / (1 - v[0])
t_densities = t_probabilities / dt
t_midpoints = t_samples[1:] - dt/2
trunc_pdf = get_truncated_above_pdf(duration_dist.cdf, duration_dist.pdf, max_time)
plt.plot(t_midpoints, t_densities, label='duration so far density by markov chain eigenvector approximation')
# plt.plot(t_midpoints, trunc_pdf(t_midpoints), label='truncated duration dist pdf')
plt.plot(t_midpoints, duration_dist.pdf(t_midpoints), label='duration dist pdf')
plt.hist(times, bins='auto', density=True, label='histogram of duration so far')
plt.plot(t_midpoints, duration_dist.sf(t_midpoints)/mean_duration, label='duration so far density exact')
plt.legend()
plt.show()