算法学习二

这是算法学习的第二篇博客,本文将聚焦于图搜索相关的算法。

所谓的图(Graph)由节点和边构成,节点代表变量,边表示相互关系,通常具有一定的权重。图的搜索算法可以解决一些基本的问题,比如最短路径问题。

广度优先搜索

广度优先搜索的特征从起点开始,由近及远进行广泛地搜索。下面我们定义一个图(如下图),这是个无向图,我们从某个节点出发分别进行广度优先搜索和深度优先搜索。

点击显/隐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
graph = {
"A": ["B", "C"],
"B": ["A", "C", "D"],
"C": ["A", "B", "D", "E"],
"D": ["B", "C", "E", "F"],
"E": ["C", "D"],
"F": ["D"]
}

# 广度优先搜索
def BFS(graph, s):
queue = []
queue.append(s)
seen = set()
seen.add(s)
# parent = {s: None}
while(len(queue) > 0):
vertex = queue.pop(0)
nodes = graph[vertex]
for w in nodes:
if w not in seen:
queue.append(w)
seen.add(w)
# parent[w] = vertex
print(vertex)
# return parent

深度优先搜索

深度优先搜索和广度有限搜索一样,都是对图进行搜索的算法,目的都是从起点开始到达指定顶点(终点)。深度优先搜索会沿着一条路径不断往下搜索直到不能再继续为止,然后再折返,开始搜索下一条候补路径。

点击显/隐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 深度优先搜索
def DFS(graph, s):
stack = []
stack.append(s)
seen = set()
seen.add(s)
while(len(stack) > 0):
vertex = stack.pop()
nodes = graph[vertex]
for w in nodes:
if w not in seen:
stack.append(w)
seen.add(w)
print(vertex)

最短路径

下面我们看一些求最短路径的算法,贝尔曼-福特算法,Dijkstra算法,还有A-star算法。求下面的图 $A$ 到其它节点的最短路径。

贝尔曼-福特算法(Bellman-ford)

贝尔曼-福特算法是求最短路的一种算法,该算法是以松弛操作为基础,集估计的最短路径值逐渐被更加精确的值代替,直至得到最优解。该算法的缺点是时间复杂度较高 $O(|V||E|)$,其中 $|V|$ 代表节点数量,$|E|$代表边的数量。

点击显/隐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import math
def getEdges(G):
""" 输入图G,返回其边与端点的列表 """
v1 = [] # 出发点
v2 = [] # 对应的相邻到达点
w = [] # 顶点v1到顶点v2的边的权值
for i in G:
for j in G[i]:
if G[i][j] != 0:
w.append(G[i][j])
v1.append(i)
v2.append(j)
return v1,v2,w

class CycleError(Exception):
pass

def Bellman_Ford(G, s):
v1,v2,w = getEdges(G)

# 初始化源点与所有点之间的最短距离
distance = dict((k, math.inf) for k in G.keys())
distance[s] = 0
parent = {s: None}
# 核心算法
for k in range(len(G)-1): # 循环 n-1轮
check = 0 # 用于标记本轮松弛中distance是否发生更新
for i in range(len(w)): # 对每条边进行一次松弛操作
if distance[v1[i]] + w[i] < distance[v2[i]]:
distance[v2[i]] = distance[v1[i]] + w[i]
check = 1
parent[v2[i]] = v1[i]
if check == 0: break

# 检测负权回路
# 如果在 n-1 次松弛之后,最短路径依然发生变化,则该图必然存在负权回路
flag = 0
for i in range(len(w)): # 对每条边再尝试进行一次松弛操作
if distance[v1[i]] + w[i] < distance[v2[i]]:
flag = 1
break
if flag == 1:
# raise CycleError()
return False
return distance, parent

Dijkstra算法

Dijkstra算法可以看作是广度优先搜索在有权图上的推广,其是通过为每个顶点保留到目前为止的最短路径工作的。最初的Dijkstra算法没有通过小优先队列实现,时间复杂度为 $O(|V|^2)$(其中 $|V|为图的顶点个数$)。通过斐波那契堆实现的Dijkstra算法的时间复杂度为 $O(|E|+|V|log|V|)$ (其中 $|E|$ 为边数),对于不含负权的有向图,Dijkstra是已知的最快单源最短路径算法。

