1030 lines
38 KiB
Python
1030 lines
38 KiB
Python
import math
|
||
from maze import *
|
||
import math
|
||
import copy
|
||
from collections import deque
|
||
|
||
|
||
|
||
class Greedy3x3ResourceCollector:
|
||
"""
|
||
基于3x3视野的贪心资源收集器
|
||
每次移动时选择3x3视野范围内最高价值的资源
|
||
只能进行上下左右移动
|
||
"""
|
||
|
||
def __init__(self, map_data, start=None, end=None):
|
||
"""
|
||
初始化3x3视野贪心资源收集器
|
||
|
||
Args:
|
||
map_data: 迷宫地图,2D列表 (注意:这里是[y][x]格式)
|
||
start: 起始位置 (x, y),如果为None则自动寻找
|
||
end: 目标位置 (x, y),如果为None则自动寻找
|
||
"""
|
||
self.original_map = copy.deepcopy(map_data)
|
||
self.map_data = copy.deepcopy(map_data)
|
||
self.rows = len(map_data)
|
||
self.cols = len(map_data[0]) if self.rows > 0 else 0
|
||
|
||
# 寻找起始位置和目标位置
|
||
self.start = start or self._find_position('s')
|
||
self.end = end or self._find_position('e')
|
||
|
||
if not self.start:
|
||
raise ValueError("无法找到起始位置 's'")
|
||
if not self.end:
|
||
raise ValueError("无法找到目标位置 'e'")
|
||
|
||
self.current_pos = self.start
|
||
self.path = [self.start]
|
||
self.collected_resources = []
|
||
self.total_value = 0
|
||
self.visited_resources = set()
|
||
self.explored_positions = set([self.start])
|
||
|
||
# 增加历史移动记录和死胡同检测相关变量
|
||
self.position_visit_count = {self.start: 1} # 记录每个位置的访问次数
|
||
self.deadend_positions = set() # 记录已知的死胡同位置
|
||
self.backtrack_points = [] # 记录可能的回溯点
|
||
self.oscillation_detection = [] # 用于检测来回走动的历史
|
||
self.max_oscillation_length = 6 # 检测来回走动的最大长度
|
||
|
||
# 预处理每个点到终点的距离
|
||
self.distance_to_end = self._calculate_distance_to_end()
|
||
|
||
print(f"3x3视野贪心算法初始化")
|
||
print(f"起始位置: {self.start}")
|
||
print(f"目标位置: {self.end}")
|
||
|
||
def _find_position(self, target):
|
||
"""寻找地图中指定字符的位置,返回(x, y)格式"""
|
||
for y in range(self.rows):
|
||
for x in range(self.cols):
|
||
cell_value = str(self.map_data[y][x]).lower()
|
||
if cell_value.startswith(target.lower()):
|
||
return (x, y)
|
||
return None
|
||
|
||
def get_3x3_vision(self, pos):
|
||
"""
|
||
获取以pos为中心的3x3视野范围内的所有单元格
|
||
|
||
Args:
|
||
pos: 当前位置 (x, y)
|
||
|
||
Returns:
|
||
dict: {(x, y): cell_value} 形式的字典
|
||
"""
|
||
x, y = pos
|
||
vision = {}
|
||
|
||
# 遍历3x3范围
|
||
for dx in range(-1, 2):
|
||
for dy in range(-1, 2):
|
||
new_x, new_y = x + dx, y + dy
|
||
|
||
# 检查边界
|
||
if 0 <= new_x < self.cols and 0 <= new_y < self.rows:
|
||
vision[(new_x, new_y)] = self.map_data[new_y][new_x]
|
||
|
||
return vision
|
||
|
||
def get_adjacent_cells(self, pos):
|
||
"""
|
||
获取当前位置的上下左右四个相邻位置
|
||
|
||
Args:
|
||
pos: 当前位置 (x, y)
|
||
|
||
Returns:
|
||
list: 可移动的相邻位置列表
|
||
"""
|
||
x, y = pos
|
||
adjacent = []
|
||
|
||
# 上下左右四个方向
|
||
directions = [(0, -1), (0, 1), (-1, 0), (1, 0)] # 上、下、左、右
|
||
|
||
for dx, dy in directions:
|
||
new_x, new_y = x + dx, y + dy
|
||
|
||
# 检查边界和可移动性
|
||
if (0 <= new_x < self.cols and
|
||
0 <= new_y < self.rows and
|
||
self.can_move_to((new_x, new_y))):
|
||
adjacent.append((new_x, new_y))
|
||
|
||
return adjacent
|
||
|
||
def can_move_to(self, pos):
|
||
"""
|
||
检查是否可以移动到指定位置
|
||
|
||
Args:
|
||
pos: 目标位置 (x, y)
|
||
|
||
Returns:
|
||
bool: 是否可以移动
|
||
"""
|
||
x, y = pos
|
||
cell = self.map_data[y][x]
|
||
|
||
# 不能移动到墙壁
|
||
if cell == '1':
|
||
return False
|
||
|
||
return True
|
||
|
||
def evaluate_resource_value(self, cell):
|
||
"""
|
||
评估资源的价值
|
||
|
||
Args:
|
||
cell: 单元格内容
|
||
|
||
Returns:
|
||
int: 资源价值,正数表示收益,负数表示损失
|
||
"""
|
||
if cell.startswith('g'):
|
||
# 金币资源,提取数值
|
||
try:
|
||
return int(cell[1:])
|
||
except ValueError:
|
||
return 0
|
||
elif cell.startswith('t'):
|
||
# 陷阱资源,提取数值并取负
|
||
try:
|
||
return -int(cell[1:])
|
||
except ValueError:
|
||
return 0
|
||
else:
|
||
# 其他类型单元格没有资源价值
|
||
return 0
|
||
|
||
def find_best_resource_in_3x3_vision(self):
|
||
"""
|
||
在3x3视野内寻找最佳资源,新的优先级顺序:
|
||
1. 金币(优先直接相邻,也考虑对角线上可通过两步到达的)
|
||
2. 空地(优先离终点更近的)
|
||
3. boss或机关
|
||
4. 陷阱(尽可能减少损失)
|
||
5. 墙壁不考虑
|
||
|
||
加入死胡同检测和回溯机制
|
||
"""
|
||
x, y = self.current_pos
|
||
|
||
# 更新当前位置的访问次数
|
||
self.position_visit_count[self.current_pos] = self.position_visit_count.get(self.current_pos, 0) + 1
|
||
|
||
# 检查是否处于死胡同中
|
||
if self.is_deadend(self.current_pos):
|
||
self.deadend_positions.add(self.current_pos)
|
||
# 寻找回溯点
|
||
backtrack_point = self.find_backtrack_point()
|
||
if backtrack_point != self.current_pos:
|
||
# 将当前位置到回溯点的路径添加到路径计划中
|
||
self.backtrack_points.append(backtrack_point)
|
||
print(f"检测到死胡同,计划回溯到: {backtrack_point}")
|
||
# 如果回溯点是相邻的,直接返回
|
||
if abs(backtrack_point[0] - x) + abs(backtrack_point[1] - y) == 1:
|
||
return backtrack_point, 0 # 回溯点,价值为0
|
||
|
||
# 如果有待回溯的点,优先选择那个方向
|
||
if self.backtrack_points:
|
||
target = self.backtrack_points[-1]
|
||
# 计算到回溯点的方向
|
||
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
|
||
nx, ny = x + dx, y + dy
|
||
if (nx, ny) == target:
|
||
return (nx, ny), 0 # 回溯点,价值为0
|
||
|
||
# 如果相邻点在路径上且朝向回溯点方向,也可以选择
|
||
if (0 <= nx < self.cols and 0 <= ny < self.rows and
|
||
self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁
|
||
if ((nx > x and target[0] > x) or
|
||
(nx < x and target[0] < x) or
|
||
(ny > y and target[1] > y) or
|
||
(ny < y and target[1] < y)):
|
||
return (nx, ny), 0 # 朝向回溯点的方向,价值为0
|
||
|
||
# 如果已经到达回溯点或无法向回溯点移动,弹出这个回溯点
|
||
if self.current_pos == self.backtrack_points[-1]:
|
||
self.backtrack_points.pop()
|
||
|
||
# 检测是否陷入来回走动的循环
|
||
if len(self.path) >= 2:
|
||
self.oscillation_detection.append(self.current_pos)
|
||
if len(self.oscillation_detection) > self.max_oscillation_length:
|
||
self.oscillation_detection.pop(0)
|
||
|
||
if self.detect_oscillation():
|
||
print("检测到来回走动,尝试打破循环")
|
||
# 清空回溯点列表,寻找新的探索方向
|
||
self.backtrack_points = []
|
||
# 尝试找到访问次数最少的相邻位置
|
||
min_visits = float('inf')
|
||
least_visited = None
|
||
|
||
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
|
||
nx, ny = x + dx, y + dy
|
||
if (0 <= nx < self.cols and 0 <= ny < self.rows and
|
||
self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁
|
||
visits = self.position_visit_count.get((nx, ny), 0)
|
||
if visits < min_visits:
|
||
min_visits = visits
|
||
least_visited = (nx, ny)
|
||
|
||
if least_visited:
|
||
return least_visited, 0 # 访问次数最少的位置,价值为0
|
||
|
||
# 按照新的优先级在3x3视野内寻找最佳位置
|
||
# 1. 收集相邻和对角线上的所有单元格
|
||
adjacent_cells = [] # 相邻的单元格 [(pos, cell_type, value)]
|
||
diagonal_cells = [] # 对角线上的单元格 [(pos, cell_type, value, can_reach)]
|
||
|
||
for i in range(-1, 2):
|
||
for j in range(-1, 2):
|
||
nx, ny = x + i, y + j
|
||
|
||
# 检查位置是否在地图范围内
|
||
if 0 <= nx < self.cols and 0 <= ny < self.rows:
|
||
pos = (nx, ny)
|
||
cell = self.map_data[ny][nx]
|
||
|
||
# 检查是否是墙,不能走
|
||
if cell == '1':
|
||
continue
|
||
|
||
# 计算资源价值
|
||
value = self.evaluate_resource_value(cell)
|
||
cell_type = self._get_cell_type(cell)
|
||
|
||
# 相邻位置(上下左右)
|
||
if i == 0 or j == 0:
|
||
if i != 0 or j != 0: # 排除自身位置
|
||
adjacent_cells.append((pos, cell_type, value))
|
||
# 对角线位置
|
||
else:
|
||
# 判断是否可以两步到达
|
||
can_reach = self._can_reach_diagonal_coin(self.current_pos, pos)
|
||
diagonal_cells.append((pos, cell_type, value, can_reach))
|
||
|
||
# 2. 优先级1:查找相邻的金币
|
||
adjacent_coins = [(pos, value) for pos, cell_type, value in adjacent_cells
|
||
if cell_type == 'gold' and value > 0]
|
||
|
||
if adjacent_coins:
|
||
# 如果有多个金币,选择价值最高的
|
||
best_coin = max(adjacent_coins, key=lambda x: x[1])
|
||
return best_coin
|
||
|
||
# 3. 优先级2:查找可达的对角线上的金币
|
||
diagonal_coins = [(pos, value) for pos, cell_type, value, can_reach in diagonal_cells
|
||
if cell_type == 'gold' and value > 0 and can_reach]
|
||
|
||
if diagonal_coins:
|
||
# 如果有多个可达的对角线金币,选择价值最高的
|
||
best_diag_coin = max(diagonal_coins, key=lambda x: x[1])
|
||
# 计算到达这个对角线金币的两步路径中的第一步
|
||
dx, dy = best_diag_coin[0]
|
||
step_x = 1 if dx > x else -1
|
||
step_y = 1 if dy > y else -1
|
||
|
||
# 检查两种可能路径的第一步
|
||
path1_pos = (x + step_x, y)
|
||
path2_pos = (x, y + step_y)
|
||
|
||
# 选择离终点更近的路径
|
||
dist1 = self.distance_to_end.get(path1_pos, float('inf'))
|
||
dist2 = self.distance_to_end.get(path2_pos, float('inf'))
|
||
|
||
if dist1 <= dist2 and self.can_move_to(path1_pos):
|
||
return path1_pos, 0
|
||
elif self.can_move_to(path2_pos):
|
||
return path2_pos, 0
|
||
|
||
# 4. 优先级3:查找空地,优先选择离终点更近的
|
||
empty_spaces = [(pos, self.distance_to_end.get(pos, float('inf')))
|
||
for pos, cell_type, _ in adjacent_cells
|
||
if cell_type == 'empty']
|
||
|
||
if empty_spaces:
|
||
# 选择离终点最近的空地
|
||
best_empty = min(empty_spaces, key=lambda x: x[1])
|
||
return best_empty[0], 0
|
||
|
||
# 5. 优先级4:boss或机关
|
||
mechanism_positions = [(pos, value) for pos, cell_type, value in adjacent_cells
|
||
if cell_type in ['boss', 'mechanism']]
|
||
|
||
if mechanism_positions:
|
||
# 选择任意一个boss或机关位置
|
||
return mechanism_positions[0][0], mechanism_positions[0][1]
|
||
|
||
# 6. 优先级5:陷阱,尽可能减少损失
|
||
trap_positions = [(pos, value) for pos, cell_type, value in adjacent_cells
|
||
if cell_type == 'trap']
|
||
|
||
if trap_positions:
|
||
# 选择损失最小的陷阱(value是负数,所以用max)
|
||
best_trap = max(trap_positions, key=lambda x: x[1])
|
||
return best_trap
|
||
|
||
# 如果找不到合适的位置,就选择任意一个可行的相邻位置
|
||
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
|
||
nx, ny = x + dx, y + dy
|
||
if (0 <= nx < self.cols and 0 <= ny < self.rows and
|
||
self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁
|
||
return (nx, ny), 0
|
||
|
||
# 如果实在没有可行位置,返回None
|
||
return None, 0
|
||
|
||
def _get_cell_type(self, cell):
|
||
"""
|
||
获取单元格类型
|
||
|
||
Args:
|
||
cell: 单元格内容
|
||
|
||
Returns:
|
||
str: 'gold', 'trap', 'boss', 'mechanism', 'empty', 'wall', 'start', 'end'
|
||
"""
|
||
if cell.startswith('g'):
|
||
return 'gold'
|
||
elif cell.startswith('t'):
|
||
return 'trap'
|
||
elif cell.startswith('b'):
|
||
return 'boss'
|
||
elif cell.startswith('l'):
|
||
return 'mechanism'
|
||
elif cell == 's':
|
||
return 'start'
|
||
elif cell == 'e':
|
||
return 'end'
|
||
elif cell == '1':
|
||
return 'wall'
|
||
else:
|
||
return 'empty' # 包括 '0' 和其他可通行的单元格
|
||
|
||
def find_exploration_target(self):
|
||
"""
|
||
当视野内没有资源时,寻找探索目标
|
||
严格按照优先级:未走过的路 > 走过的路(很久之前走过的优先)
|
||
"""
|
||
adjacent = self.get_adjacent_cells(self.current_pos)
|
||
|
||
# 1. 优先级1:未走过的路
|
||
unexplored = [pos for pos in adjacent if pos not in self.explored_positions]
|
||
if unexplored:
|
||
return unexplored[0] # 选择第一个未探索的位置
|
||
|
||
# 2. 优先级2:走过的路,按时间排序(很久之前走过的优先)
|
||
explored = []
|
||
for pos in adjacent:
|
||
if pos in self.explored_positions:
|
||
# 找出这个位置在路径中最早出现的索引
|
||
if pos in self.path:
|
||
earliest_index = self.path.index(pos)
|
||
explored.append((pos, earliest_index))
|
||
else:
|
||
# 如果在explored_positions但不在path中,可能是通过其他方式标记的
|
||
# 给它一个很大的索引,表示是最近才探索的
|
||
explored.append((pos, float('inf')))
|
||
|
||
if explored:
|
||
# 按照索引排序,索引越小表示越早走过
|
||
explored.sort(key=lambda x: x[1])
|
||
return explored[0][0]
|
||
|
||
return None
|
||
|
||
def collect_resource(self, pos):
|
||
"""
|
||
收集指定位置的资源
|
||
|
||
Args:
|
||
pos: 资源位置 (x, y)
|
||
"""
|
||
x, y = pos
|
||
cell = self.map_data[y][x]
|
||
value = self.evaluate_resource_value(cell)
|
||
|
||
if value != 0:
|
||
self.collected_resources.append({
|
||
'position': pos,
|
||
'type': cell,
|
||
'value': value
|
||
})
|
||
self.total_value += value
|
||
self.visited_resources.add(pos)
|
||
|
||
print(f"收集资源: 位置{pos}, 类型{cell}, 价值{value}, 总价值{self.total_value}")
|
||
|
||
def run_3x3_greedy_collection(self, max_moves=1000):
|
||
"""
|
||
运行3x3视野贪心资源收集算法
|
||
严格按照优先级:金币 > 未走过的路 > 走过的路 > 墙/陷阱
|
||
对于走过的路,优先走很久之前走过的路
|
||
|
||
Args:
|
||
max_moves: 最大移动步数,防止无限循环
|
||
|
||
Returns:
|
||
dict: 包含路径、收集的资源等信息
|
||
"""
|
||
print("\\n开始3x3视野贪心资源收集...")
|
||
|
||
moves = 0
|
||
stuck_count = 0 # 连续无法找到资源的次数
|
||
max_stuck = 20 # 最大连续无资源次数
|
||
|
||
while moves < max_moves and stuck_count < max_stuck:
|
||
moves += 1
|
||
|
||
# 在3x3视野内寻找最佳位置(按照严格优先级)
|
||
best_pos, best_value = self.find_best_resource_in_3x3_vision()
|
||
|
||
if best_pos is not None:
|
||
# 移动到选定位置
|
||
self.current_pos = best_pos
|
||
self.path.append(best_pos)
|
||
self.explored_positions.add(best_pos)
|
||
|
||
# 如果是资源位置,进行收集
|
||
if best_value != 0:
|
||
print(f"第{moves}步: 发现视野内金币 位置{best_pos}, 价值{best_value}")
|
||
self.collect_resource(best_pos)
|
||
stuck_count = 0 # 收集到资源后重置无资源计数
|
||
else:
|
||
# 是普通路径
|
||
if best_pos not in self.explored_positions:
|
||
print(f"第{moves}步: 移动到未走过的路 位置{best_pos}")
|
||
else:
|
||
print(f"第{moves}步: 移动到走过的路 位置{best_pos}")
|
||
stuck_count += 1
|
||
else:
|
||
# 没有可移动位置,结束收集
|
||
print(f"第{moves}步: 无法进行任何移动,结束收集")
|
||
break
|
||
|
||
# 检查是否达到终点
|
||
if self.current_pos == self.end:
|
||
print(f"第{moves}步: 到达终点!")
|
||
break
|
||
|
||
if moves >= max_moves:
|
||
print(f"达到最大移动步数 {max_moves},结束收集")
|
||
elif stuck_count >= max_stuck:
|
||
print(f"连续 {max_stuck} 步未找到资源,结束收集")
|
||
|
||
print("3x3视野资源收集完成!")
|
||
print(f"总步数: {len(self.path)-1}, 收集资源数: {len(self.collected_resources)}, 资源总价值: {self.total_value}")
|
||
return self.get_collection_result()
|
||
|
||
def get_collection_result(self):
|
||
"""获取收集结果"""
|
||
return {
|
||
'path': self.path.copy(),
|
||
'collected_resources': self.collected_resources.copy(),
|
||
'total_value': self.total_value,
|
||
'total_moves': len(self.path) - 1,
|
||
'resources_count': len(self.collected_resources),
|
||
'start_pos': self.start,
|
||
'end_pos': self.end,
|
||
'final_pos': self.current_pos,
|
||
'explored_positions': len(self.explored_positions)
|
||
}
|
||
|
||
def reset(self):
|
||
"""重置收集器状态"""
|
||
self.map_data = copy.deepcopy(self.original_map)
|
||
self.current_pos = self.start
|
||
self.path = [self.start]
|
||
self.collected_resources = []
|
||
self.total_value = 0
|
||
self.visited_resources = set()
|
||
self.explored_positions = set([self.start])
|
||
self.position_visit_count = {self.start: 1}
|
||
self.deadend_positions = set()
|
||
self.backtrack_points = []
|
||
self.oscillation_detection = []
|
||
|
||
def get_path(self):
|
||
"""
|
||
获取完整的资源收集路径
|
||
返回:路径列表,格式为 [(x1, y1), (x2, y2), ...]
|
||
"""
|
||
# 先重置状态
|
||
self.reset()
|
||
|
||
max_steps = self.rows * self.cols * 3 # 设置最大步数限制,避免无限循环
|
||
steps = 0
|
||
reached_goal = False
|
||
|
||
while steps < max_steps and not reached_goal:
|
||
_, _, reached_goal = self.next_step()
|
||
steps += 1
|
||
|
||
# 如果路径长度已经很长但还没到达目标,可能是在循环
|
||
if steps > self.rows * self.cols * 2:
|
||
print(f"警告:路径过长 ({steps} 步),可能存在循环。提前结束。")
|
||
break
|
||
|
||
if reached_goal:
|
||
print(f"找到路径!总步数: {steps}, 总收集价值: {self.total_value}")
|
||
else:
|
||
print(f"未能找到到达目标的路径,已走 {steps} 步,总收集价值: {self.total_value}")
|
||
|
||
print(f"发现的死胡同数量: {len(self.deadend_positions)}")
|
||
|
||
return self.path
|
||
|
||
def next_step(self):
|
||
"""
|
||
执行下一步移动
|
||
返回:(新位置, 收集的资源价值, 是否到达目标)
|
||
"""
|
||
if self.current_pos == self.end:
|
||
return self.current_pos, 0, True
|
||
|
||
next_pos, value = self.find_best_resource_in_3x3_vision()
|
||
if next_pos is None:
|
||
# 如果找不到下一步,说明卡住了,可能是迷宫设计问题
|
||
print("找不到下一步移动,可能被卡住了")
|
||
return self.current_pos, 0, False
|
||
|
||
# 记录新位置和路径
|
||
self.current_pos = next_pos
|
||
self.path.append(next_pos)
|
||
self.explored_positions.add(next_pos)
|
||
|
||
# 更新位置访问计数
|
||
self.position_visit_count[next_pos] = self.position_visit_count.get(next_pos, 0) + 1
|
||
|
||
# 如果当前位置是回溯点且有多个回溯点,移除当前回溯点
|
||
if self.backtrack_points and next_pos == self.backtrack_points[-1]:
|
||
self.backtrack_points.pop()
|
||
|
||
# 收集资源
|
||
x, y = next_pos
|
||
cell = self.map_data[y][x]
|
||
value = self.evaluate_resource_value(cell)
|
||
|
||
if value > 0 and next_pos not in self.visited_resources:
|
||
self.collected_resources.append((next_pos, value))
|
||
self.visited_resources.add(next_pos)
|
||
self.total_value += value
|
||
# 标记资源已被收集,避免重复计算
|
||
if cell.startswith('g') or cell.startswith('c'):
|
||
try:
|
||
self.map_data[y][x] = 'v' # 将收集过的资源标记为已访问
|
||
except:
|
||
pass
|
||
|
||
# 检查是否到达目标
|
||
reached_goal = (next_pos == self.end)
|
||
|
||
# 调试信息
|
||
if len(self.path) % 10 == 0:
|
||
print(f"当前路径长度: {len(self.path)}, 总收集价值: {self.total_value}")
|
||
print(f"已发现的死胡同数量: {len(self.deadend_positions)}")
|
||
|
||
return next_pos, value, reached_goal
|
||
|
||
def is_deadend(self, pos):
|
||
"""
|
||
判断当前位置是否是死胡同
|
||
死胡同的定义:除了来路外,周围全是墙/陷阱/已走过的路
|
||
"""
|
||
x, y = pos
|
||
valid_directions = 0
|
||
|
||
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
|
||
nx, ny = x + dx, y + dy
|
||
if (0 <= nx < self.cols and 0 <= ny < self.rows and
|
||
self.map_data[ny][nx] != '1' and # 使用'1'表示墙壁
|
||
(nx, ny) not in self.explored_positions):
|
||
valid_directions += 1
|
||
|
||
# 如果没有未探索的方向,则是死胡同
|
||
return valid_directions == 0
|
||
|
||
def find_backtrack_point(self):
|
||
"""
|
||
寻找回溯点,即从路径中找到最近的有未探索方向的点
|
||
"""
|
||
# 从最近访问到最早访问的路径点遍历
|
||
for pos in reversed(self.path):
|
||
x, y = pos
|
||
|
||
# 检查这个点的四个方向是否有未探索的路
|
||
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
|
||
nx, ny = x + dx, y + dy
|
||
if (0 <= nx < self.cols and 0 <= ny < self.rows and
|
||
self.map_data[ny][nx] != '1' and # 使用'1'表示墙壁
|
||
(nx, ny) not in self.explored_positions):
|
||
return pos
|
||
|
||
# 如果找不到回溯点,则返回起始点
|
||
return self.start
|
||
|
||
def detect_oscillation(self):
|
||
"""
|
||
检测路径中是否有来回走动的情况
|
||
"""
|
||
if len(self.oscillation_detection) < self.max_oscillation_length:
|
||
return False
|
||
|
||
# 检查最近的移动是否形成循环
|
||
recent_moves = self.oscillation_detection[-self.max_oscillation_length:]
|
||
|
||
# 打印调试信息
|
||
print(f"检查振荡: {recent_moves[-6:]}")
|
||
|
||
# 检查是否有重复位置模式 (例如A-B-A-B或A-B-C-A-B-C)
|
||
for pattern_length in range(2, self.max_oscillation_length // 2 + 1):
|
||
if recent_moves[-pattern_length:] == recent_moves[-2*pattern_length:-pattern_length]:
|
||
print(f"检测到振荡!模式长度: {pattern_length}")
|
||
return True
|
||
|
||
# 更简单的检测:检查是否在有限步数内多次访问同一位置
|
||
position_counts = {}
|
||
for pos in recent_moves:
|
||
if pos in position_counts:
|
||
position_counts[pos] += 1
|
||
if position_counts[pos] >= 3: # 在短时间内访问同一位置3次以上
|
||
print(f"检测到位置 {pos} 被频繁访问 {position_counts[pos]} 次")
|
||
return True
|
||
else:
|
||
position_counts[pos] = 1
|
||
|
||
return False
|
||
|
||
def calculate_exploration_potential(self, pos):
|
||
"""
|
||
计算位置的探索潜力值
|
||
潜力值基于:
|
||
1. 周围未探索的方向数
|
||
2. 到达过这个位置的次数(次数越多潜力越低)
|
||
3. 是否含有资源
|
||
"""
|
||
x, y = pos
|
||
potential = 0
|
||
|
||
# 检查周围四个方向是否有未探索的路
|
||
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
|
||
nx, ny = x + dx, y + dy
|
||
if (0 <= nx < self.cols and 0 <= ny < self.rows):
|
||
# 未探索的路增加潜力
|
||
if (nx, ny) not in self.explored_positions and self.map_data[ny][nx] != '1':
|
||
potential += 10
|
||
|
||
# 有资源的路增加更多潜力
|
||
cell = self.map_data[ny][nx]
|
||
if cell.startswith('g'):
|
||
try:
|
||
value = int(cell[1:])
|
||
potential += value * 2
|
||
except ValueError:
|
||
potential += 5 # 如果无法解析值,则默认增加5点潜力
|
||
|
||
# 访问次数越多,潜力越低
|
||
visit_penalty = self.position_visit_count.get(pos, 0) * 5
|
||
potential = max(0, potential - visit_penalty)
|
||
|
||
return potential
|
||
|
||
def _calculate_distance_to_end(self):
|
||
"""
|
||
使用BFS预处理计算每个点到终点的距离
|
||
返回字典 {(x, y): distance}
|
||
"""
|
||
distances = {}
|
||
queue = deque([(self.end, 0)]) # (位置, 距离)
|
||
visited = {self.end}
|
||
|
||
# 方向:上、下、左、右
|
||
directions = [(0, -1), (0, 1), (-1, 0), (1, 0)]
|
||
|
||
while queue:
|
||
(x, y), dist = queue.popleft()
|
||
distances[(x, y)] = dist
|
||
|
||
# 检查四个方向
|
||
for dx, dy in directions:
|
||
nx, ny = x + dx, y + dy
|
||
|
||
# 检查边界和可行性
|
||
if (0 <= nx < self.cols and
|
||
0 <= ny < self.rows and
|
||
self.map_data[ny][nx] != '1' and # 不是墙
|
||
(nx, ny) not in visited):
|
||
|
||
visited.add((nx, ny))
|
||
queue.append(((nx, ny), dist + 1))
|
||
|
||
print(f"已预处理完成从终点的距离计算,可达点数量: {len(distances)}")
|
||
return distances
|
||
|
||
def _can_reach_diagonal_coin(self, current_pos, diagonal_pos):
|
||
"""
|
||
检查对角线上的金币是否可通过两步到达
|
||
|
||
Args:
|
||
current_pos: 当前位置 (x, y)
|
||
diagonal_pos: 对角线上的金币位置 (x, y)
|
||
|
||
Returns:
|
||
bool: 是否可以通过两步到达
|
||
"""
|
||
cx, cy = current_pos
|
||
dx, dy = diagonal_pos
|
||
|
||
# 计算方向
|
||
step_x = 1 if dx > cx else -1
|
||
step_y = 1 if dy > cy else -1
|
||
|
||
# 检查两种可能的两步路径
|
||
path1 = [(cx + step_x, cy), diagonal_pos] # 先横后纵
|
||
path2 = [(cx, cy + step_y), diagonal_pos] # 先纵后横
|
||
|
||
# 检查路径1是否可行
|
||
path1_valid = True
|
||
for x, y in path1:
|
||
if not (0 <= x < self.cols and 0 <= y < self.rows and self.map_data[y][x] != '1'):
|
||
path1_valid = False
|
||
break
|
||
|
||
# 检查路径2是否可行
|
||
path2_valid = True
|
||
for x, y in path2:
|
||
if not (0 <= x < self.cols and 0 <= y < self.rows and self.map_data[y][x] != '1'):
|
||
path2_valid = False
|
||
break
|
||
|
||
return path1_valid or path2_valid
|
||
|
||
def add_path_to_map(self):
|
||
"""
|
||
将路径标记到地图上
|
||
|
||
Returns:
|
||
list: 标记了路径的地图
|
||
"""
|
||
marked_map = copy.deepcopy(self.original_map)
|
||
|
||
# 标记起点和终点
|
||
sx, sy = self.start
|
||
ex, ey = self.end
|
||
marked_map[sy][sx] = 'S'
|
||
marked_map[ey][ex] = 'E'
|
||
|
||
# 标记路径
|
||
for i, (x, y) in enumerate(self.path):
|
||
# 跳过起点和终点
|
||
if (x, y) == self.start or (x, y) == self.end:
|
||
continue
|
||
|
||
# 检查是否是资源位置
|
||
cell = self.original_map[y][x]
|
||
if (x, y) in self.visited_resources:
|
||
marked_map[y][x] = '*' # 已收集资源
|
||
else:
|
||
# 只有不是特殊位置才标记为路径
|
||
if not (cell.startswith('g') or cell.startswith('t') or
|
||
cell.startswith('b') or cell.startswith('l')):
|
||
marked_map[y][x] = '.' # 路径
|
||
|
||
return marked_map
|
||
|
||
|
||
"""
|
||
传统的贪心路径寻找算法
|
||
"""
|
||
|
||
class GreedyPlayer:
|
||
"""
|
||
传统贪心路径寻找算法
|
||
"""
|
||
|
||
def __init__(self, map_data):
|
||
"""
|
||
初始化贪心玩家
|
||
|
||
Args:
|
||
map_data: 迷宫地图,2D列表 (注意:这里是[y][x]格式)
|
||
"""
|
||
self.map_data = copy.deepcopy(map_data)
|
||
self.rows = len(map_data)
|
||
self.cols = len(map_data[0]) if self.rows > 0 else 0
|
||
|
||
# 寻找起点和终点
|
||
self.start = None
|
||
self.end = None
|
||
for y in range(self.rows):
|
||
for x in range(self.cols):
|
||
cell_value = str(self.map_data[y][x]).lower()
|
||
if cell_value.startswith('s'):
|
||
self.start = (x, y)
|
||
elif cell_value.startswith('e'):
|
||
self.end = (x, y)
|
||
|
||
if not self.start or not self.end:
|
||
raise ValueError("无法找到起点或终点")
|
||
|
||
self.path = []
|
||
self.marked_map = copy.deepcopy(map_data)
|
||
self.total_reward = 0
|
||
|
||
def find_path(self):
|
||
"""
|
||
使用传统贪心算法寻找路径
|
||
"""
|
||
current = self.start
|
||
self.path = [current]
|
||
visited = set([current])
|
||
|
||
while current != self.end:
|
||
x, y = current
|
||
best_move = None
|
||
best_value = float('-inf')
|
||
|
||
# 检查四个方向
|
||
for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]:
|
||
nx, ny = x + dx, y + dy
|
||
|
||
if (0 <= nx < self.cols and 0 <= ny < self.rows and
|
||
str(self.map_data[ny][nx]) != '1' and
|
||
(nx, ny) not in visited):
|
||
|
||
# 计算移动价值
|
||
cell = str(self.map_data[ny][nx])
|
||
value = 0
|
||
|
||
# 终点最高价值
|
||
if cell.lower().startswith('e'):
|
||
value = float('inf')
|
||
# 金币为正价值
|
||
elif cell.lower().startswith('g'):
|
||
try:
|
||
value = int(cell[1:])
|
||
except ValueError:
|
||
value = 1
|
||
# 陷阱为负价值
|
||
elif cell.lower().startswith('t'):
|
||
try:
|
||
value = -int(cell[1:])
|
||
except ValueError:
|
||
value = -1
|
||
|
||
if value > best_value:
|
||
best_value = value
|
||
best_move = (nx, ny)
|
||
|
||
if best_move:
|
||
# 更新当前位置
|
||
current = best_move
|
||
self.path.append(current)
|
||
visited.add(current)
|
||
|
||
# 收集资源
|
||
x, y = current
|
||
cell = str(self.map_data[y][x])
|
||
if cell.lower().startswith('g'):
|
||
try:
|
||
self.total_reward += int(cell[1:])
|
||
except ValueError:
|
||
self.total_reward += 1
|
||
elif cell.lower().startswith('t'):
|
||
try:
|
||
self.total_reward -= int(cell[1:])
|
||
except ValueError:
|
||
self.total_reward -= 1
|
||
|
||
# 标记路径
|
||
if not cell.lower().startswith('e'):
|
||
self.marked_map[y][x] = '.'
|
||
else:
|
||
# 无法继续移动
|
||
break
|
||
|
||
return self.path
|
||
|
||
def get_total_reward(self):
|
||
"""
|
||
获取收集到的总奖励
|
||
"""
|
||
return self.total_reward
|
||
|
||
# 使用示例
|
||
def main():
|
||
obj = MazeGenerator(20,'demo.csv',name="龙脊峡谷迷宫")
|
||
obj.generate(
|
||
seed=123,
|
||
boss_count=2,
|
||
traps_range=(5, 10),
|
||
mechanisms_range=(3, 7),
|
||
skill_traps=8
|
||
)
|
||
obj.export_to_csv()
|
||
|
||
map_data = obj.read_csv()
|
||
see = MazeGenerator(1,filename ='demo.csv')
|
||
see.read_from_csv()
|
||
print("=== 原始迷宫 ===")
|
||
see.print_maze()
|
||
|
||
print("\\n" + "="*60)
|
||
print("使用传统贪心算法:")
|
||
print("="*60)
|
||
player = GreedyPlayer(map_data)
|
||
player.find_path()
|
||
see.maze = player.marked_map
|
||
see.print_maze()
|
||
print(f"传统贪心算法总收益: {player.get_total_reward()}")
|
||
|
||
print("\\n" + "="*60)
|
||
print("使用3x3视野贪心算法:")
|
||
print("="*60)
|
||
# 使用新的3x3视野算法
|
||
greedy_3x3 = Greedy3x3ResourceCollector(map_data)
|
||
result = greedy_3x3.run_3x3_greedy_collection()
|
||
|
||
# 显示结果
|
||
see.maze = greedy_3x3.add_path_to_map()
|
||
see.print_maze()
|
||
|
||
print(f"\\n3x3视野算法结果:")
|
||
print(f" 总移动步数: {result['total_moves']}")
|
||
print(f" 收集资源数量: {result['resources_count']}")
|
||
print(f" 资源总价值: {result['total_value']}")
|
||
print(f" 探索位置数: {result['explored_positions']}")
|
||
|
||
print("\\n收集的资源详情:")
|
||
for i, resource in enumerate(result['collected_resources'], 1):
|
||
print(f" {i}. 位置{resource['position']}: {resource['type']} (价值: {resource['value']})")
|
||
|
||
|
||
def demo_3x3_greedy():
|
||
"""演示3x3视野贪心算法"""
|
||
|
||
# 创建一个示例迷宫
|
||
demo_maze = [
|
||
['s', '0', 'g5', '1', 't3'],
|
||
['0', '1', '0', '0', 'g2'],
|
||
['g3', '0', '1', 't2', '0'],
|
||
['0', 't1', '0', '0', 'g4'],
|
||
['1', '0', 'g1', '0', 'e']
|
||
]
|
||
|
||
print("=== 3x3视野贪心算法演示 ===")
|
||
print("迷宫说明:")
|
||
print(" s: 起点, e: 终点")
|
||
print(" g数字: 金币资源 (正收益)")
|
||
print(" t数字: 陷阱资源 (负收益)")
|
||
print(" 0: 可通行路径, 1: 墙壁")
|
||
print("\\n原始迷宫:")
|
||
for row in demo_maze:
|
||
print(' '.join(f"{cell:>2}" for cell in row))
|
||
|
||
# 使用3x3视野贪心算法
|
||
collector = Greedy3x3ResourceCollector(demo_maze)
|
||
result = collector.run_3x3_greedy_collection()
|
||
|
||
# 显示标记后的迷宫
|
||
marked_maze = collector.add_path_to_map()
|
||
print("\\n标记路径后的迷宫:")
|
||
print("S: 起点, E: 终点, *: 已收集资源, .: 路径")
|
||
for row in marked_maze:
|
||
print(' '.join(f"{cell:>2}" for cell in row))
|
||
|
||
print(f"\\n算法结果:")
|
||
print(f" 总移动步数: {result['total_moves']}")
|
||
print(f" 收集资源数量: {result['resources_count']}")
|
||
print(f" 资源总价值: {result['total_value']}")
|
||
|
||
return collector, result
|
||
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 运行演示
|
||
print("选择运行模式:")
|
||
print("1. 完整迷宫生成和算法比较")
|
||
print("2. 简单3x3视野算法演示")
|
||
|
||
choice = input("请输入选择 (1 或 2,默认为 2): ").strip()
|
||
|
||
if choice == "1":
|
||
main()
|
||
else:
|
||
demo_3x3_greedy()
|
||
|
||
|
||
|
||
|