Haste makes waste

Nano01(自動運転)-U05-Lesson22-Kalman Filters

Posted on By lijun

参考资料:

  1. 视频卡尔曼滤波器的原理以及在matlab中的实现,通俗易懂
  2. 维基百科卡尔曼滤波
  3. 维基百科即时定位与地图构建(SLAM)

0. 小结

这部分虽然是理论课,但还比较好懂,通过这部分学习,完成了卡尔曼滤波的学习,接触高斯分布,使用乘法进行测度更新,使用卷积完成预测和状态转移,并用python实现了一个卡尔曼滤波器。

它被用于车辆追踪任务,用于进行不可观测部分速度的预测

0. 卡尔曼滤波器(最佳线性滤波器)原理

卡尔曼滤波(Kalman filter)是一种高效率的递归滤波器(自回归滤波器),它能够从一系列的不完全及包含噪声的测量中,估计动态系统的状态。卡尔曼滤波会根据各测量量在不同时间下的值,考虑各时间下的联合分布,再产生对未知变数的估计,因此会比只以单一测量量为基础的估计方式要准。卡尔曼滤波得名自主要贡献者之一的鲁道夫·卡尔曼。

卡尔曼滤波在技术领域有许多的应用。常见的有飞机及太空船的导引、导航及控制。

卡尔曼滤波的算法是二步骤的程序。在估计步骤中,卡尔曼滤波会产生有关目前状态的估计,其中也包括不确定性。只要观察到下一个量测(其中一定含有某种程度的误差,包括随机噪声)。会透过加权平均来更新估计值,而确定性越高的量测加权比重也越高。算法是迭代的,可以在实时控制系统中执行,只需要目前的输入量测、以往的计算值以及其不确定性矩阵,不需要其他以往的信息。

下面是卡尔曼滤波涉及的5个公式:

image

  1. 左边两个公式,即预测,是通过上一时刻的状态,来预测当前时刻的状态,这个并不是最佳状态的估计值。(分别为预测状态预测估计协方差矩阵)
  2. 右边三个公式,即更新,通过当前的观测值,来更新预测值X和P,经过更新后的值就是最佳观测值,所以右边的是不带减号上标的。(分别为卡尔曼系数,更新的状态估计,更新的协方差估计)

下面是一个卡尔曼滤波的模型:

  1. 圆圈代表向量
  2. 方块代表矩阵
  3. 星号代表高斯噪声

image

  • Q: 预测模型带来的噪声
  • R: 观察模型带来的噪声
  • B: 控制矩阵表示控制量u如何作用于当前状态
  • u: 控制向量,加速度
  • z: 观察向量
  • R和Q: 分别表示观测噪声,和 预测噪声的协方差矩阵
  • x: 表示状态,包含位置和速度的向量
  • H: 观测矩阵[1,0]

1. 状态转移

image

如上图,是卡尔曼滤波的第一个公式: 状态预测公式。通过状态预测公式,就可以推测当前时刻的状态。

  1. 状态Xt,由Pt和Vt构成的向量,表示在t时刻的位置和速度。
  2. 当前的位置和速度,可以通过前一秒的位置/速度,以及时间的增量(1秒)和加速度,一起计算得到。
  3. Pt(位置)以及Vt(速度),与变量加速度是线性关系,所以可以写成矩阵的形式。(参考①)
  4. 通过②简化后,得到公式1即状态预测公式。

上面所有的推测,都是包含噪声的,噪声越大,不确定性就越大,那么如何表示这次推测带来了多大的不确定性呢?这时需要用下面的协方差矩阵来表示。

2. 协方差矩阵

一维数据方差:

先看如果数据是一维的时候,数据的离散状态可以用方差来衡量,即各个值据中心值之间差值的平方和。(下面的小圆点表示包含噪声的数据,每次的值都不同)

一维可以用方差来表示其分布状况:

image

二维数据协方差:

协方差(Covariance)在概率论和统计学中用于衡量两个变量的总体误差。而方差是协方差的一种特殊情況,即当两个变量是相同的情況。

image

为了表示两个维度之间的相关性,除了记录两个维度的方差以外,还要协方差来表示两个维度的相关程度。在卡尔曼滤波中,所有关于不确定性的表述,都用到了协方差矩阵。

image

3. 噪声协方差矩阵的传递

每一个时刻的不确定性,都是用协方差矩阵P来表示,具体参考如下:

image

这个就是公式2,预测估计协方差矩阵。

4. 观察矩阵

如下图中,有一个激光测距仪,通过激光测距仪,在每个时刻都可以观测到车的位置,观测到的值记为Zt,下面的图表示观测值与状态值之间的公式:

