From d1548f32818463b15691594d520300cb553302f6 Mon Sep 17 00:00:00 2001 From: Gary Gan Date: Thu, 3 Jul 2025 19:26:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=B4=AA=E5=BF=83=E7=AE=97?= =?UTF-8?q?=E6=B3=952?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- maze.py | 34 ++- strict_3x3_greedy.py | 337 ---------------------- tanxin.py | 651 +++++++++++++++++++++++++------------------ 3 files changed, 414 insertions(+), 608 deletions(-) delete mode 100644 strict_3x3_greedy.py diff --git a/maze.py b/maze.py index 3c78088..315b312 100644 --- a/maze.py +++ b/maze.py @@ -441,15 +441,39 @@ class Maze: return False try: + # 使用tanxin.py中的Greedy3x3ResourceCollector类(已添加死胡同检测和回溯功能) + from tanxin import Greedy3x3ResourceCollector + # 创建贪心算法实例 - algorithm = Greedy3x3Algorithm(self.grid, debug=True) + collector = Greedy3x3ResourceCollector(self.grid) # 运行算法 - result = algorithm.run() + result = collector.run_3x3_greedy_collection() # 将结果转换为路径格式 (y, x) - self.greedy_path = result['path_yx_format'] - self.greedy_result = result + # 注意:tanxin.py中的路径是(x, y)格式,而maze.py中使用(y, x)格式 + self.greedy_path = [(y, x) for (x, y) in result['path']] + + # 转换收集资源格式 + resources = [] + for resource in result['collected_resources']: + x, y = resource['position'] + resources.append({ + 'position': (x, y), # 保持(x, y)格式以兼容_draw_greedy_path方法 + 'type': resource['type'], + 'value': resource['value'] + }) + + # 更新结果 + result_formatted = { + 'path_yx_format': self.greedy_path, + 'collected_resources': resources, + 'total_value': result['total_value'], + 'total_moves': result['total_moves'], + 'resources_count': result['resources_count'] + } + + self.greedy_result = result_formatted self.greedy_step = 0 self.is_greedy_path_complete = False @@ -463,6 +487,8 @@ class Maze: except Exception as e: print(f"贪心搜索失败: {e}") + import traceback + traceback.print_exc() return False def next_greedy_step(self): diff --git a/strict_3x3_greedy.py b/strict_3x3_greedy.py deleted file mode 100644 index 2036492..0000000 --- a/strict_3x3_greedy.py +++ /dev/null @@ -1,337 +0,0 @@ -import copy -from collections import deque - -class Strict3x3GreedyCollector: - """ - 严格的3x3视野贪心资源收集器 - 每次移动时只考虑3x3视野范围内的资源 - 如果视野内没有资源,则随机移动探索 - """ - - def __init__(self, maze, start_pos=None, end_pos=None): - """初始化收集器""" - self.original_maze = copy.deepcopy(maze) - self.maze = copy.deepcopy(maze) - self.rows = len(maze) - self.cols = len(maze[0]) if self.rows > 0 else 0 - - # 寻找起始位置和目标位置 - self.start_pos = start_pos or self._find_position('s') - self.end_pos = end_pos or self._find_position('e') - - if not self.start_pos: - raise ValueError("无法找到起始位置 's'") - if not self.end_pos: - raise ValueError("无法找到目标位置 'e'") - - self.current_pos = self.start_pos - self.path = [self.start_pos] - self.collected_resources = [] - self.total_value = 0 - self.visited_resources = set() - self.explored_positions = set([self.start_pos]) - - print(f"严格3x3视野模式") - print(f"起始位置: {self.start_pos}") - print(f"目标位置: {self.end_pos}") - - def _find_position(self, target): - """寻找地图中指定字符的位置""" - for i in range(self.rows): - for j in range(self.cols): - if self.maze[i][j].lower() == target.lower(): - return (i, j) - return None - - def get_3x3_vision(self, pos): - """获取以pos为中心的3x3视野范围内的所有单元格""" - row, col = pos - vision = {} - - # 遍历3x3范围 - for dr in range(-1, 2): - for dc in range(-1, 2): - new_row, new_col = row + dr, col + dc - - # 检查边界 - if 0 <= new_row < self.rows and 0 <= new_col < self.cols: - vision[(new_row, new_col)] = self.maze[new_row][new_col] - - return vision - - def get_adjacent_cells(self, pos): - """获取当前位置的上下左右四个相邻位置""" - row, col = pos - adjacent = [] - - # 上下左右四个方向 - directions = [(-1, 0), (1, 0), (0, -1), (0, 1)] - - for dr, dc in directions: - new_row, new_col = row + dr, col + dc - - # 检查边界和可移动性 - if (0 <= new_row < self.rows and - 0 <= new_col < self.cols and - self.can_move_to((new_row, new_col))): - adjacent.append((new_row, new_col)) - - return adjacent - - def can_move_to(self, pos): - """检查是否可以移动到指定位置""" - row, col = pos - cell = self.maze[row][col] - # 不能移动到墙壁 - return cell != '1' - - def evaluate_resource_value(self, cell): - """评估资源的价值""" - if cell.startswith('g'): - try: - return int(cell[1:]) - except ValueError: - return 0 - elif cell.startswith('t'): - try: - return -int(cell[1:]) - except ValueError: - return 0 - else: - return 0 - - def find_best_resource_in_3x3_vision(self): - """ - 严格在3x3视野范围内找到价值最高的资源 - - Returns: - tuple: (最佳资源位置, 资源价值) 或 (None, 0) - """ - vision = self.get_3x3_vision(self.current_pos) - - best_pos = None - best_value = float('-inf') - - for pos, cell in vision.items(): - # 跳过已访问的资源 - if pos in self.visited_resources: - continue - - # 跳过当前位置 - if pos == self.current_pos: - continue - - # 跳过不可移动的位置 - if not self.can_move_to(pos): - continue - - # 检查是否可以直接到达(相邻位置) - if pos not in self.get_adjacent_cells(self.current_pos): - continue - - # 检查是否为资源 - value = self.evaluate_resource_value(cell) - if value != 0 and value > best_value: - best_value = value - best_pos = pos - - return best_pos, best_value if best_pos else 0 - - def find_exploration_target(self): - """ - 当视野内没有资源时,寻找探索目标 - 优先选择未探索过的位置 - """ - adjacent = self.get_adjacent_cells(self.current_pos) - - # 优先选择未探索的位置 - unexplored = [pos for pos in adjacent if pos not in self.explored_positions] - if unexplored: - return unexplored[0] # 选择第一个未探索的位置 - - # 如果所有相邻位置都探索过,选择任意一个 - if adjacent: - return adjacent[0] - - return None - - def collect_resource(self, pos): - """收集指定位置的资源""" - row, col = pos - cell = self.maze[row][col] - value = self.evaluate_resource_value(cell) - - if value != 0: - self.collected_resources.append({ - 'position': pos, - 'type': cell, - 'value': value - }) - self.total_value += value - self.visited_resources.add(pos) - - print(f"收集资源: 位置{pos}, 类型{cell}, 价值{value}, 总价值{self.total_value}") - - def run_strict_3x3_collection(self, max_moves=1000): - """ - 运行严格3x3视野贪心资源收集算法 - - Args: - max_moves: 最大移动步数,防止无限循环 - - Returns: - dict: 包含路径、收集的资源等信息 - """ - print("\\n开始严格3x3视野贪心资源收集...") - - moves = 0 - stuck_count = 0 # 连续无法找到资源的次数 - max_stuck = 20 # 最大连续无资源次数 - - while moves < max_moves and stuck_count < max_stuck: - moves += 1 - - # 在3x3视野内寻找最佳资源 - best_resource_pos, best_value = self.find_best_resource_in_3x3_vision() - - if best_resource_pos is not None: - print(f"第{moves}步: 发现视野内资源 位置{best_resource_pos}, 价值{best_value}") - - # 移动到资源位置并收集 - self.current_pos = best_resource_pos - self.path.append(best_resource_pos) - self.explored_positions.add(best_resource_pos) - self.collect_resource(best_resource_pos) - - stuck_count = 0 # 重置无资源计数 - else: - # 视野内没有资源,进行探索性移动 - exploration_target = self.find_exploration_target() - - if exploration_target: - print(f"第{moves}步: 视野内无资源,探索移动到 {exploration_target}") - self.current_pos = exploration_target - self.path.append(exploration_target) - self.explored_positions.add(exploration_target) - stuck_count += 1 - else: - print(f"第{moves}步: 无法进行任何移动,结束收集") - break - - if moves >= max_moves: - print(f"达到最大移动步数 {max_moves},结束收集") - elif stuck_count >= max_stuck: - print(f"连续 {max_stuck} 步未找到资源,结束收集") - - print("严格3x3视野资源收集完成!") - return self.get_collection_result() - - def get_collection_result(self): - """获取收集结果""" - return { - 'path': self.path.copy(), - 'collected_resources': self.collected_resources.copy(), - 'total_value': self.total_value, - 'total_moves': len(self.path) - 1, - 'resources_count': len(self.collected_resources), - 'start_pos': self.start_pos, - 'end_pos': self.end_pos, - 'final_pos': self.current_pos, - 'explored_positions': len(self.explored_positions) - } - - def print_result_summary(self): - """打印收集结果摘要""" - result = self.get_collection_result() - - print("\\n=== 严格3x3视野贪心收集结果摘要 ===") - print(f"起始位置: {result['start_pos']}") - print(f"最终位置: {result['final_pos']}") - print(f"总移动步数: {result['total_moves']}") - print(f"探索位置数: {result['explored_positions']}") - print(f"收集资源数量: {result['resources_count']}") - print(f"资源总价值: {result['total_value']}") - - print("\\n收集的资源详情:") - for i, resource in enumerate(result['collected_resources'], 1): - print(f" {i}. 位置{resource['position']}: {resource['type']} (价值: {resource['value']})") - - # 显示路径的关键点 - path_points = result['path'] - if len(path_points) <= 10: - path_str = ' -> '.join(map(str, path_points)) - else: - path_str = f"{path_points[0]} -> ... -> {path_points[-1]} (共{len(path_points)}个位置)" - print(f"\\n移动路径: {path_str}") - - def visualize_path_on_maze(self): - """在迷宫上可视化移动路径""" - visual_maze = copy.deepcopy(self.original_maze) - - # 标记路径 - for i, pos in enumerate(self.path): - row, col = pos - if pos == self.start_pos: - visual_maze[row][col] = 'S' # 起点 - elif pos in [r['position'] for r in self.collected_resources]: - # 已收集的资源位置 - visual_maze[row][col] = '*' - elif i == len(self.path) - 1: - # 最终位置 - visual_maze[row][col] = 'F' - else: - # 路径点 - visual_maze[row][col] = '.' - - return visual_maze - - def print_visual_maze(self): - """打印可视化的迷宫""" - visual_maze = self.visualize_path_on_maze() - - print("\\n=== 严格3x3视野路径可视化迷宫 ===") - print("S: 起点, F: 终点, *: 已收集资源, .: 路径") - for row in visual_maze: - print(' '.join(f"{cell:>2}" for cell in row)) - - -def compare_algorithms(): - """比较不同算法的效果""" - - # 创建一个更大的示例迷宫 - demo_maze = [ - ['s', '0', 'g5', '1', 't3', '0', 'g8'], - ['0', '1', '0', '0', 'g2', '1', '0'], - ['g3', '0', '1', 't2', '0', '0', 'g6'], - ['0', 't1', '0', '0', 'g4', '1', '0'], - ['1', '0', 'g1', '0', '0', '0', 't5'], - ['0', 'g7', '0', '1', '0', 'g9', '0'], - ['t4', '0', '0', '0', '1', '0', 'e'] - ] - - print("=== 算法比较演示 ===") - print("迷宫说明:") - print(" s: 起点, e: 终点") - print(" g数字: 金币资源 (正收益)") - print(" t数字: 陷阱资源 (负收益)") - print(" 0: 可通行路径, 1: 墙壁") - print("\\n原始迷宫:") - for row in demo_maze: - print(' '.join(f"{cell:>2}" for cell in row)) - - print("\\n" + "="*60) - print("严格3x3视野贪心算法:") - print("="*60) - - # 运行严格3x3视野算法 - strict_collector = Strict3x3GreedyCollector(demo_maze) - strict_result = strict_collector.run_strict_3x3_collection() - strict_collector.print_result_summary() - strict_collector.print_visual_maze() - - return strict_collector, strict_result - - -if __name__ == "__main__": - # 运行比较演示 - strict_collector, strict_result = compare_algorithms() diff --git a/tanxin.py b/tanxin.py index b9d3605..056c0ff 100644 --- a/tanxin.py +++ b/tanxin.py @@ -5,173 +5,6 @@ import copy from collections import deque -class GreedyPlayer: - def __init__(self, map_data, start=None, end=None): - """初始化GreedyPlayer对象""" - self.map_data = map_data - self.rows = len(map_data) - self.cols = len(map_data[0]) if self.rows > 0 else 0 - self.start = start - self.end = end - self.path = [] - self.total_reward = 0 - self.visited = set() - self.marked_map = [] - - # 如果未指定起点和终点,自动查找 - if not self.start or not self.end: - self._find_start_end() - - def _find_start_end(self): - """自动查找地图中的起点(s)和终点(e)""" - for y in range(self.rows): - for x in range(self.cols): - if self.map_data[y][x] == 's' or self.map_data[y][x] == 'S': - self.start = (x, y) - elif self.map_data[y][x] == 'e' or self.map_data[y][x] == 'E': - self.end = (x, y) - print(f"起点: {self.start}, 终点: {self.end}") - - def get_visible_cells(self, x, y, visibility=1): - """获取以(x,y)为中心的上下左右四个方向的单元格信息""" - visible = {} - # 只考虑上下左右四个方向(dx或dy为±1,另一个为0) - directions = [(-1, 0), (1, 0), (0, -1), (0, 1)] - for dx, dy in directions: - nx, ny = x + dx, y + dy - if 0 <= nx < self.cols and 0 <= ny < self.rows: - cell = self.map_data[ny][nx] - distance = 1 # 上下左右移动距离为1 - visible[(nx, ny)] = (cell, distance) - return visible - - def evaluate_cell(self, cell, distance): - """评估单元格的价值,返回奖励/路径的比值""" - if cell == 's' or cell == 'e': - return 0 # 起点和终点不参与资源评估 - - if cell.startswith('t'): - try: - value = -int(cell[1:]) # t表示损失,转为负值 - return value / distance - except ValueError: - return 0 - elif cell.startswith('g'): - try: - value = int(cell[1:]) # g表示收益,转为正值 - return value / distance - except ValueError: - return 0 - - return 0 # 0、l、b等不产生资源价值 - - def find_path(self): - """基于贪心策略的路径规划(只能上下左右移动)""" - if not self.start or not self.end: - raise ValueError("地图中未找到起点或终点") - - current = self.start - self.path = [current] - self.visited = {current} - self.total_reward = 0 - - while current != self.end: - x, y = current - visible = self.get_visible_cells(x, y) - - best_cell = None - best_value = -float('inf') - - for (nx, ny), (cell, distance) in visible.items(): - # 跳过已访问的位置 - if (nx, ny) in self.visited: - continue - - # 只允许在0、t、g、l、b上行走 - if cell not in ['0'] and not cell.startswith(('t', 'g', 'l', 'b')): - continue - - # 评估单元格价值 - value = self.evaluate_cell(cell, distance) - - # 终点具有最高优先级 - if cell == 'e': - value = float('inf') - - # 选择贪心值最大的单元格 - if value > best_value: - best_value = value - best_cell = (nx, ny) - - # 无法找到可行路径 - if best_cell is None: - print("无法找到通往终点的路径!") - break - - # 更新当前位置和路径 - current = best_cell - self.path.append(current) - self.visited.add(current) - - # 更新总收益(跳过起点和终点) - if len(self.path) > 1 and len(self.path) < len(self.path) + 1: - cell = self.map_data[current[1]][current[0]] - if cell.startswith('t'): - self.total_reward -= int(cell[1:]) - elif cell.startswith('g'): - self.total_reward += int(cell[1:]) - self.add_path_to_map() - return self.path - - def add_path_to_map(self): - """在地图上标记路径,上下移动用|,左右移动用-""" - if not self.path: - print("没有路径可标记") - return - - # 创建地图副本,避免修改原始地图 - marked_map = [row.copy() for row in self.map_data] - - # 标记路径点 - for i, (x, y) in enumerate(self.path): - if marked_map[y][x] == 's': - marked_map[y][x] = 'S' # 标记起点 - elif marked_map[y][x] == 'e': - marked_map[y][x] = 'E' # 标记终点 - else: - marked_map[y][x] = '*' # 标记路径点 - - # 标记路径线(上下左右) - for i in range(len(self.path) - 1): - x1, y1 = self.path[i] - x2, y2 = self.path[i + 1] - - # 左右移动 - if x1 != x2 and y1 == y2: - start, end = (x1, x2) if x1 < x2 else (x2, x1) - for x in range(start, end + 1): - if marked_map[y1][x] not in ['S', 'E']: - marked_map[y1][x] = '-' - - # 上下移动 - elif y1 != y2 and x1 == x2: - start, end = (y1, y2) if y1 < y2 else (y2, y1) - for y in range(start, end + 1): - if marked_map[y][x1] not in ['S', 'E']: - marked_map[y][x1] = '|' - - # 保存标记后的地图 - self.marked_map = marked_map - return marked_map - - def get_path(self): - """返回找到的路径""" - return self.path - - def get_total_reward(self): - """返回总收益""" - return self.total_reward - class Greedy3x3ResourceCollector: """ @@ -209,6 +42,13 @@ class Greedy3x3ResourceCollector: self.total_value = 0 self.visited_resources = set() self.explored_positions = set([self.start]) + + # 增加历史移动记录和死胡同检测相关变量 + self.position_visit_count = {self.start: 1} # 记录每个位置的访问次数 + self.deadend_positions = set() # 记录已知的死胡同位置 + self.backtrack_points = [] # 记录可能的回溯点 + self.oscillation_detection = [] # 用于检测来回走动的历史 + self.max_oscillation_length = 6 # 检测来回走动的最大长度 print(f"3x3视野贪心算法初始化") print(f"起始位置: {self.start}") @@ -320,73 +160,171 @@ class Greedy3x3ResourceCollector: def find_best_resource_in_3x3_vision(self): """ - 在3x3视野范围内找到价值最高的可到达资源 - - Returns: - tuple: (最佳资源位置, 资源价值) 或 (None, 0) + 在3x3视野内寻找最佳资源 + 优先级:金币 > 未走过 > 走过的路(优先很久之前走过的路) > 墙/陷阱 + 加入死胡同检测和回溯机制 """ - vision = self.get_3x3_vision(self.current_pos) - + x, y = self.current_pos best_pos = None best_value = float('-inf') - - # 首先尝试找正价值资源 - for pos, cell in vision.items(): - # 跳过已访问的资源 - if pos in self.visited_resources: - continue - - # 跳过当前位置 - if pos == self.current_pos: - continue - - # 跳过不可移动的位置 - if not self.can_move_to(pos): - continue - - # 检查是否可以直接到达(相邻位置) - if pos not in self.get_adjacent_cells(self.current_pos): - continue - - # 检查是否为资源 - value = self.evaluate_resource_value(cell) - if value > 0 and value > best_value: # 优先选择正价值资源 - best_value = value - best_pos = pos - - # 如果没有正价值资源,考虑负价值资源(选择损失最小的) + best_visited_time = float('inf') + + # 更新当前位置的访问次数 + self.position_visit_count[self.current_pos] = self.position_visit_count.get(self.current_pos, 0) + 1 + + # 检查是否处于死胡同中 + if self.is_deadend(self.current_pos): + self.deadend_positions.add(self.current_pos) + # 寻找回溯点 + backtrack_point = self.find_backtrack_point() + if backtrack_point != self.current_pos: + # 将当前位置到回溯点的路径添加到路径计划中 + self.backtrack_points.append(backtrack_point) + print(f"检测到死胡同,计划回溯到: {backtrack_point}") + # 如果回溯点是相邻的,直接返回 + if abs(backtrack_point[0] - x) + abs(backtrack_point[1] - y) == 1: + return backtrack_point, 0 # 回溯点,价值为0 + + # 如果有待回溯的点,优先选择那个方向 + if self.backtrack_points: + target = self.backtrack_points[-1] + # 计算到回溯点的方向 + for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]: + nx, ny = x + dx, y + dy + if (nx, ny) == target: + return (nx, ny), 0 # 回溯点,价值为0 + + # 如果相邻点在路径上且朝向回溯点方向,也可以选择 + if (0 <= nx < self.cols and 0 <= ny < self.rows and + self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁 + if ((nx > x and target[0] > x) or + (nx < x and target[0] < x) or + (ny > y and target[1] > y) or + (ny < y and target[1] < y)): + return (nx, ny), 0 # 朝向回溯点的方向,价值为0 + + # 如果已经到达回溯点或无法向回溯点移动,弹出这个回溯点 + if self.current_pos == self.backtrack_points[-1]: + self.backtrack_points.pop() + + # 检测是否陷入来回走动的循环 + if len(self.path) >= 2: + self.oscillation_detection.append(self.current_pos) + if len(self.oscillation_detection) > self.max_oscillation_length: + self.oscillation_detection.pop(0) + + if self.detect_oscillation(): + print("检测到来回走动,尝试打破循环") + # 清空回溯点列表,寻找新的探索方向 + self.backtrack_points = [] + # 尝试找到访问次数最少的相邻位置 + min_visits = float('inf') + least_visited = None + + for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]: + nx, ny = x + dx, y + dy + if (0 <= nx < self.cols and 0 <= ny < self.rows and + self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁 + visits = self.position_visit_count.get((nx, ny), 0) + if visits < min_visits: + min_visits = visits + least_visited = (nx, ny) + + if least_visited: + return least_visited, 0 # 访问次数最少的位置,价值为0 + + # 在3x3视野内寻找最佳位置 + for i in range(-1, 2): + for j in range(-1, 2): + # 跳过自身和对角线位置 + if (i == 0 and j == 0) or (i != 0 and j != 0): + continue + + nx, ny = x + i, y + j + + # 检查位置是否在地图范围内 + if 0 <= nx < self.cols and 0 <= ny < self.rows: + cell = self.map_data[ny][nx] + pos = (nx, ny) + + # 检查是否是墙,不能走 + if cell == '1': + continue + + # 计算资源价值 + value = self.evaluate_resource_value(cell) + + # 检查是否已经走过这个位置 + is_visited = pos in self.explored_positions + visited_time = self.position_visit_count.get(pos, 0) + + # 计算探索潜力 + exploration_potential = self.calculate_exploration_potential(pos) + + # 优先级计算逻辑 + # 1. 金币优先 + if value > 0: + if (value > best_value or + (value == best_value and + ((not is_visited and best_visited_time > 0) or + (is_visited and visited_time < best_visited_time)))): + best_value = value + best_pos = pos + best_visited_time = visited_time if is_visited else 0 + # 2. 没有金币,选择未走过的路 + elif not is_visited: + if best_value <= 0 and (best_visited_time > 0 or exploration_potential > best_value): + best_value = exploration_potential + best_pos = pos + best_visited_time = 0 + # 3. 如果都走过了,选择走过次数最少的路 + elif is_visited and visited_time < best_visited_time: + if best_value <= 0: + best_value = -visited_time # 负值,访问次数越少越好 + best_pos = pos + best_visited_time = visited_time + + # 如果找不到合适的位置,就选择任意一个可行的相邻位置 if best_pos is None: - for pos, cell in vision.items(): - if pos in self.visited_resources or pos == self.current_pos: - continue - if not self.can_move_to(pos): - continue - if pos not in self.get_adjacent_cells(self.current_pos): - continue - - value = self.evaluate_resource_value(cell) - if value < 0 and value > best_value: # 选择损失最小的陷阱 - best_value = value - best_pos = pos - - return best_pos, best_value if best_pos else 0 + for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]: + nx, ny = x + dx, y + dy + if (0 <= nx < self.cols and 0 <= ny < self.rows and + self.map_data[ny][nx] != '1'): # 使用'1'表示墙壁 + best_pos = (nx, ny) + break + + return best_pos, best_value if best_value > float('-inf') else 0 def find_exploration_target(self): """ 当视野内没有资源时,寻找探索目标 - 优先选择未探索过的位置 + 严格按照优先级:未走过的路 > 走过的路(很久之前走过的优先) """ adjacent = self.get_adjacent_cells(self.current_pos) - - # 优先选择未探索的位置 + + # 1. 优先级1:未走过的路 unexplored = [pos for pos in adjacent if pos not in self.explored_positions] if unexplored: return unexplored[0] # 选择第一个未探索的位置 - - # 如果所有相邻位置都探索过,选择任意一个 - if adjacent: - return adjacent[0] - + + # 2. 优先级2:走过的路,按时间排序(很久之前走过的优先) + explored = [] + for pos in adjacent: + if pos in self.explored_positions: + # 找出这个位置在路径中最早出现的索引 + if pos in self.path: + earliest_index = self.path.index(pos) + explored.append((pos, earliest_index)) + else: + # 如果在explored_positions但不在path中,可能是通过其他方式标记的 + # 给它一个很大的索引,表示是最近才探索的 + explored.append((pos, float('inf'))) + + if explored: + # 按照索引排序,索引越小表示越早走过 + explored.sort(key=lambda x: x[1]) + return explored[0][0] + return None def collect_resource(self, pos): @@ -414,6 +352,8 @@ class Greedy3x3ResourceCollector: def run_3x3_greedy_collection(self, max_moves=1000): """ 运行3x3视野贪心资源收集算法 + 严格按照优先级:金币 > 未走过的路 > 走过的路 > 墙/陷阱 + 对于走过的路,优先走很久之前走过的路 Args: max_moves: 最大移动步数,防止无限循环 @@ -430,32 +370,36 @@ class Greedy3x3ResourceCollector: while moves < max_moves and stuck_count < max_stuck: moves += 1 - # 在3x3视野内寻找最佳资源 - best_resource_pos, best_value = self.find_best_resource_in_3x3_vision() + # 在3x3视野内寻找最佳位置(按照严格优先级) + best_pos, best_value = self.find_best_resource_in_3x3_vision() - if best_resource_pos is not None: - print(f"第{moves}步: 发现视野内资源 位置{best_resource_pos}, 价值{best_value}") - - # 移动到资源位置并收集 - self.current_pos = best_resource_pos - self.path.append(best_resource_pos) - self.explored_positions.add(best_resource_pos) - self.collect_resource(best_resource_pos) - - stuck_count = 0 # 重置无资源计数 - else: - # 视野内没有资源,进行探索性移动 - exploration_target = self.find_exploration_target() - - if exploration_target: - print(f"第{moves}步: 视野内无资源,探索移动到 {exploration_target}") - self.current_pos = exploration_target - self.path.append(exploration_target) - self.explored_positions.add(exploration_target) - stuck_count += 1 + if best_pos is not None: + # 移动到选定位置 + self.current_pos = best_pos + self.path.append(best_pos) + self.explored_positions.add(best_pos) + + # 如果是资源位置,进行收集 + if best_value != 0: + print(f"第{moves}步: 发现视野内金币 位置{best_pos}, 价值{best_value}") + self.collect_resource(best_pos) + stuck_count = 0 # 收集到资源后重置无资源计数 else: - print(f"第{moves}步: 无法进行任何移动,结束收集") - break + # 是普通路径 + if best_pos not in self.explored_positions: + print(f"第{moves}步: 移动到未走过的路 位置{best_pos}") + else: + print(f"第{moves}步: 移动到走过的路 位置{best_pos}") + stuck_count += 1 + else: + # 没有可移动位置,结束收集 + print(f"第{moves}步: 无法进行任何移动,结束收集") + break + + # 检查是否达到终点 + if self.current_pos == self.end: + print(f"第{moves}步: 到达终点!") + break if moves >= max_moves: print(f"达到最大移动步数 {max_moves},结束收集") @@ -463,6 +407,7 @@ class Greedy3x3ResourceCollector: print(f"连续 {max_stuck} 步未找到资源,结束收集") print("3x3视野资源收集完成!") + print(f"总步数: {len(self.path)-1}, 收集资源数: {len(self.collected_resources)}, 资源总价值: {self.total_value}") return self.get_collection_result() def get_collection_result(self): @@ -479,32 +424,204 @@ class Greedy3x3ResourceCollector: 'explored_positions': len(self.explored_positions) } + def reset(self): + """重置收集器状态""" + self.map_data = copy.deepcopy(self.original_map) + self.current_pos = self.start + self.path = [self.start] + self.collected_resources = [] + self.total_value = 0 + self.visited_resources = set() + self.explored_positions = set([self.start]) + self.position_visit_count = {self.start: 1} + self.deadend_positions = set() + self.backtrack_points = [] + self.oscillation_detection = [] + def get_path(self): - """返回路径,转换为(y, x)格式以兼容现有代码""" - # 将(x, y)格式的路径转换为(y, x)格式 - return [(y, x) for (x, y) in self.path] + """ + 获取完整的资源收集路径 + 返回:路径列表,格式为 [(x1, y1), (x2, y2), ...] + """ + # 先重置状态 + self.reset() + + max_steps = self.rows * self.cols * 3 # 设置最大步数限制,避免无限循环 + steps = 0 + reached_goal = False + + while steps < max_steps and not reached_goal: + _, _, reached_goal = self.next_step() + steps += 1 + + # 如果路径长度已经很长但还没到达目标,可能是在循环 + if steps > self.rows * self.cols * 2: + print(f"警告:路径过长 ({steps} 步),可能存在循环。提前结束。") + break + + if reached_goal: + print(f"找到路径!总步数: {steps}, 总收集价值: {self.total_value}") + else: + print(f"未能找到到达目标的路径,已走 {steps} 步,总收集价值: {self.total_value}") + + print(f"发现的死胡同数量: {len(self.deadend_positions)}") + + return self.path - def get_total_reward(self): - """返回总收益""" - return self.total_value + def next_step(self): + """ + 执行下一步移动 + 返回:(新位置, 收集的资源价值, 是否到达目标) + """ + if self.current_pos == self.end: + return self.current_pos, 0, True + + next_pos, value = self.find_best_resource_in_3x3_vision() + if next_pos is None: + # 如果找不到下一步,说明卡住了,可能是迷宫设计问题 + print("找不到下一步移动,可能被卡住了") + return self.current_pos, 0, False + + # 记录新位置和路径 + self.current_pos = next_pos + self.path.append(next_pos) + self.explored_positions.add(next_pos) + + # 更新位置访问计数 + self.position_visit_count[next_pos] = self.position_visit_count.get(next_pos, 0) + 1 + + # 如果当前位置是回溯点且有多个回溯点,移除当前回溯点 + if self.backtrack_points and next_pos == self.backtrack_points[-1]: + self.backtrack_points.pop() + + # 收集资源 + x, y = next_pos + cell = self.map_data[y][x] + value = self.evaluate_resource_value(cell) + + if value > 0 and next_pos not in self.visited_resources: + self.collected_resources.append((next_pos, value)) + self.visited_resources.add(next_pos) + self.total_value += value + # 标记资源已被收集,避免重复计算 + if cell.startswith('g') or cell.startswith('c'): + try: + self.map_data[y][x] = 'v' # 将收集过的资源标记为已访问 + except: + pass + + # 检查是否到达目标 + reached_goal = (next_pos == self.end) + + # 调试信息 + if len(self.path) % 10 == 0: + print(f"当前路径长度: {len(self.path)}, 总收集价值: {self.total_value}") + print(f"已发现的死胡同数量: {len(self.deadend_positions)}") + + return next_pos, value, reached_goal - def add_path_to_map(self): - """在地图上标记路径""" - marked_map = [row.copy() for row in self.map_data] - - # 标记路径点 - for i, (x, y) in enumerate(self.path): - if marked_map[y][x] == 's': - marked_map[y][x] = 'S' # 标记起点 - elif marked_map[y][x] == 'e': - marked_map[y][x] = 'E' # 标记终点 - elif (x, y) in [r['position'] for r in self.collected_resources]: - marked_map[y][x] = '*' # 标记已收集资源 + def is_deadend(self, pos): + """ + 判断当前位置是否是死胡同 + 死胡同的定义:除了来路外,周围全是墙/陷阱/已走过的路 + """ + x, y = pos + valid_directions = 0 + + for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]: + nx, ny = x + dx, y + dy + if (0 <= nx < self.cols and 0 <= ny < self.rows and + self.map_data[ny][nx] != '1' and # 使用'1'表示墙壁 + (nx, ny) not in self.explored_positions): + valid_directions += 1 + + # 如果没有未探索的方向,则是死胡同 + return valid_directions == 0 + + def find_backtrack_point(self): + """ + 寻找回溯点,即从路径中找到最近的有未探索方向的点 + """ + # 从最近访问到最早访问的路径点遍历 + for pos in reversed(self.path): + x, y = pos + + # 检查这个点的四个方向是否有未探索的路 + for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]: + nx, ny = x + dx, y + dy + if (0 <= nx < self.cols and 0 <= ny < self.rows and + self.map_data[ny][nx] != '1' and # 使用'1'表示墙壁 + (nx, ny) not in self.explored_positions): + return pos + + # 如果找不到回溯点,则返回起始点 + return self.start + + def detect_oscillation(self): + """ + 检测路径中是否有来回走动的情况 + """ + if len(self.oscillation_detection) < self.max_oscillation_length: + return False + + # 检查最近的移动是否形成循环 + recent_moves = self.oscillation_detection[-self.max_oscillation_length:] + + # 打印调试信息 + print(f"检查振荡: {recent_moves[-6:]}") + + # 检查是否有重复位置模式 (例如A-B-A-B或A-B-C-A-B-C) + for pattern_length in range(2, self.max_oscillation_length // 2 + 1): + if recent_moves[-pattern_length:] == recent_moves[-2*pattern_length:-pattern_length]: + print(f"检测到振荡!模式长度: {pattern_length}") + return True + + # 更简单的检测:检查是否在有限步数内多次访问同一位置 + position_counts = {} + for pos in recent_moves: + if pos in position_counts: + position_counts[pos] += 1 + if position_counts[pos] >= 3: # 在短时间内访问同一位置3次以上 + print(f"检测到位置 {pos} 被频繁访问 {position_counts[pos]} 次") + return True else: - marked_map[y][x] = '.' # 标记路径点 - - self.marked_map = marked_map - return marked_map + position_counts[pos] = 1 + + return False + + def calculate_exploration_potential(self, pos): + """ + 计算位置的探索潜力值 + 潜力值基于: + 1. 周围未探索的方向数 + 2. 到达过这个位置的次数(次数越多潜力越低) + 3. 是否含有资源 + """ + x, y = pos + potential = 0 + + # 检查周围四个方向是否有未探索的路 + for dx, dy in [(0, 1), (1, 0), (0, -1), (-1, 0)]: + nx, ny = x + dx, y + dy + if (0 <= nx < self.cols and 0 <= ny < self.rows): + # 未探索的路增加潜力 + if (nx, ny) not in self.explored_positions and self.map_data[ny][nx] != '1': + potential += 10 + + # 有资源的路增加更多潜力 + cell = self.map_data[ny][nx] + if cell.startswith('g'): + try: + value = int(cell[1:]) + potential += value * 2 + except ValueError: + potential += 5 # 如果无法解析值,则默认增加5点潜力 + + # 访问次数越多,潜力越低 + visit_penalty = self.position_visit_count.get(pos, 0) * 5 + potential = max(0, potential - visit_penalty) + + return potential # 使用示例