点击显/隐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import heapq
import math
graph = {
"A": {"B": 5, "C": 1},
"B": {"A": 5, "C": 2, "D": 1},
"C": {"A": 1, "B": 2, "D": 4, "E": 8},
"D": {"B": 1, "C": 4, "E": 3, "F": 6},
"E": {"C": 8, "D": 3},
"F": {"D": 6}
}

def init_distance(graph, s):
distance = {s: 0}
for vertex in graph:
if vertex != s:
distance[vertex] = math.inf
return distance

def dijkstra(graph, s):
pqueue = []
heapq.heappush(pqueue, (0, s))
seen = set()
seen.add(s)
parent = {s: None}

distance = init_distance(graph, s)

while(len(pqueue) > 0):
pair = heapq.heappop(pqueue)
distance = pair[0]
vertex = pair[1]
seen.add(vertex)

nodes = graph[vertex].keys()
for w in nodes:
if w not in seen:
if distance + graph[vertex][w] < distance[w]:
heapq.heappush(pqueue, (distance + graph[vertex][w], w))
parent[w] = vertex
distance[w] = distance + graph[vertex][w]
return parent, distance

if __name__ == "__main__":
parent, distance= dijkstra(graph, "A")
print(parent, distance)

v = "B"
while v != None:
print(v)
v = parent[v]

虽然Doijkstra算法和Bellman ford算法一样可以求解有向图中的最短路径问题,但是当图中有负数权重时,Dijkstra算法无法得到正确的答案。

A-star 算法

A-star算法是有Dijkstra发展而来的算法。Dijkstra算法会从离起点近的顶点开始,按顺序求出起点到各个顶点的最短路径。也就是说一些离起点较远的顶点的最短路径也会被计算出来,但这部分是无用的。与之不同的是,A-star算法会先估计一个值,然后利用这个值省去无用的计算。
下面我们详细地介绍一下A-star算法在路径规划上面的应用,不同前面的简单路径图,我们这里使用了一个“硬核”的迷宫图,其中红色箭头表示起点位置,蓝色箭头表示终点位置,黑色的点代表障碍物,障碍物是不能直接穿过的。

点击显/隐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import matplotlib.pyplot as plt
import math

show_animation = True


class Node:

def __init__(self, x, y, cost, pind):
self.x = x
self.y = y
self.cost = cost
self.pind = pind

def __str__(self):
return str(self.x) + "," + str(self.y) + "," + str(self.cost) + "," + str(self.pind)


def calc_final_path(ngoal, closedset, reso):
# generate final course
rx, ry = [ngoal.x * reso], [ngoal.y * reso]
pind = ngoal.pind
while pind != -1:
n = closedset[pind]
rx.append(n.x * reso)
ry.append(n.y * reso)
pind = n.pind

return rx, ry


def a_star_planning(sx, sy, gx, gy, ox, oy, reso, rr):
"""
gx: goal x position [m]
gx: goal x position [m]
ox: x position list of Obstacles [m]
oy: y position list of Obstacles [m]
reso: grid resolution [m]
rr: robot radius[m]
"""

nstart = Node(round(sx / reso), round(sy / reso), 0.0, -1)
ngoal = Node(round(gx / reso), round(gy / reso), 0.0, -1)
ox = [iox / reso for iox in ox]
oy = [ioy / reso for ioy in oy]

obmap, minx, miny, maxx, maxy, xw, yw = calc_obstacle_map(ox, oy, reso, rr)

motion = get_motion_model()

openset, closedset = dict(), dict()
openset[calc_index(nstart, xw, minx, miny)] = nstart

while 1:
c_id = min(
openset, key=lambda o: openset[o].cost + calc_heuristic(ngoal, openset[o]))
current = openset[c_id]