image

  1. 上面的H是一个观测向量,用于将二维的x状态向量,转换成与z一样的标量(1*p + 0*v得到位置p,z也是位置信息)。
  2. 如果除了激光测距仪之外,还有其他的测距方式,那么z就不是一个标量了,而是一个多维的列向量,有多个包含各种测量方式的测量值(位置信息),而每个测量值都只是真实世界的不完整表现。
  3. 我们可以从几种不完整的表现中,推断出真实的状态,卡尔曼滤波器的数据融合功能,就是在测量矩阵中体现出来。

5. 状态更新

下面是状态更新用的公式3公式4,将推断的观测值,加上实际观测值与预测观测值之间的残差,得到修正后的x状态值。

image

观测值Z是一个一维的向量,状态X是一个二维向量,它们所使用的单位甚至是特征,都有可能是不同的。 那如何使用观测值的残差,去更新状态值呢?那卡尔曼系数K,就是做这样一个转换。

上面的例子中,只有观测值Z,但是在K(卡尔曼系数中),已经包含了协方差矩阵P的信息,所以用它位置和速度这两者的相关性,从位置的残差,推断出了速度的残差。从而可以让我们对x的两个维度(位置和速度),同时进行修正。

6. 噪声协方差矩阵的更新

更新最佳估计值的噪声分布,这个值是留给下一轮迭代时使用的,在这一步中,状态的不确定性是减小的,而在下一轮迭代中,由于传递噪声的引入,其不确定性又会增大。

下面是公式5

image

上面就是卡尔曼滤波5个公式的详细信息。

3. Gaussian Intro

正态分布(英语:normal distribution)又名高斯分布(英语:Gaussian distribution),是一个非常常见的连续概率分布。正态分布在统计学上十分重要,经常用在自然和社会科学来代表一个不明的随机变量。

若随机变量 X服从一个位置参数为 μ、尺度参数为σ的正态分布,记为

X ~ N(μ,σ**2)

则其概率密度函数为:

image

下面只正态分布的概率密度函数,红色表示标准正态分布:

  • X轴表示x值大小,Y轴表示该x值出现的概率
  • 比如深蓝色的线,其x值出现在[-1,1]的概率最大,说明值的分布集中在这里,故方差较小。
  • 反之,黄色的线,x值的出现范围在[-5,5]之间,说明值的分布比较分散,故方差较大。

image

卡尔曼滤波中,我们的任务,是保持一个对未知物体位置最佳估计的(μ,σ**2),即均值和方差。

image

上面的ABC,都是高斯函数分布,满足下面的特点:

  1. 只有一个波峰
  2. 波峰左边单调上升,右边单调下降

4. Variance Comparison

image

方差是不确定性的一种度量,σ**2越大,我们对实际状态的不确定性越高,方差越小,分布有更高的确定性。

所以上面的case中,中间的一种,即比较窄的情况,是一种比较好的分布。

6. Evaluate Gaussian

image

python程序计算如下:

import math

def fx(u,sigma,x):
    x1 = 1/(math.sqrt(2*(sigma**2)*math.pi))   
    exp1 = ((-0.5)*(x-u)*(x-u))/(sigma**2)
    exp2 = math.exp(exp1)
    
    return x1*exp2
u = 10
sigma = 2
x = 8

print(fx(u,sigma,x))

输出0.12098

7. Maximize Gaussian

将x设定为与mu一样大,可以得到最大的输出值,0.19947

#For this problem, you aren't writing any code.
#Instead, please just change the last argument 
#in f() to maximize the output.

from math import *

def f1(mu, sigma2, x):
    return 1/sqrt(2.*pi*sigma2) * exp(-.5*(x-mu)**2 / sigma2)

print(f1(10.,4.,10.))

9. Shifting the Mean

卡尔曼滤波中,我们重复测量和运动两个过程,这个过程叫做测量更新。

image

quit:

The new belief will be somewhere between the previous belief and the new measurement.

黑色的扁平状分布,是基于先前的经验推测出来的;蓝色的长窄状分布,是测量值;显然测量值的分布方差更小,更可信,故新的修正值应该更靠近测量值一些。

image

10. Predicting the Peak

修正后的值,因为其不确定度,比之前的先验分布测量值分布都高,故方差更小,所以是下面红色的图形分布:

The new belief will be more certain than either the previous belief OR the measurement.

image

11. Parameter Updata

Notice that:

  1. the new mean is between the previous two means
  2. the new variance is LESS than either of the previous variances.

image

Quiz:

image

14. Separated Gaussians 2

image

先验分布和测量值分布,分别是左右两个分布图:

  1. 修正后分布的均值,在左右分布的中间,即均值
  2. 修改后分布的方差,比左右分布的有更高的确信度,所以方差更小,根据公式计算,方差为原来的一半,故分布图显得更窄。

