-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathai.py
610 lines (498 loc) · 20.1 KB
/
ai.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
"""
五子棋AI实现 - 使用FiberTree数据库进行决策路径优化
"""
import numpy as np
from fbtree import FiberTree, Move
import random
from typing import List, Tuple, Optional
import time
class GomokuState:
"""五子棋状态表示"""
def __init__(self, size=15):
self.size = size
# 0表示空, 1表示黑棋, 2表示白棋
self.board = np.zeros((size, size), dtype=np.int8)
self.current_player = 1 # 黑棋先行
self.last_move = None
self.moves_count = 0
def copy(self):
"""创建状态的深拷贝"""
new_state = GomokuState(self.size)
new_state.board = self.board.copy()
new_state.current_player = self.current_player
new_state.last_move = self.last_move
new_state.moves_count = self.moves_count
return new_state
def make_move(self, x: int, y: int) -> bool:
"""
尝试在(x,y)位置落子
Returns:
bool: 移动是否有效
"""
if not (0 <= x < self.size and 0 <= y < self.size):
return False
if self.board[x, y] != 0:
return False
self.board[x, y] = self.current_player
self.last_move = (x, y)
self.moves_count += 1
self.current_player = 3 - self.current_player # 切换玩家 (1->2, 2->1)
return True
def get_valid_moves(self) -> List[Tuple[int, int]]:
"""获取所有有效移动"""
# 只考虑已有棋子周围的空位
if self.moves_count == 0:
# 第一步: 返回棋盘中心及周围点
center = self.size // 2
return [(center, center)]
moves = set()
for x in range(self.size):
for y in range(self.size):
if self.board[x, y] != 0: # 有棋子的位置
# 检查周围3x3范围内的空位
for dx in range(-2, 3):
for dy in range(-2, 3):
nx, ny = x + dx, y + dy
if (0 <= nx < self.size and 0 <= ny < self.size and
self.board[nx, ny] == 0):
moves.add((nx, ny))
return list(moves)
def is_terminal(self) -> Tuple[bool, Optional[int]]:
"""
检查游戏是否结束
Returns:
Tuple[bool, Optional[int]]: (是否结束, 赢家(1或2)或None表示平局)
"""
if self.last_move is None:
return False, None
x, y = self.last_move
player = self.board[x, y]
# 检查四个方向: 水平, 垂直, 两个对角线
directions = [(1, 0), (0, 1), (1, 1), (1, -1)]
for dx, dy in directions:
count = 1 # 当前方向连子数
# 正向检查
tx, ty = x + dx, y + dy
while 0 <= tx < self.size and 0 <= ty < self.size and self.board[tx, ty] == player:
count += 1
tx, ty = tx + dx, ty + dy
# 反向检查
tx, ty = x - dx, y - dy
while 0 <= tx < self.size and 0 <= ty < self.size and self.board[tx, ty] == player:
count += 1
tx, ty = tx - dx, ty - dy
# 判断是否连成5子或更多
if count >= 5:
return True, player
# 检查是否平局 (棋盘已满)
if np.all(self.board != 0):
return True, None
return False, None
def evaluate(self) -> float:
"""
评估当前状态对AI(玩家2)的有利程度
Returns:
float: 评分越高对AI越有利
"""
terminal, winner = self.is_terminal()
if terminal:
if winner == 2: # AI赢
return 1000
elif winner == 1: # 玩家赢
return -1000
else: # 平局
return 0
# 非终局状态: 分析连子情况
score = 0
player_scores = {1: 0, 2: 0} # 玩家1和玩家2的评分
# 检查四个方向上的连子情况
directions = [(1, 0), (0, 1), (1, 1), (1, -1)]
for player in [1, 2]:
for x in range(self.size):
for y in range(self.size):
if self.board[x, y] != player:
continue
for dx, dy in directions:
# 计算连子
line_score = self._evaluate_line(x, y, dx, dy, player)
player_scores[player] += line_score
# AI评分减去玩家评分
score = player_scores[2] - player_scores[1]
return score
def _evaluate_line(self, x: int, y: int, dx: int, dy: int, player: int) -> float:
"""评估一条线上的连子情况"""
consecutive = 1 # 连续棋子数
blocked_ends = 0 # 被阻挡的端点数
# 正向检查
tx, ty = x + dx, y + dy
while 0 <= tx < self.size and 0 <= ty < self.size:
if self.board[tx, ty] == player:
consecutive += 1
elif self.board[tx, ty] == 0:
break
else:
blocked_ends += 1
break
tx, ty = tx + dx, ty + dy
# 负向检查
tx, ty = x - dx, y - dy
while 0 <= tx < self.size and 0 <= ty < self.size:
if self.board[tx, ty] == player:
consecutive += 1
elif self.board[tx, ty] == 0:
break
else:
blocked_ends += 1
break
tx, ty = tx - dx, ty - dy
# 根据连子数和阻挡情况评分
if consecutive >= 5:
return 100 # 五连珠
elif consecutive == 4:
if blocked_ends == 0:
return 90 # 活四
else:
return 50 # 冲四
elif consecutive == 3:
if blocked_ends == 0:
return 40 # 活三
else:
return 10 # 眠三
elif consecutive == 2:
if blocked_ends == 0:
return 5 # 活二
else:
return 2 # 眠二
return 0
class GomokuAI:
"""五子棋AI,使用FiberTree存储和优化决策路径"""
def __init__(self, tree_path=None, storage_type='memory'):
"""
初始化五子棋AI
Args:
tree_path: FiberTree存储路径
storage_type: 'memory'或'sqlite'
"""
if tree_path:
self.tree = FiberTree(storage_type=storage_type, db_path=tree_path)
else:
self.tree = FiberTree(storage_type=storage_type)
self.max_depth = 3 # MCTS搜索深度
self.simulation_count = 100 # 每次决策的模拟次数
self.exploration_weight = 1.0 # UCB探索权重
self.time_limit = 5 # 每步思考时间限制(秒)
def serialize_move(self, x: int, y: int) -> int:
"""将(x,y)坐标序列化为一个整数值"""
# 例如在15*15棋盘上,将(x,y)映射为x*15+y
return x * 15 + y
def deserialize_move(self, value: int) -> Tuple[int, int]:
"""将整数值反序列化为(x,y)坐标"""
x = value // 15
y = value % 15
return x, y
def get_move(self, state: GomokuState) -> Tuple[int, int]:
"""
基于当前状态选择最佳移动
Args:
state: 当前游戏状态
Returns:
Tuple[int, int]: 最佳移动的(x,y)坐标
"""
# 如果是第一步,直接下中心点
if state.moves_count == 0:
return state.size // 2, state.size // 2
# 获取有效移动
valid_moves = state.get_valid_moves()
if not valid_moves:
return -1, -1 # 没有合法移动
# 快速评估: 检查是否有能立即获胜的移动
for x, y in valid_moves:
next_state = state.copy()
next_state.make_move(x, y)
terminal, winner = next_state.is_terminal()
if terminal and winner == 2: # AI获胜
return x, y
# 时间限制内执行尽可能多的模拟
start_time = time.time()
simulation_count = 0
# 准备当前路径
current_path = []
for x in range(state.size):
for y in range(state.size):
if state.board[x, y] != 0:
move_value = self.serialize_move(x, y)
current_path.append(Move(move_value))
# 开始记录新路径
self.tree.start_path()
for move in current_path:
self.tree.add_move(move)
# 尝试从树中获取最佳后续移动
best_continuations = self.tree.get_best_continuation(current_path, top_n=3, min_visits=10)
# 如果有足够的数据,根据树的统计选择最佳移动
if best_continuations:
# 按胜率和访问次数选择最佳移动
best_continuation = max(best_continuations,
key=lambda x: (x['win_rate'], x['visits']))
best_move = best_continuation['move'].value
self.tree.end_path()
return self.deserialize_move(best_move)
# 如果没有足够的历史数据,使用MCTS搜索
while time.time() - start_time < self.time_limit:
# 评估每个移动
move_scores = []
for x, y in valid_moves:
next_state = state.copy()
next_state.make_move(x, y)
# 运行一次迷你Max搜索
score = self._minimax(next_state, depth=self.max_depth, alpha=float('-inf'),
beta=float('inf'), maximizing=False)
move_scores.append((x, y, score))
simulation_count += 1
# 选择得分最高的前N个移动
top_moves = sorted(move_scores, key=lambda x: x[2], reverse=True)[:3]
# 为每个选中的移动进行模拟并记录到树中
for x, y, score in top_moves:
move_value = self.serialize_move(x, y)
# 模拟这个移动
sim_state = state.copy()
sim_state.make_move(x, y)
terminal, winner = sim_state.is_terminal()
# 如果是终局状态
if terminal:
outcome = 'win' if winner == 2 else ('draw' if winner is None else 'loss')
# 将这个移动添加到路径并记录结果
self.tree.add_move(Move(move_value))
self.tree.record_outcome(outcome)
self.tree.start_path() # 重置到当前路径
for move in current_path:
self.tree.add_move(move)
else:
# 否则运行一个快速的随机模拟
outcome = self._random_playout(sim_state)
# 记录结果
self.tree.add_move(Move(move_value))
self.tree.record_outcome(outcome)
self.tree.start_path() # 重置到当前路径
for move in current_path:
self.tree.add_move(move)
# 根据树的统计选择最佳移动
best_continuations = self.tree.get_best_continuation(current_path, top_n=5)
if best_continuations:
# 按胜率和访问次数选择最佳移动
best_continuation = max(best_continuations,
key=lambda x: (x['win_rate'], x['visits']))
best_move = best_continuation['move'].value
self.tree.end_path()
return self.deserialize_move(best_move)
# 如果树中没有足够数据,返回评分最高的移动
sorted_moves = sorted(move_scores, key=lambda x: x[2], reverse=True)
self.tree.end_path()
return sorted_moves[0][0], sorted_moves[0][1]
def _minimax(self, state: GomokuState, depth: int, alpha: float, beta: float,
maximizing: bool) -> float:
"""
使用Alpha-Beta剪枝的Minimax搜索
Args:
state: 当前游戏状态
depth: 剩余搜索深度
alpha, beta: Alpha-Beta剪枝参数
maximizing: 是否为最大化玩家
Returns:
float: 状态的评分
"""
# 检查终局
terminal, winner = state.is_terminal()
if terminal:
if winner == 2: # AI赢
return 1000
elif winner == 1: # 玩家赢
return -1000
else: # 平局
return 0
# 达到深度限制
if depth == 0:
return state.evaluate()
valid_moves = state.get_valid_moves()
# 限制宽度以提高效率
if len(valid_moves) > 8:
# 对每个移动做简单评估
move_values = []
for x, y in valid_moves:
next_state = state.copy()
next_state.make_move(x, y)
score = next_state.evaluate()
move_values.append((x, y, score))
# 根据评分选择前8个移动
if maximizing:
valid_moves = [m[:2] for m in sorted(move_values, key=lambda x: x[2], reverse=True)[:8]]
else:
valid_moves = [m[:2] for m in sorted(move_values, key=lambda x: x[2])[:8]]
if maximizing: # AI的回合
max_eval = float('-inf')
for x, y in valid_moves:
next_state = state.copy()
next_state.make_move(x, y)
eval = self._minimax(next_state, depth - 1, alpha, beta, False)
max_eval = max(max_eval, eval)
alpha = max(alpha, eval)
if beta <= alpha:
break
return max_eval
else: # 玩家的回合
min_eval = float('inf')
for x, y in valid_moves:
next_state = state.copy()
next_state.make_move(x, y)
eval = self._minimax(next_state, depth - 1, alpha, beta, True)
min_eval = min(min_eval, eval)
beta = min(beta, eval)
if beta <= alpha:
break
return min_eval
def _random_playout(self, state: GomokuState) -> str:
"""
从当前状态随机模拟到游戏结束
Returns:
str: 游戏结果 ('win', 'loss', 或 'draw')
"""
sim_state = state.copy()
depth = 0
max_depth = 30 # 防止无限循环
while depth < max_depth:
terminal, winner = sim_state.is_terminal()
if terminal:
if winner == 2: # AI赢
return 'win'
elif winner == 1: # 玩家赢
return 'loss'
else: # 平局
return 'draw'
# 随机选择一个有效移动
valid_moves = sim_state.get_valid_moves()
if not valid_moves:
return 'draw'
x, y = random.choice(valid_moves)
sim_state.make_move(x, y)
depth += 1
# 如果达到最大深度,根据当前状态评估结果
score = sim_state.evaluate()
if score > 20:
return 'win'
elif score < -20:
return 'loss'
else:
return 'draw'
def learn_from_game(self, moves: List[Tuple[int, int]], outcome: str):
"""
从完整游戏记录中学习
Args:
moves: 游戏中的移动列表
outcome: 游戏结果 ('win', 'loss', 或 'draw')
"""
# 将移动转换为Move对象
path = [Move(self.serialize_move(x, y)) for x, y in moves]
# 添加到树中
self.tree.simulate_path(path, outcome)
def save_knowledge(self, file_path: str):
"""保存AI的知识库"""
self.tree.export_to_json(file_path)
def load_knowledge(self, file_path: str):
"""加载AI的知识库"""
self.tree = FiberTree.import_from_json(file_path, self.tree.storage_type, self.tree.db_path)
class GomokuGame:
"""五子棋游戏管理器"""
def __init__(self, size=15):
self.state = GomokuState(size)
self.ai = GomokuAI()
self.game_over = False
self.winner = None
def player_move(self, x: int, y: int) -> bool:
"""玩家下棋"""
if self.game_over:
return False
if self.state.current_player != 1:
return False
if not self.state.make_move(x, y):
return False
# 检查游戏是否结束
self.game_over, self.winner = self.state.is_terminal()
return True
def ai_move(self) -> Optional[Tuple[int, int]]:
"""AI下棋"""
if self.game_over or self.state.current_player != 2:
return None
x, y = self.ai.get_move(self.state)
if not self.state.make_move(x, y):
return None
# 检查游戏是否结束
self.game_over, self.winner = self.state.is_terminal()
return x, y
def reset(self):
"""重置游戏"""
self.state = GomokuState(self.state.size)
self.game_over = False
self.winner = None
def get_board(self) -> np.ndarray:
"""获取当前棋盘状态"""
return self.state.board.copy()
# 简单的命令行界面示例
def print_board(board):
"""打印棋盘 - 使用更美观、对齐的格式"""
size = board.shape[0]
# 打印列标题
print(" ", end="")
for j in range(size):
print(f"{j:2d}", end=" ")
print()
# 打印上边框
print(" +" + "---" * size + "+")
# 打印棋盘内容
for i in range(size):
print(f"{i:2d} |", end=" ")
for j in range(size):
if board[i, j] == 0:
print("·", end=" ")
elif board[i, j] == 1:
print("X", end=" ")
else:
print("O", end=" ")
print("|")
# 打印下边框
print(" +" + "---" * size + "+")
def main():
"""主函数"""
game = GomokuGame(15)
print("五子棋游戏 - 玩家(X) vs AI(O)")
print("输入坐标 (x y) 下棋,例如 '7 7'")
print("输入 'q' 退出")
while not game.game_over:
print_board(game.get_board())
if game.state.current_player == 1:
# 玩家回合
try:
move = input("你的回合 (x y): ")
if move.lower() == 'q':
break
x, y = map(int, move.split())
if not game.player_move(x, y):
print("无效移动,请重试")
continue
except ValueError:
print("输入无效,请输入两个数字,例如 '7 7'")
continue
else:
# AI回合
print("AI思考中...")
ai_move = game.ai_move()
if ai_move:
x, y = ai_move
print(f"AI落子于: {x} {y}")
print_board(game.get_board())
if game.winner == 1:
print("恭喜,你赢了!")
elif game.winner == 2:
print("AI获胜!")
else:
print("平局!")
if __name__ == "__main__":
main()