# show graph
if show_animation: # pragma: no cover
plt.plot(current.x * reso, current.y * reso, "xc")
if len(closedset.keys()) % 10 == 0:
plt.pause(0.001)

if current.x == ngoal.x and current.y == ngoal.y:
print("Find goal")
ngoal.pind = current.pind
ngoal.cost = current.cost
break

# Remove the item from the open set
del openset[c_id]
# Add it to the closed set
closedset[c_id] = current

# expand search grid based on motion model
for i, _ in enumerate(motion):
node = Node(current.x + motion[i][0],
current.y + motion[i][1],
current.cost + motion[i][2], c_id)
n_id = calc_index(node, xw, minx, miny)

if n_id in closedset:
continue

if not verify_node(node, obmap, minx, miny, maxx, maxy):
continue

if n_id not in openset:
openset[n_id] = node # Discover a new node
else:
if openset[n_id].cost >= node.cost:
# This path is the best until now. record it!
openset[n_id] = node

rx, ry = calc_final_path(ngoal, closedset, reso)

return rx, ry


def calc_heuristic(n1, n2):
w = 1.0 # weight of heuristic
d = w * math.sqrt((n1.x - n2.x)**2 + (n1.y - n2.y)**2)
return d


def verify_node(node, obmap, minx, miny, maxx, maxy):

if node.x < minx:
return False
elif node.y < miny:
return False
elif node.x >= maxx:
return False
elif node.y >= maxy:
return False

if obmap[node.x][node.y]:
return False

return True


def calc_obstacle_map(ox, oy, reso, vr):

minx = round(min(ox))
miny = round(min(oy))
maxx = round(max(ox))
maxy = round(max(oy))
# print("minx:", minx)
# print("miny:", miny)
# print("maxx:", maxx)
# print("maxy:", maxy)

xwidth = round(maxx - minx)
ywidth = round(maxy - miny)
# print("xwidth:", xwidth)
# print("ywidth:", ywidth)

# obstacle map generation
obmap = [[False for i in range(ywidth)] for i in range(xwidth)]
for ix in range(xwidth):
x = ix + minx
for iy in range(ywidth):
y = iy + miny
# print(x, y)
for iox, ioy in zip(ox, oy):
d = math.sqrt((iox - x)**2 + (ioy - y)**2)
if d <= vr / reso:
obmap[ix][iy] = True
break

return obmap, minx, miny, maxx, maxy, xwidth, ywidth


def calc_index(node, xwidth, xmin, ymin):
return (node.y - ymin) * xwidth + (node.x - xmin)


def get_motion_model():
# dx, dy, cost
motion = [[1, 0, 1],
[0, 1, 1],
[-1, 0, 1],
[0, -1, 1],
[-1, -1, math.sqrt(2)],
[-1, 1, math.sqrt(2)],
[1, -1, math.sqrt(2)],
[1, 1, math.sqrt(2)]]

return motion


def main():
print(__file__ + " start!!")

# start and goal position
sx = 10.0 # [m]
sy = 10.0 # [m]
gx = 50.0 # [m]
gy = 50.0 # [m]
grid_size = 1.0 # [m]
robot_size = 1.0 # [m]

ox, oy = [], []

for i in range(60):
ox.append(i)
oy.append(0.0)
for i in range(60):
ox.append(60.0)
oy.append(i)
for i in range(61):
ox.append(i)
oy.append(60.0)
for i in range(61):
ox.append(0.0)
oy.append(i)
for i in range(40):
ox.append(20.0)
oy.append(i)
for i in range(40):
ox.append(40.0)
oy.append(60.0 - i)

if show_animation: # pragma: no cover
plt.plot(ox, oy, ".k")
plt.plot(sx, sy, "xr")
plt.plot(gx, gy, "xb")
plt.grid(True)
plt.axis("equal")

rx, ry = a_star_planning(sx, sy, gx, gy, ox, oy, grid_size, robot_size)

if show_animation: # pragma: no cover
plt.plot(rx, ry, "-r")
plt.show()


if __name__ == '__main__':
main()
非常感谢各位老板投喂!