15. New Mean and Variance

根据上面的均值和方差修正公式,写对应的python代码:

from math import *

def update(mean1, var1, mean2, var2):
    new_mean = (var1*mean2 + var2*mean1)/(var1 + var2)
    new_var = (var1*var2)/(var1+var2)
    return [new_mean, new_var]

print(update(10.,8.,13., 2.))

16. Gaussian Motion

image

如上图中,先前的分布,加上移动的分布,得到新的分布。

用代码实现如下:

def update(mean1, var1, mean2, var2):
    new_mean = (var2 * mean1 + var1 * mean2) / (var1 + var2)
    new_var = 1/(1/var1 + 1/var2)
    return [new_mean, new_var]

def predict(mean1, var1, mean2, var2):
    new_mean = mean1 + mean2
    new_var = var1 + var2
    return [new_mean, new_var]

print predict(10., 4., 12., 4.)

18. Kalman Filter Code

下面的代码,部署了一个完整的一维卡尔曼滤波器,在for循环中:

  1. 用测量值来更新初始值,初始值的方差很大,说明不可信任,故第一次更新时,将均值更新为了4.98(接近5),其方差也更新为了3.98(接近4)
  2. 用上面更新后的值,以及每次运动即motion的值,一起去预测,得到下一轮的预测值,包含均值和方差
# Write a program that will iteratively update and
# predict based on the location measurements 
# and inferred motions shown below. 

def update(mean1, var1, mean2, var2):
    new_mean = float(var2 * mean1 + var1 * mean2) / (var1 + var2)
    new_var = 1./(1./var1 + 1./var2)
    return [new_mean, new_var]

def predict(mean1, var1, mean2, var2):
    new_mean = mean1 + mean2
    new_var = var1 + var2
    return [new_mean, new_var]

measurements = [5., 6., 7., 9., 10.] # 测量数据
motion = [1., 1., 2., 1., 1.] # 运动数据i
measurement_sig = 4.  # 测量不确定性,即方差
motion_sig = 2. # 运动不确定性,即方差
mu = 0. # 初始的均值
sig = 1000. # 初始的不确定性,即方差

for i in range(len(measurements)):
	# 用测量值来更新初始值,初始值的方差很大,说明不可信任
	# 故第一次更新时,将均值更新为了4.98(接近5),其方差也更新为了3.98(接近4)
    [mu,sig] = update(mu,sig,measurements[i],measurement_sig)
    print("update: ",[mu,sig])
    
    # 用上面更新后的值,以及每次运动即motion的值,一起去预测,得到下一轮的预测值,包含均值和方差
    [mu,sig] = predict(mu,sig,motion[i],motion_sig)
    print("predict: ",[mu,sig])

输出:

update:  [4.9800796812749, 3.9840637450199203]
predict:  [5.9800796812749, 5.98406374501992]
...
update:  [9.99906346214631, 2.0058299481392163]
predict:  [10.99906346214631, 4.005829948139216]

上面的初始值方差为1000,说明不可信,所以第一次更新后,值变为了[4.9800796812749, 3.9840637450199203]。 如果将方差修改为0.0001,说明初始值可靠,那么第一次更新后,值变为了如下:

update:  [0.00012499687507812305, 9.999750006249843e-05]
predict:  [1.000124996875078, 2.0000999975000626]
...
update:  [9.532187064943109, 1.988304969006662]
predict:  [10.532187064943109, 3.988304969006662]

24. Kalman Filter Design

下面就是卡尔曼滤波的公式:

image

25. Kalman Matrices

下面的代码实现了上面的卡尔曼滤波公式:

# Write a function 'kalman_filter' that implements a multi-
# dimensional Kalman Filter for the example given

from math import *


