아래 매트랩 예시보면 mu = 2, sigma = 1일때 x = 2인 위치의 확률이 가장 높은 0.4정도가 나온다.
즉 실제 z와 다양한 k 중 pt로부터 얻은 hx = z가 가장 가까울 수록 높은 값을 갖는다.
적다보니 말이 이상한데 가장 실제에 가까운 파티클의 가중치를 높이는 혁활
일단 z 자리가 x 값
mu 자리는 파티클 기준 관측 예측치
분산은 10
그러면 파이썬에선 이를 대체할 함수가 뭐가 있나..
for k = 1:Npt
wt(k) = wt(k)*normpdf(z, hx(pt(:,k)), 10);
end
wt = wt / sum(wt);
대충 찾아보니 사이파이에 있는 걸 쓰면 될듯하다.
위에서 본 매트랩 예제랑 동일하게 만들어봤는데
값이 비슷한거 봐서 맞는것같네
이제 추정 상태 구현 차례
파티클은 3x 1000
가중치는 1 x 1000
가중합을 구하려면 P 행렬곱 W.T 하면
X에 알맞게 3 x 1 행렬이 나온다.
이제 거의다 구현하긴 했는데
가장 어려운 리샘플링 로직이 남았다.
def pf(z, dt):
global particles, W
for k in range(N_particle):
particles[:, k] = fx(particles[:, k], dt) + np.random.normal(0, 1, (3))
for k in range(N_particle):
W[0, k] = W[0, k] * stats.norm.pdf(z, hx(particles[:, k].reshape(-1, 1)), 10)
W = W / sum(W[0,:])
X_est = particles @ W.T
SIR 로직 이거 처음봤을때 뭔소린가 이해하기 정말 어려웠는데
지금은 대충은 이해된다.
가중치는 지금 1 x 1000 형태 행렬인데
정규화를 해서 전체를 합하면 1이된다.
이를 누적확률 분포 구간별로 구분하면 이런식이 된다.
이 구간 전체에다가 균일 분포를 따르는 점들을 놓자
그러면 좁은 구간에는 점들이 적을 것이고
넓은 구간에는 점들이 많다.
점이 많은 구간은 중요한 파티클
점이 작은 구간은 안중요한 파티클로 보고
중요도가 높은 파티클들 중심으로 리샘플링을 수행한다.
로직은 대충 알지만 이걸 어케 파이썬으로 구현하냐
일단 매트랩 함수를 찾아가면서 비슷한거 찾아쓰거나 만들어야지.
일단 cumsum은 누적 확률 분포 같은걸 만드는데
shape는 그대로 유지된다.
근대 보통 이런거 넘파이에 대부분 구현되어있다.
W 누적 분포 구하기위해 일단 임시로 관측치, 직선거리 z = 1101로 놓고 한번 가중치 누적 분포 출력하면 이런식
균일 분포랑 누적분포를 만들긴 했는데 매트랩 코드 그대로 구현하기가 좀 어려운데
아 파이썬 데이터 타입떄문에 되게 골치아프다.
파티클이 1000개다 보니까 소숫점 매우 낮은데까지 가는데
저 로직은 이해 안되고
그냥 위에서 중요 가중치 인덱스 찾는 로직을 억지로 루프 돌려서 만들긴 했다.
인덱스별로 균일 분포 포인트가 몇개 있는지를 구했는데
다음 문제는 파티클을 어떻게 새로 생성하는가다
0이 아닌 인덱스들을 모아서
그 인덱스에 해당하는 파티클을 + 정규분포해서 균일분포 속한 갯수만큼 생성시키자
10번 파티클이 가장 가중치 많이 받았는데
이 가중치 구간에 균일분포 점이 100개가 들었다.
그러면 10번 파티클을 베이스로 정규분포 노이즈 추가해서 100개 만드는것으로
하는식으로 구현해야겟다.
ㅈ버ㅑㅐㅍㅂ주ㅑ잽풉재ㅑㅍㅂ주ㅑㅐㄼ우ㅐㅑ재ㅏㅂㅈ타ㅐㅔㅈ바ㅐㅔㅂ
어거지로 만들긴했는데
제대로 동작할지 모르겠다.
def pf(z, dt):
global particles, W
for k in range(N_particle):
particles[:, k] = fx(particles[:, k], dt) + np.random.normal(0, 1, (3))
for k in range(N_particle):
W[0, k] = W[0, k] * stats.norm.pdf(z, hx(particles[:, k].reshape(-1, 1)), 10)
W = W / sum(W[0,:])
X_est = particles @ W.T
W_cum = np.cumsum(W, dtype=np.float16)
uniform_pts = np.random.uniform(0, 1, (N_particle, 1))
uniform_pts.sort(axis=0)
W_cum_dedup = sorted(list(set(W_cum)))
num_dict = {}
for i, val in enumerate(W_cum_dedup):
if (i - 1) < 0:
low_bound = 0
else:
low_bound = W_cum_dedup[i - 1]
upper_bound = W_cum_dedup[i]
num_dict[i] = sum(uniform_pts[uniform_pts >= low_bound] < upper_bound)
particles_tmp = np.zeros((3, N_particle), dtype=np.float16)
cum_idx = 0
for key, val in num_dict.items():
if val == 0:
continue
for idx in range(val):
particles_tmp[:, cum_idx + idx] = particles[:, key] + 0.1 * particles[:, key] * np.random.normal(0, 1, (3))
cum_idx += val
#print(key, cum_idx, val)
# cum idx가 1000이 안된다면(파티클 1000개 다 안채우면) 기존 파티클 재사용
if cum_idx < 1000:
particles_tmp[:, cum_idx:] = particles[:, cum_idx:]
particles = particles_tmp
W = np.ones((1, N_particle), dtype=np.float16) * 1 / N_particle
return X_est
ㅠㅠ 파티클 필터 만들기는 실패
리샘플링이 문젠가
아니면 다른 중간 로직이 문제였을까
일단 구현한 코드는 아래 첨부
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from math import sqrt
def fx(X_est, dt):
A = np.array([
[0, 1, 0],
[0, 0, 0],
[0, 0, 0]
])
A = np.eye(3) + dt * A
X_pred = A @ X_est
return X_pred
def hx(X_pred):
x1 = X_pred[0][0]
x3 = X_pred[2][0]
Z_pred = np.sqrt(x1**2 + x3**2)
return Z_pred
def get_radar():
global pos
vel = 100 + 5 * np.random.normal()
alt = 1000 + 10 * np.random.normal()
pos = pos + vel * dt
v = 0 + pos * 0.05 * np.random.normal()
r = sqrt(pos ** 2 + alt ** 2) + v
return r
dt = 0.05
X_est = np.array([[0, 90, 1100]]).T
N_particle = 1000
pos = 0
particles = np.random.normal(0, 1, (3, N_particle))
particles[0, :] = X_est[0][0] + 0.1 * X_est[0][0] * particles[0, :]
particles[1, :] = X_est[1][0] + 0.1 * X_est[1][0] * particles[1, :]
particles[2, :] = X_est[2][0] + 0.1 * X_est[2][0] * particles[2, :]
#print(particles.shape)
W = np.ones((1, N_particle), dtype=np.float16) * 1 / N_particle
t_list = np.arange(0, 20, 0.05)
pos_list, vel_list, alt_list, z_list, dist_list = [], [], [], [], []
z = 1101
def pf(z, dt):
global particles, W
for k in range(N_particle):
particles[:, k] = fx(particles[:, k], dt) + np.random.normal(0, 1, (3))
for k in range(N_particle):
W[0, k] = W[0, k] * stats.norm.pdf(z, hx(particles[:, k].reshape(-1, 1)), 10)
W = W / sum(W[0,:])
X_est = particles @ W.T
W_cum = np.cumsum(W, dtype=np.float16)
uniform_pts = np.random.uniform(0, 1, (N_particle, 1))
uniform_pts.sort(axis=0)
W_cum_dedup = sorted(list(set(W_cum)))
num_dict = {}
for i, val in enumerate(W_cum_dedup):
if (i - 1) < 0:
low_bound = 0
else:
low_bound = W_cum_dedup[i - 1]
upper_bound = W_cum_dedup[i]
num_dict[i] = sum(uniform_pts[uniform_pts >= low_bound] < upper_bound)
particles_tmp = np.zeros((3, N_particle), dtype=np.float16)
cum_idx = 0
for key, val in num_dict.items():
if val == 0:
continue
for idx in range(val):
particles_tmp[:, cum_idx + idx] = particles[:, key] + 0.1 * particles[:, key] * np.random.normal(0, 1, (3))
cum_idx += val
#print(key, cum_idx, val)
# cum idx가 1000이 안된다면(파티클 1000개 다 안채우면) 기존 파티클 재사용
if cum_idx < 1000:
particles_tmp[:, cum_idx:] = particles[:, cum_idx:]
particles = particles_tmp
W = np.ones((1, N_particle), dtype=np.float16) * 1 / N_particle
return X_est
for _ in range(len(t_list)):
z = get_radar()
X_est = pf(z, dt)
pos_list.append(X_est[0][0])
vel_list.append(X_est[1][0])
alt_list.append(X_est[2][0])
z_list.append(z)
dist_list.append(sqrt(X_est[0][0] ** 2 + X_est[2][0] ** 2))
plt.subplot(411)
ax1 = plt.subplot(4,1,1)
ax2 = plt.subplot(4,1,2)
ax3 = plt.subplot(4,1,3)
ax4 = plt.subplot(4,1,4)
ax1.plot(t_list, pos_list, label="pos")
ax2.plot(t_list, vel_list, label="vel")
ax3.plot(t_list, alt_list, label="alt")
ax4.plot(t_list, z_list, label="measured")
ax4.plot(t_list, dist_list, label="estimated")
ax1.legend()
ax2.legend()
ax3.legend()
ax4.legend()
plt.gcf().set_size_inches(12, 8)
import scipy.io
accel = scipy.io.loadmat("ArsAccel.mat")
def get_accel():
ax = accel["fx"]
ay = accel["fy"]
az = accel["fz"]
accel_len = len(ax)
for i in range(accel_len):
yield ax[i], ay[i], az[i]
for ax, ay, az in get_accel():
print(ax, ay, az)
get_accel()
import numpy as np
pos_true = 0
vel_true = 80
dt = 0.1
def get_pos():
global pos_true
global vel_true
w = 0 + 10 * np.random.normal() # vel noise
v = 0 + 10 * np.random.normal() # pos noise
z = pos_true + vel_true * dt + v
pos_true = z - v
vel_true = 80 + w
return z
for _ in range(20):
print(pos_true, "-", get_pos())
def get_vel():
global pos_true
global vel_true
w = 0 + 10 * np.random.normal() # vel noise
vel_true = 80 + w
pos_true = pos_true + vel_true * dt
z = vel_true
return z
칼만필터 로직은 변경할거 없고
X에서 위치를 뽑아내려고
H = [1, 0] 로 했던걸
속도를 뽑아낼수 있도록
H = [0, 1]로 고치가만 하면 끝
import matplotlib.pyplot as plt
def get_vel():
global pos_true
global vel_true
w = 0 + 10 * np.random.normal() # vel noise
vel_true = 80 + w
pos_true = pos_true + vel_true * dt
z = vel_true
return z
pos_true = 0
vel_true = 80
dt = 0.1
time_cur, time_stop = 0, 5
t_list = []
pos_true_list = []
pos_est_list = []
vel_true_list = []
vel_est_list = []
A = np.array([
[1, dt],
[0, 1]
])
H = np.array([
[0, 1]
])
Q = np.array([
[1, 0],
[0, 3]
])
X = np.array([
[0, 20]
]).T
P = 5 * np.eye(2)
R = 10
def int_kf(z):
x_pred = A @ X
P_pred = A @ P @ A.T + Q
K_numerator = P_pred @ H.T
K_denominator = H @ P_pred @ H.T + R
K_denominator = np.linalg.inv(K_denominator)
K = K_numerator @ K_denominator
measurement_err = z - H @ x_pred
x_est = x_pred + K @ measurement_err
P_est = P_pred - K @ H @ P_pred
return x_est, P_est
while time_cur < time_stop:
pos_true_list.append(pos_true)
z = get_vel()
X, P = int_kf(z)
t_list.append(time_cur)
pos_est_list.append(X[0][0])
vel_true_list.append(vel_true)
vel_est_list.append(X[1][0])
time_cur += dt
ax1 = plt.subplot(2, 1, 1)
ax2 = plt.subplot(2, 1, 2)
ax1.plot(t_list, pos_true_list, label="true pos")
ax1.plot(t_list, pos_est_list, label="kf pos")
ax1.legend()
ax2.plot(t_list, vel_true_list, label="true speed")
ax2.plot(t_list, vel_est_list, label="kf speed")
ax2.legend()
로보틱스에서 사용되는 ekf slam에서는 로봇과 랜드마크의 (x, y) 좌표를 확장 칼만 필터로 추정해냄
위 그림에서 시스템 행렬 A, 오차 공분산 행렬 P, 시스템 노이즈를 의미하는 Q, 출력행렬 H, 측정 노이즈 R 가 있는데
보통 칼만 필터에서 다루는 상태 값은 전압, 초음파 거리 같이 하나가 아닌
slam의 예시에선 x_robot, y_robot, theta_robot, x_landmark1, y_landmark2, ....여러 개라 행렬의 형태, 상태 공간으로 표현하게됨
A : 시스템 행렬
Q : 시스템 노이즈
P : 오차 공분산 행렬
H : 측정값 출력 행렬
R : 측정 노이즈
K : 칼만 이득
시스템 행렬 A는 상태 x가 다음 시점에서 어떻게 변하는지 나타내는 행렬이다.
측정 값 z_k = H x_k로
H는 측정 행렬을 나타내며 x_k와 곱했을때 측정값이 튀어나오도록 하는 행렬임
칼만 이득 K_k는 측정값으로 얼마나 예측값을 보정할지 다루는 계수로
측정 노이즈 R가 크다 -> 칼만 이득은 작아짐 -> 측정 노이즈가 크다면 측정 값을 덜 반영함
오차 공분산 예측 P_bar k가 크다 -> 칼만 이득은 커짐 -> 측정 값을 더 반영한다
칼만 필터를 이용한 추정값 hat x_k는
hat x_k = hat x_bar k + K_k ( z_k - H hat x_bar k)로 구해진다.
칼만 이득 K_k에 따라 측정 값을 얼마나 반영시킬지 조절함
(z_k - H hat x_bar k)는 실제 측정값 - 예측 측정갑의 오차를 의미한다.
오차 공분산 행렬 예측값은 앞뒤로 시스템행렬 A를 곱해줘서 구한다.
오차 공분산 행렬은 예측 오차 공분산 행렬 - 칼만 이득 * 관측 행렬 * 예측 오차 공분산 행렬인데 P_bar k로 묶으면 아래와 같다.
오차 공분산은 추정 결과가 얼마나 올바른지 나타냄
결과가 올바르면 오차 공분산 행렬은 작아짐
말이 많이 지저분해졌는데 대충 정리하면
칼만 필터는 필터링, 상태 추정하는 필터
관측 값과 예측 관측 값의 차이(실 상태 값과 예측 상태 추정치 차이) 에 따라 칼만 게인을 조절해서
관측 값을 얼마나 반영시킬지 조정함
쓰고 보니까 원서에서는
prediction, correction이라 해서
이전엔 예측과 갱신이라고 표현했던거같은데
이미 이렇게 쓴거 그냥 나둠
대충 로직 설명은 여기까지가고 구현 들어감
데이터는 지난번에 만든 전압 데이터 사용ㅇ
import numpy as np
def get_volt():
w = 0 + 4 * np.random.normal()
z = 14.4 + w
return z
for _ in range(10):
print(get_volt())
지금 사용하는 상태는 전압 v 하나뿐이라
A H Q R P에서 행렬이라고 할게 없음
측정 노이즈 R 같은 경우에는
갯 볼티지 함수에서 4 * 정규분포(0, 1)을 해가지구
R을 4로 줌
x는 볼티지 기저 값을 14.4로 해서 14로 주고
A, H는 변환 할게 없이 그대로 쓰므로 1
시스템 노이즈 Q는 뭐라할게 없어서 0
추정하는 상태값이 볼트 1개 뿐이라 뭔가 하는것같은 느낌은 안들지만
동작은 잘되긴 한다.
import matplotlib.pyplot as plt
A = 1
H = 1
Q = 0
R = 4
x = 14
P = 6
def kf(x, P, z):
x_pred = A * x
P_pred = A * P * A + Q
K = P_pred * H / (H * P_pred * H + R)
x_est = x_pred + K * (z - H * x_pred)
P_est = P_pred - K * H * P_pred
return x_est, P_est
t_list = np.arange(0, 10, 0.2)
volt_list = []
kf_list = []
x_est = 0
for _ in range(len(t_list)):
volt = get_volt()
if x_est == 0:
x_est = x
P_est = P
x_est, P_est = kf(x_est, P_est, volt)
volt_list.append(volt)
kf_list.append(x_est)
plt.plot(t_list, volt_list, label="measurement")
plt.plot(t_list, kf_list, label="filter estimates")
plt.legend()