maze_python/tanxin.py
2025-07-03 19:26:44 +08:00

735 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import math
from maze import *
import math
import copy
from collections import deque
class Greedy3x3ResourceCollector:
"""
基于3x3视野的贪心资源收集器
每次移动时选择3x3视野范围内最高价值的资源
只能进行上下左右移动
"""
def __init__(self, map_data, start=None, end=None):
"""
初始化3x3视野贪心资源收集器
Args:
map_data: 迷宫地图2D列表 (注意:这里是[y][x]格式)
start: 起始位置 (x, y)如果为None则自动寻找
end: 目标位置 (x, y)如果为None则自动寻找
"""
self.original_map = copy.deepcopy(map_data)
self.map_data = copy.deepcopy(map_data)
self.rows = len(map_data)
self.cols = len(map_data[0]) if self.rows > 0 else 0
# 寻找起始位置和目标位置
self.start = start or self._find_position('s')
self.end = end or self._find_position('e')
if not self.start:
raise ValueError("无法找到起始位置 's'")
if not self.end:
raise ValueError("无法找到目标位置 'e'")
self.current_pos = self.start
self.path = [self.start]
self.collected_resources = []
self.total_value = 0
self.visited_resources = set()
self.explored_positions = set([self.start])
# 增加历史移动记录和死胡同检测相关变量
self.position_visit_count = {self.start: 1} # 记录每个位置的访问次数
self.deadend_positions = set() # 记录已知的死胡同位置
self.backtrack_points = [] # 记录可能的回溯点
self.oscillation_detection = [] # 用于检测来回走动的历史
self.max_oscillation_length = 6 # 检测来回走动的最大长度
print(f"3x3视野贪心算法初始化")
print(f"起始位置: {self.start}")
print(f"目标位置: {self.end}")
def _find_position(self, target):
"""寻找地图中指定字符的位置,返回(x, y)格式"""
for y in range(self.rows):
for x in range(self.cols):
if self.map_data[y][x].lower() == target.lower():
return (x, y)
return None
def get_3x3_vision(self, pos):
"""
获取以pos为中心的3x3视野范围内的所有单元格
Args:
pos: 当前位置 (x, y)
Returns:
dict: {(x, y): cell_value} 形式的字典
"""
x, y = pos
vision = {}
# 遍历3x3范围
for dx in range(-1, 2):
for dy in range(-1, 2):
new_x, new_y = x + dx, y + dy
# 检查边界
if 0 <= new_x < self.cols and 0 <= new_y < self.rows:
vision[(new_x, new_y)] = self.map_data[new_y][new_x]
return vision
def get_adjacent_cells(self, pos):
"""
获取当前位置的上下左右四个相邻位置
Args:
pos: 当前位置 (x, y)
Returns:
list: 可移动的相邻位置列表
"""
x, y = pos
adjacent = []
# 上下左右四个方向
directions = [(0, -1), (0, 1), (-1, 0), (1, 0)] # 上、下、左、右
for dx, dy in directions:
new_x, new_y = x + dx, y + dy
# 检查边界和可移动性
if (0 <= new_x < self.cols and
0 <= new_y < self.rows and
self.can_move_to((new_x, new_y))):
adjacent.append((new_x, new_y))
return adjacent
def can_move_to(self, pos):
"""
检查是否可以移动到指定位置
Args:
pos: 目标位置 (x, y)
Returns:
bool: 是否可以移动
"""
x, y = pos
cell = self.map_data[y][x]
# 不能移动到墙壁
if cell == '1':
return False
return True
def evaluate_resource_value(self, cell):
"""
评估资源的价值
Args:
cell: 单元格内容
Returns:
int: 资源价值,正数表示收益,负数表示损失
"""
if cell.startswith('g'):
# 金币资源,提取数值
try:
return int(cell[1:])
except ValueError:
return 0
elif cell.startswith('t'):
# 陷阱资源,提取数值并取负
try:
return -int(cell[1:])
except ValueError:
return 0
else:
# 其他类型单元格没有资源价值
return 0
def find_best_resource_in_3x3_vision(self):
"""
在3x3视野内寻找最佳资源
优先级:金币 > 未走过 > 走过的路(优先很久之前走过的路) > 墙/陷阱
加入死胡同检测和回溯机制
"""
x, y = self.current_pos
best_pos = None
best_value = float('-inf')
best_visited_time = float('inf')
# 更新当前位置的访问次数
self.position_visit_count[self.current_pos] = self.position_visit_count.get(self.current_pos, 0) + 1
# 检查是否处于死胡同中
if self.is_deadend(self.current_pos):
self.deadend_positions.add(self.current_pos)
# 寻找回溯点
backtrack_point = self.find_backtrack_point()
if backtrack_point != self.current_pos:
# 将当前位置到回溯点的路径添加到路径计划中
self.backtrack_points.append(backtrack_point)
print(f"检测到死胡同,计划回溯到: {backtrack_point}")
# 如果回溯点是相邻的,直接返回
if abs(backtrack_point[0] - x) + abs(backtrack_point[1] - y) == 1:
return backtrack_point, 0 # 回溯点价值为0
# 如果有待回溯的点,优先选择那个方向
if self.backtrack_points:
target = self.backtrack_points[-1]
# 计算到回溯点的方向
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
nx, ny = x + dx, y + dy
if (nx, ny) == target:
return (nx, ny), 0 # 回溯点价值为0
# 如果相邻点在路径上且朝向回溯点方向,也可以选择
if (0 <= nx < self.cols and 0 <= ny < self.rows and
self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁
if ((nx > x and target[0] > x) or
(nx < x and target[0] < x) or
(ny > y and target[1] > y) or
(ny < y and target[1] < y)):
return (nx, ny), 0 # 朝向回溯点的方向价值为0
# 如果已经到达回溯点或无法向回溯点移动,弹出这个回溯点
if self.current_pos == self.backtrack_points[-1]:
self.backtrack_points.pop()
# 检测是否陷入来回走动的循环
if len(self.path) >= 2:
self.oscillation_detection.append(self.current_pos)
if len(self.oscillation_detection) > self.max_oscillation_length:
self.oscillation_detection.pop(0)
if self.detect_oscillation():
print("检测到来回走动,尝试打破循环")
# 清空回溯点列表,寻找新的探索方向
self.backtrack_points = []
# 尝试找到访问次数最少的相邻位置
min_visits = float('inf')
least_visited = None
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
nx, ny = x + dx, y + dy
if (0 <= nx < self.cols and 0 <= ny < self.rows and
self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁
visits = self.position_visit_count.get((nx, ny), 0)
if visits < min_visits:
min_visits = visits
least_visited = (nx, ny)
if least_visited:
return least_visited, 0 # 访问次数最少的位置价值为0
# 在3x3视野内寻找最佳位置
for i in range(-1, 2):
for j in range(-1, 2):
# 跳过自身和对角线位置
if (i == 0 and j == 0) or (i != 0 and j != 0):
continue
nx, ny = x + i, y + j
# 检查位置是否在地图范围内
if 0 <= nx < self.cols and 0 <= ny < self.rows:
cell = self.map_data[ny][nx]
pos = (nx, ny)
# 检查是否是墙,不能走
if cell == '1':
continue
# 计算资源价值
value = self.evaluate_resource_value(cell)
# 检查是否已经走过这个位置
is_visited = pos in self.explored_positions
visited_time = self.position_visit_count.get(pos, 0)
# 计算探索潜力
exploration_potential = self.calculate_exploration_potential(pos)
# 优先级计算逻辑
# 1. 金币优先
if value > 0:
if (value > best_value or
(value == best_value and
((not is_visited and best_visited_time > 0) or
(is_visited and visited_time < best_visited_time)))):
best_value = value
best_pos = pos
best_visited_time = visited_time if is_visited else 0
# 2. 没有金币,选择未走过的路
elif not is_visited:
if best_value <= 0 and (best_visited_time > 0 or exploration_potential > best_value):
best_value = exploration_potential
best_pos = pos
best_visited_time = 0
# 3. 如果都走过了,选择走过次数最少的路
elif is_visited and visited_time < best_visited_time:
if best_value <= 0:
best_value = -visited_time # 负值,访问次数越少越好
best_pos = pos
best_visited_time = visited_time
# 如果找不到合适的位置,就选择任意一个可行的相邻位置
if best_pos is None:
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
nx, ny = x + dx, y + dy
if (0 <= nx < self.cols and 0 <= ny < self.rows and
self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁
best_pos = (nx, ny)
break
return best_pos, best_value if best_value > float('-inf') else 0
def find_exploration_target(self):
"""
当视野内没有资源时,寻找探索目标
严格按照优先级:未走过的路 > 走过的路(很久之前走过的优先)
"""
adjacent = self.get_adjacent_cells(self.current_pos)
# 1. 优先级1未走过的路
unexplored = [pos for pos in adjacent if pos not in self.explored_positions]
if unexplored:
return unexplored[0] # 选择第一个未探索的位置
# 2. 优先级2走过的路按时间排序很久之前走过的优先
explored = []
for pos in adjacent:
if pos in self.explored_positions:
# 找出这个位置在路径中最早出现的索引
if pos in self.path:
earliest_index = self.path.index(pos)
explored.append((pos, earliest_index))
else:
# 如果在explored_positions但不在path中可能是通过其他方式标记的
# 给它一个很大的索引,表示是最近才探索的
explored.append((pos, float('inf')))
if explored:
# 按照索引排序,索引越小表示越早走过
explored.sort(key=lambda x: x[1])
return explored[0][0]
return None
def collect_resource(self, pos):
"""
收集指定位置的资源
Args:
pos: 资源位置 (x, y)
"""
x, y = pos
cell = self.map_data[y][x]
value = self.evaluate_resource_value(cell)
if value != 0:
self.collected_resources.append({
'position': pos,
'type': cell,
'value': value
})
self.total_value += value
self.visited_resources.add(pos)
print(f"收集资源: 位置{pos}, 类型{cell}, 价值{value}, 总价值{self.total_value}")
def run_3x3_greedy_collection(self, max_moves=1000):
"""
运行3x3视野贪心资源收集算法
严格按照优先级:金币 > 未走过的路 > 走过的路 > 墙/陷阱
对于走过的路,优先走很久之前走过的路
Args:
max_moves: 最大移动步数,防止无限循环
Returns:
dict: 包含路径、收集的资源等信息
"""
print("\\n开始3x3视野贪心资源收集...")
moves = 0
stuck_count = 0 # 连续无法找到资源的次数
max_stuck = 20 # 最大连续无资源次数
while moves < max_moves and stuck_count < max_stuck:
moves += 1
# 在3x3视野内寻找最佳位置按照严格优先级
best_pos, best_value = self.find_best_resource_in_3x3_vision()
if best_pos is not None:
# 移动到选定位置
self.current_pos = best_pos
self.path.append(best_pos)
self.explored_positions.add(best_pos)
# 如果是资源位置,进行收集
if best_value != 0:
print(f"{moves}步: 发现视野内金币 位置{best_pos}, 价值{best_value}")
self.collect_resource(best_pos)
stuck_count = 0 # 收集到资源后重置无资源计数
else:
# 是普通路径
if best_pos not in self.explored_positions:
print(f"{moves}步: 移动到未走过的路 位置{best_pos}")
else:
print(f"{moves}步: 移动到走过的路 位置{best_pos}")
stuck_count += 1
else:
# 没有可移动位置,结束收集
print(f"{moves}步: 无法进行任何移动,结束收集")
break
# 检查是否达到终点
if self.current_pos == self.end:
print(f"{moves}步: 到达终点!")
break
if moves >= max_moves:
print(f"达到最大移动步数 {max_moves},结束收集")
elif stuck_count >= max_stuck:
print(f"连续 {max_stuck} 步未找到资源,结束收集")
print("3x3视野资源收集完成")
print(f"总步数: {len(self.path)-1}, 收集资源数: {len(self.collected_resources)}, 资源总价值: {self.total_value}")
return self.get_collection_result()
def get_collection_result(self):
"""获取收集结果"""
return {
'path': self.path.copy(),
'collected_resources': self.collected_resources.copy(),
'total_value': self.total_value,
'total_moves': len(self.path) - 1,
'resources_count': len(self.collected_resources),
'start_pos': self.start,
'end_pos': self.end,
'final_pos': self.current_pos,
'explored_positions': len(self.explored_positions)
}
def reset(self):
"""重置收集器状态"""
self.map_data = copy.deepcopy(self.original_map)
self.current_pos = self.start
self.path = [self.start]
self.collected_resources = []
self.total_value = 0
self.visited_resources = set()
self.explored_positions = set([self.start])
self.position_visit_count = {self.start: 1}
self.deadend_positions = set()
self.backtrack_points = []
self.oscillation_detection = []
def get_path(self):
"""
获取完整的资源收集路径
返回:路径列表,格式为 [(x1, y1), (x2, y2), ...]
"""
# 先重置状态
self.reset()
max_steps = self.rows * self.cols * 3 # 设置最大步数限制,避免无限循环
steps = 0
reached_goal = False
while steps < max_steps and not reached_goal:
_, _, reached_goal = self.next_step()
steps += 1
# 如果路径长度已经很长但还没到达目标,可能是在循环
if steps > self.rows * self.cols * 2:
print(f"警告:路径过长 ({steps} 步),可能存在循环。提前结束。")
break
if reached_goal:
print(f"找到路径!总步数: {steps}, 总收集价值: {self.total_value}")
else:
print(f"未能找到到达目标的路径,已走 {steps} 步,总收集价值: {self.total_value}")
print(f"发现的死胡同数量: {len(self.deadend_positions)}")
return self.path
def next_step(self):
"""
执行下一步移动
返回:(新位置, 收集的资源价值, 是否到达目标)
"""
if self.current_pos == self.end:
return self.current_pos, 0, True
next_pos, value = self.find_best_resource_in_3x3_vision()
if next_pos is None:
# 如果找不到下一步,说明卡住了,可能是迷宫设计问题
print("找不到下一步移动,可能被卡住了")
return self.current_pos, 0, False
# 记录新位置和路径
self.current_pos = next_pos
self.path.append(next_pos)
self.explored_positions.add(next_pos)
# 更新位置访问计数
self.position_visit_count[next_pos] = self.position_visit_count.get(next_pos, 0) + 1
# 如果当前位置是回溯点且有多个回溯点,移除当前回溯点
if self.backtrack_points and next_pos == self.backtrack_points[-1]:
self.backtrack_points.pop()
# 收集资源
x, y = next_pos
cell = self.map_data[y][x]
value = self.evaluate_resource_value(cell)
if value > 0 and next_pos not in self.visited_resources:
self.collected_resources.append((next_pos, value))
self.visited_resources.add(next_pos)
self.total_value += value
# 标记资源已被收集,避免重复计算
if cell.startswith('g') or cell.startswith('c'):
try:
self.map_data[y][x] = 'v' # 将收集过的资源标记为已访问
except:
pass
# 检查是否到达目标
reached_goal = (next_pos == self.end)
# 调试信息
if len(self.path) % 10 == 0:
print(f"当前路径长度: {len(self.path)}, 总收集价值: {self.total_value}")
print(f"已发现的死胡同数量: {len(self.deadend_positions)}")
return next_pos, value, reached_goal
def is_deadend(self, pos):
"""
判断当前位置是否是死胡同
死胡同的定义:除了来路外,周围全是墙/陷阱/已走过的路
"""
x, y = pos
valid_directions = 0
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
nx, ny = x + dx, y + dy
if (0 <= nx < self.cols and 0 <= ny < self.rows and
self.map_data[ny][nx] != '1' and # 使用'1'表示墙壁
(nx, ny) not in self.explored_positions):
valid_directions += 1
# 如果没有未探索的方向,则是死胡同
return valid_directions == 0
def find_backtrack_point(self):
"""
寻找回溯点,即从路径中找到最近的有未探索方向的点
"""
# 从最近访问到最早访问的路径点遍历
for pos in reversed(self.path):
x, y = pos
# 检查这个点的四个方向是否有未探索的路
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
nx, ny = x + dx, y + dy
if (0 <= nx < self.cols and 0 <= ny < self.rows and
self.map_data[ny][nx] != '1' and # 使用'1'表示墙壁
(nx, ny) not in self.explored_positions):
return pos
# 如果找不到回溯点,则返回起始点
return self.start
def detect_oscillation(self):
"""
检测路径中是否有来回走动的情况
"""
if len(self.oscillation_detection) < self.max_oscillation_length:
return False
# 检查最近的移动是否形成循环
recent_moves = self.oscillation_detection[-self.max_oscillation_length:]
# 打印调试信息
print(f"检查振荡: {recent_moves[-6:]}")
# 检查是否有重复位置模式 (例如A-B-A-B或A-B-C-A-B-C)
for pattern_length in range(2, self.max_oscillation_length // 2 + 1):
if recent_moves[-pattern_length:] == recent_moves[-2*pattern_length:-pattern_length]:
print(f"检测到振荡!模式长度: {pattern_length}")
return True
# 更简单的检测:检查是否在有限步数内多次访问同一位置
position_counts = {}
for pos in recent_moves:
if pos in position_counts:
position_counts[pos] += 1
if position_counts[pos] >= 3: # 在短时间内访问同一位置3次以上
print(f"检测到位置 {pos} 被频繁访问 {position_counts[pos]}")
return True
else:
position_counts[pos] = 1
return False
def calculate_exploration_potential(self, pos):
"""
计算位置的探索潜力值
潜力值基于:
1. 周围未探索的方向数
2. 到达过这个位置的次数(次数越多潜力越低)
3. 是否含有资源
"""
x, y = pos
potential = 0
# 检查周围四个方向是否有未探索的路
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
nx, ny = x + dx, y + dy
if (0 <= nx < self.cols and 0 <= ny < self.rows):
# 未探索的路增加潜力
if (nx, ny) not in self.explored_positions and self.map_data[ny][nx] != '1':
potential += 10
# 有资源的路增加更多潜力
cell = self.map_data[ny][nx]
if cell.startswith('g'):
try:
value = int(cell[1:])
potential += value * 2
except ValueError:
potential += 5 # 如果无法解析值则默认增加5点潜力
# 访问次数越多,潜力越低
visit_penalty = self.position_visit_count.get(pos, 0) * 5
potential = max(0, potential - visit_penalty)
return potential
# 使用示例
def main():
obj = MazeGenerator(20,'demo.csv',name="龙脊峡谷迷宫")
obj.generate(
seed=123,
boss_count=2,
traps_range=(5, 10),
mechanisms_range=(3, 7),
skill_traps=8
)
obj.export_to_csv()
map_data = obj.read_csv()
see = MazeGenerator(1,filename ='demo.csv')
see.read_from_csv()
print("=== 原始迷宫 ===")
see.print_maze()
print("\\n" + "="*60)
print("使用传统贪心算法:")
print("="*60)
player = GreedyPlayer(map_data)
player.find_path()
see.maze = player.marked_map
see.print_maze()
print(f"传统贪心算法总收益: {player.get_total_reward()}")
print("\\n" + "="*60)
print("使用3x3视野贪心算法:")
print("="*60)
# 使用新的3x3视野算法
greedy_3x3 = Greedy3x3ResourceCollector(map_data)
result = greedy_3x3.run_3x3_greedy_collection()
# 显示结果
see.maze = greedy_3x3.add_path_to_map()
see.print_maze()
print(f"\\n3x3视野算法结果:")
print(f" 总移动步数: {result['total_moves']}")
print(f" 收集资源数量: {result['resources_count']}")
print(f" 资源总价值: {result['total_value']}")
print(f" 探索位置数: {result['explored_positions']}")
print("\\n收集的资源详情:")
for i, resource in enumerate(result['collected_resources'], 1):
print(f" {i}. 位置{resource['position']}: {resource['type']} (价值: {resource['value']})")
def demo_3x3_greedy():
"""演示3x3视野贪心算法"""
# 创建一个示例迷宫
demo_maze = [
['s', '0', 'g5', '1', 't3'],
['0', '1', '0', '0', 'g2'],
['g3', '0', '1', 't2', '0'],
['0', 't1', '0', '0', 'g4'],
['1', '0', 'g1', '0', 'e']
]
print("=== 3x3视野贪心算法演示 ===")
print("迷宫说明:")
print(" s: 起点, e: 终点")
print(" g数字: 金币资源 (正收益)")
print(" t数字: 陷阱资源 (负收益)")
print(" 0: 可通行路径, 1: 墙壁")
print("\\n原始迷宫:")
for row in demo_maze:
print(' '.join(f"{cell:>2}" for cell in row))
# 使用3x3视野贪心算法
collector = Greedy3x3ResourceCollector(demo_maze)
result = collector.run_3x3_greedy_collection()
# 显示标记后的迷宫
marked_maze = collector.add_path_to_map()
print("\\n标记路径后的迷宫:")
print("S: 起点, E: 终点, *: 已收集资源, .: 路径")
for row in marked_maze:
print(' '.join(f"{cell:>2}" for cell in row))
print(f"\\n算法结果:")
print(f" 总移动步数: {result['total_moves']}")
print(f" 收集资源数量: {result['resources_count']}")
print(f" 资源总价值: {result['total_value']}")
return collector, result
if __name__ == "__main__":
# 运行演示
print("选择运行模式:")
print("1. 完整迷宫生成和算法比较")
print("2. 简单3x3视野算法演示")
choice = input("请输入选择 (1 或 2默认为 2): ").strip()
if choice == "1":
main()
else:
demo_3x3_greedy()