class matrix:
    
    # implements basic operations of a matrix class
    
    def __init__(self, value):
        self.value = value
        self.dimx = len(value)
        self.dimy = len(value[0])
        if value == [[]]:
            self.dimx = 0
    
    def zero(self, dimx, dimy):
        # check if valid dimensions
        if dimx < 1 or dimy < 1:
            raise(ValueError, "Invalid size of matrix")
        else:
            self.dimx = dimx
            self.dimy = dimy
            self.value = [[0 for row in range(dimy)] for col in range(dimx)]
    
    def identity(self, dim):
        # check if valid dimension
        if dim < 1:
            raise(ValueError, "Invalid size of matrix")
        else:
            self.dimx = dim
            self.dimy = dim
            self.value = [[0 for row in range(dim)] for col in range(dim)]
            for i in range(dim):
                self.value[i][i] = 1
    
    def show(self):
        for i in range(self.dimx):
            print(self.value[i])
        print(' ')
    
    def __add__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise(ValueError, "Matrices must be of equal dimensions to add")
        else:
            # add if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] + other.value[i][j]
            return res
    
    def __sub__(self, other):
        # check if correct dimensions
        if self.dimx != other.dimx or self.dimy != other.dimy:
            raise(ValueError, "Matrices must be of equal dimensions to subtract")
        else:
            # subtract if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, self.dimy)
            for i in range(self.dimx):
                for j in range(self.dimy):
                    res.value[i][j] = self.value[i][j] - other.value[i][j]
            return res
    
    def __mul__(self, other):
        # check if correct dimensions
        if self.dimy != other.dimx:
            raise(ValueError, "Matrices must be m*n and n*p to multiply")
        else:
            # multiply if correct dimensions
            res = matrix([[]])
            res.zero(self.dimx, other.dimy)
            for i in range(self.dimx):
                for j in range(other.dimy):
                    for k in range(self.dimy):
                        res.value[i][j] += self.value[i][k] * other.value[k][j]
            return res
    
    def transpose(self):
        # compute transpose
        res = matrix([[]])
        res.zero(self.dimy, self.dimx)
        for i in range(self.dimx):
            for j in range(self.dimy):
                res.value[j][i] = self.value[i][j]
        return res
    
    # Thanks to Ernesto P. Adorio for use of Cholesky and CholeskyInverse functions
    
    def Cholesky(self, ztol=1.0e-5):
        # Computes the upper triangular Cholesky factorization of
        # a positive definite matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)
        
        for i in range(self.dimx):
            S = sum([(res.value[k][i])**2 for k in range(i)])
            d = self.value[i][i] - S
            if abs(d) < ztol:
                res.value[i][i] = 0.0
            else:
                if d < 0.0:
                    raise(ValueError, "Matrix not positive-definite")
                res.value[i][i] = sqrt(d)
            for j in range(i+1, self.dimx):
                S = sum([res.value[k][i] * res.value[k][j] for k in range(self.dimx)])
                if abs(S) < ztol:
                    S = 0.0
                try:
                   res.value[i][j] = (self.value[i][j] - S)/res.value[i][i]
                except:
                   raise(ValueError, "Zero diagonal")
        return res
    
    def CholeskyInverse(self):
        # Computes inverse of matrix given its Cholesky upper Triangular
        # decomposition of matrix.
        res = matrix([[]])
        res.zero(self.dimx, self.dimx)
        
        # Backward step for inverse.
        for j in reversed(range(self.dimx)):
            tjj = self.value[j][j]
            S = sum([self.value[j][k]*res.value[j][k] for k in range(j+1, self.dimx)])
            res.value[j][j] = 1.0/tjj**2 - S/tjj
            for i in reversed(range(j)):
                res.value[j][i] = res.value[i][j] = -sum([self.value[i][k]*res.value[k][j] for k in range(i+1, self.dimx)])/self.value[i][i]
        return res
    
    def inverse(self):
        aux = self.Cholesky()
        res = aux.CholeskyInverse()
        return res
    
    def __repr__(self):
        return repr(self.value)


########################################

# Implement the filter function below

def kalman_filter(x, P):
    print(x)
    for n in range(len(measurements)):
        # measurement update
        Z = matrix([[measurements[n]]])
        y = Z - (H * x)
        S = H * P * H.transpose()  + R
        K = P * H.transpose() * S.inverse()
        P = (I - (K*H)) * P
        
        x = x + (K * y)                     
                              
        # prediction
        P = F * P * F.transpose()
        x = (F*x) + u              
    return x,P

############################################
### use the code below to test your filter!
############################################

measurements = [1, 2, 3]

x = matrix([[0.], [0.]]) # initial state (location and velocity)
P = matrix([[1000., 0.], [0., 1000.]]) # initial uncertainty
u = matrix([[0.], [0.]]) # external motion
F = matrix([[1., 1.], [0, 1.]]) # next state function
H = matrix([[1., 0.]]) # measurement function
R = matrix([[1.]]) # measurement uncertainty
I = matrix([[1., 0.], [0., 1.]]) # identity matrix

print(kalman_filter(x, P)[0])
print("-----")
print(kalman_filter(x, P)[1])
# output should be:
# x: [[3.9996664447958645], [0.9999998335552873]]
# P: [[2.3318904241194827, 0.9991676099921091], [0.9991676099921067, 0.49950058263974184]]

输出为:

[[0.0], [0.0]]
[[3.9996664447958645], [0.9999998335552873]]
-----
[[0.0], [0.0]]
[[2.3318904241194827, 0.9991676099921091], [0.9991676099921067, 0.49950058263974184]]

卡尔曼滤波是打造无人驾驶车的必备知识,通过它可以有效地实现寻找其他车辆的方法。