from collections import deque import copy class GreedyCollector: def __init__(self, filename=None, maze=None): self.filename = filename self.maze = [] self.path = [] self.start_pos = None self.end_pos = None self.visited = set() self.rowNums = 0 self.colNums = 0 # 添加GUI相关的属性 self.collected_resources = [] # 收集的资源详情 self.current_step = 0 # 当前步数 self.is_running = False # 是否正在运行 self.total_reward = 0 self.marked_map = [] if filename: import csv with open(filename, 'r') as f: reader = csv.reader(f) for idx, row in enumerate(reader): self.maze.append(list(row)) elif maze: self.maze = copy.deepcopy(maze) if self.maze: self._initialize_maze() def _initialize_maze(self): """初始化迷宫相关数据""" self.rowNums = len(self.maze) self.colNums = len(self.maze[0]) if self.rowNums > 0 else 0 # 查找起点和终点 self.start_pos = None self.end_pos = None for i in range(self.rowNums): for j in range(self.colNums): if self.maze[i][j] == 's': self.start_pos = (j, i) elif self.maze[i][j] == 'e': self.end_pos = (j, i) # 从终点开始BFS预处理到终点的距离 self.dist_to_end = self.bfs_from_end() self.marked_map = copy.deepcopy(self.maze) def bfs_from_end(self): """从终点开始BFS预处理每个点到终点的距离""" dist = {} if not self.end_pos: return dist queue = deque([(self.end_pos[1], self.end_pos[0], 0)]) visited = set() while queue: row, col, d = queue.popleft() if (col, row) in visited: continue visited.add((col, row)) dist[(col, row)] = d # 上下左右四个方向 for dr, dc in [(-1,0), (1,0), (0,-1), (0,1)]: nr, nc = row + dr, col + dc if (0 <= nr < self.rowNums and 0 <= nc < self.colNums and self.maze[nr][nc] != '1' and (nc, nr) not in visited): queue.append((nr, nc, d + 1)) return dist def can_reach_diagonal(self, x, y, dx, dy): """检查是否可以通过两步L形路径到达对角线位置""" target_x, target_y = x + dx, y + dy # 检查目标位置是否在迷宫范围内且不是墙壁 if (not (0 <= target_y < self.rowNums and 0 <= target_x < self.colNums) or self.maze[target_y][target_x] == '1'): return False # 两种可能的L形路径 # 路径1: 先水平移动(x, y) -> (target_x, y) -> (target_x, target_y) step1_h = (target_x, y) # 路径2: 先垂直移动(x, y) -> (x, target_y) -> (target_x, target_y) step1_v = (x, target_y) path1_valid = (0 <= step1_h[1] < self.rowNums and 0 <= step1_h[0] < self.colNums and self.maze[step1_h[1]][step1_h[0]] != '1') path2_valid = (0 <= step1_v[1] < self.rowNums and 0 <= step1_v[0] < self.colNums and self.maze[step1_v[1]][step1_v[0]] != '1') return path1_valid or path2_valid def get_diagonal_first_step(self, x, y, dx, dy): """获取到达对角线位置的第一步移动""" target_x, target_y = x + dx, y + dy # 两种可能的第一步 step1_h = (target_x, y) # 先水平移动 step1_v = (x, target_y) # 先垂直移动 valid_steps = [] # 检查水平路径 if (0 <= step1_h[1] < self.rowNums and 0 <= step1_h[0] < self.colNums and self.maze[step1_h[1]][step1_h[0]] != '1'): valid_steps.append(step1_h) # 检查垂直路径 if (0 <= step1_v[1] < self.rowNums and 0 <= step1_v[0] < self.colNums and self.maze[step1_v[1]][step1_v[0]] != '1'): valid_steps.append(step1_v) if not valid_steps: return None # 如果两个路径都可行,选择更好的 if len(valid_steps) == 1: return valid_steps[0] # 比较两个第一步的价值 val1 = self.evaluate_position(step1_h[0], step1_h[1]) val2 = self.evaluate_position(step1_v[0], step1_v[1]) if val1 != val2: return step1_h if val1 > val2 else step1_v # 价值相同时选择距离终点更近的 dist1 = self.dist_to_end.get(step1_h, 9999) dist2 = self.dist_to_end.get(step1_v, 9999) return step1_h if dist1 <= dist2 else step1_v def evaluate_position(self, x, y): """评估位置的价值""" cell = self.maze[y][x] # 墙壁不能走 if cell == '1': return -10000 # 检查是否已经访问过 if (x, y) in self.visited: # 走过的空地为-1000 return -1000 # 检查是否已经收集/触发过资源 already_processed = any(r['position'] == (x, y) for r in self.collected_resources) if cell.startswith('g'): # 金币 if already_processed: return -1000 # 已收集过的金币 return 50 # 统一金币权重为50 elif cell.startswith('t'): # 陷阱 if already_processed: return -1000 # 已踩过的陷阱 return -30 # 统一陷阱权重为-30 elif cell.startswith('b') or cell.startswith('l'): # boss或机关 if already_processed: return -1000 # 已遭遇过的boss/机关 return -0.5 else: # 空地 return 0 # 未走过的空地为0 def get_position_priority(self, x, y): """获取位置的优先级,数字越大优先级越高""" cell = self.maze[y][x] # 墙壁不能走 if cell == '1': return 0 # 已访问过的位置优先级最低 if (x, y) in self.visited: return 1 # 检查是否已经处理过资源 already_processed = any(r['position'] == (x, y) for r in self.collected_resources) if cell.startswith('g') and not already_processed: # 未收集的金币 return 5 elif cell == '0' or cell == 's' or cell == 'e': # 空地 return 4 elif (cell.startswith('b') or cell.startswith('l')) and not already_processed: # 未遭遇的boss/机关 return 3 elif cell.startswith('t') and not already_processed: # 未触发的陷阱 return 2 else: # 已处理过的资源 return 1 def greedy_step(self, pos): """贪心策略选择下一步""" x, y = pos candidates = [] # 1. 考虑上下左右四个直接相邻的位置 for dx, dy in [(-1,0), (1,0), (0,-1), (0,1)]: nx, ny = x + dx, y + dy if (0 <= ny < self.rowNums and 0 <= nx < self.colNums and self.maze[ny][nx] != '1'): priority = self.get_position_priority(nx, ny) value = self.evaluate_position(nx, ny) dist = self.dist_to_end.get((nx, ny), 9999) candidates.append(((nx, ny), priority, value, dist, 'direct')) # 2. 考虑对角线位置(通过两步L形路径到达) for dx, dy in [(-1,-1), (-1,1), (1,-1), (1,1)]: # 四个对角线方向 if self.can_reach_diagonal(x, y, dx, dy): first_step = self.get_diagonal_first_step(x, y, dx, dy) if first_step and first_step not in [c[0] for c in candidates]: nx, ny = first_step priority = self.get_position_priority(nx, ny) value = self.evaluate_position(nx, ny) dist = self.dist_to_end.get((nx, ny), 9999) candidates.append(((nx, ny), priority, value, dist, 'diagonal')) # 过滤掉不能走的位置(墙壁和超出范围) candidates = [c for c in candidates if c[1] > 0] if not candidates: return None # 排序候选位置 def sort_key(item): pos, priority, value, dist, move_type = item # 按照优先级排序:金币 > 空地 > boss/机关 > 陷阱 # 同级别内部: # - 对于金币和陷阱:按价值排序(金币越大越好,陷阱损失越小越好) # - 对于空地:按距离终点排序(越近越好) # - 对于boss/机关:按距离终点排序(越近越好) if priority == 5: # 金币 return (priority, value, -dist) # 优先级最高,价值越大越好,距离越近越好 elif priority == 4: # 空地 return (priority, -dist, value) # 优先距离终点近的 elif priority == 3: # boss/机关 return (priority, -dist, value) # 优先距离终点近的 elif priority == 2: # 陷阱 return (priority, value, -dist) # 优先损失小的(value是负数,越大损失越小) else: # 其他情况 return (priority, -dist, value) candidates.sort(key=sort_key, reverse=True) return candidates[0][0] def run(self): """运行基本的贪心算法""" self.path = [] self.visited = set() self.total_reward = 0 self.collected_resources = [] if not self.start_pos or not self.end_pos: print("错误:无法找到起点或终点") return pos = self.start_pos self.path.append(pos) self.visited.add(pos) step_count = 0 max_steps = self.rowNums * self.colNums * 3 while pos != self.end_pos and step_count < max_steps: next_pos = self.greedy_step(pos) if not next_pos: print(f"步骤{step_count}: 无法找到下一步移动位置") break x, y = next_pos cell = self.maze[y][x] # 计算奖励并收集资源 reward = self._process_cell_reward(x, y, cell, step_count + 1) self.total_reward += reward self.path.append((x, y)) self.visited.add((x, y)) pos = (x, y) step_count += 1 # 更新标记地图 self._update_marked_map() def _process_cell_reward(self, x, y, cell, step_num): """处理格子奖励和资源收集""" reward = 0 if cell.startswith('g'): # 检查是否已经收集过这个金币 already_collected = any(r['position'] == (x, y) for r in self.collected_resources) if not already_collected: try: reward = int(cell[1:]) except: reward = 1 self.collected_resources.append({ 'position': (x, y), 'type': cell, 'value': reward }) print(f"第{step_num}步: 收集金币 位置({x},{y}), 价值+{reward}") else: print(f"第{step_num}步: 移动到已收集的金币位置({x},{y})") elif cell.startswith('t'): # 检查是否已经踩过这个陷阱 already_triggered = any(r['position'] == (x, y) for r in self.collected_resources) if not already_triggered: try: reward = -int(cell[1:]) except: reward = -1 self.collected_resources.append({ 'position': (x, y), 'type': cell, 'value': reward }) print(f"第{step_num}步: 触发陷阱 位置({x},{y}), 价值{reward}") else: print(f"第{step_num}步: 移动到已触发的陷阱位置({x},{y})") elif cell.startswith('b'): # 检查是否已经遭遇过这个boss already_encountered = any(r['position'] == (x, y) for r in self.collected_resources) if not already_encountered: # boss不计入总价值,只记录遭遇 self.collected_resources.append({ 'position': (x, y), 'type': cell, 'value': 0 # boss不计入总价值 }) print(f"第{step_num}步: 遭遇Boss 位置({x},{y})") else: print(f"第{step_num}步: 移动到已遭遇的Boss位置({x},{y})") elif cell.startswith('l'): # 检查是否已经触发过这个机关 already_triggered = any(r['position'] == (x, y) for r in self.collected_resources) if not already_triggered: # 机关不计入总价值,只记录触发 self.collected_resources.append({ 'position': (x, y), 'type': cell, 'value': 0 # 机关不计入总价值 }) print(f"第{step_num}步: 触发机关 位置({x},{y})") else: print(f"第{step_num}步: 移动到已触发的机关位置({x},{y})") elif (x, y) in self.visited: # 重复访问不计入总价值 print(f"第{step_num}步: 重复访问 位置({x},{y})") else: print(f"第{step_num}步: 移动到空地 位置({x},{y})") return reward # 只返回金币和陷阱的价值 def _update_marked_map(self): """更新标记地图""" self.marked_map = copy.deepcopy(self.maze) for idx, (x, y) in enumerate(self.path): if self.marked_map[y][x] not in ['s', 'e']: self.marked_map[y][x] = f'p{idx}' def get_path(self): return self.path def get_total_reward(self): """获取总奖励(只包含金币和陷阱)""" # 计算只包括金币和陷阱的总价值 total_value = 0 for resource in self.collected_resources: if resource['type'].startswith('g') or resource['type'].startswith('t'): total_value += resource['value'] return total_value def get_marked_map(self): return self.marked_map def run_3x3_greedy_collection(self, max_moves=1000): """ 运行3x3视野贪心资源收集算法 - GUI兼容接口 Args: max_moves: 最大移动步数,防止无限循环 Returns: dict: 包含路径、收集的资源等信息 """ print("\\n开始3x3视野贪心资源收集...") # 重置状态 self.path = [] self.visited = set() self.total_reward = 0 self.collected_resources = [] self.current_step = 0 self.is_running = True if not self.start_pos or not self.end_pos: print("错误:无法找到起点或终点") return self._get_collection_result() pos = self.start_pos self.path.append(pos) self.visited.add(pos) moves = 0 stuck_count = 0 max_stuck = 50 # 连续无进展的最大步数 while moves < max_moves and stuck_count < max_stuck and pos != self.end_pos: moves += 1 next_pos = self.greedy_step(pos) if not next_pos: print(f"第{moves}步: 无法找到下一步移动位置") stuck_count += 1 if stuck_count >= max_stuck: break continue # 移动到新位置 x, y = next_pos cell = self.maze[y][x] # 计算奖励并收集资源 reward = self._process_cell_reward(x, y, cell, moves) self.total_reward += reward self.path.append((x, y)) self.visited.add((x, y)) pos = (x, y) # 重置卡住计数器(如果有进展) resource_collected = any(r['position'] == (x, y) for r in self.collected_resources) if resource_collected or pos == self.end_pos: stuck_count = 0 else: stuck_count += 1 # 检查结束原因 if pos == self.end_pos: print(f"第{moves}步: 到达终点!") elif moves >= max_moves: print(f"达到最大移动步数 {max_moves},结束收集") elif stuck_count >= max_stuck: print(f"连续 {max_stuck} 步未找到有效移动,结束收集") # 更新标记地图 self._update_marked_map() self.is_running = False # 计算只包括金币和陷阱的总价值 actual_total_value = 0 for resource in self.collected_resources: if resource['type'].startswith('g') or resource['type'].startswith('t'): actual_total_value += resource['value'] print("3x3视野资源收集完成!") print(f"总步数: {len(self.path)-1}, 收集资源数: {len(self.collected_resources)}, 资源总价值: {actual_total_value}") return self._get_collection_result() def _get_collection_result(self): """获取收集结果""" # 计算只包括金币和陷阱的总价值 actual_total_value = 0 for resource in self.collected_resources: if resource['type'].startswith('g') or resource['type'].startswith('t'): actual_total_value += resource['value'] return { 'path': [(x, y) for x, y in self.path], # 保持(x, y)格式 'collected_resources': self.collected_resources.copy(), 'total_value': actual_total_value, # 只包括金币和陷阱的价值 'total_moves': len(self.path) - 1, 'resources_count': len(self.collected_resources), 'start_pos': self.start_pos, 'end_pos': self.end_pos, 'final_pos': self.path[-1] if self.path else self.start_pos, 'explored_positions': len(self.visited) } def get_current_position(self): """获取当前位置(分步模式)""" if self.path and 0 <= self.current_step < len(self.path): return self.path[self.current_step] return None def step_by_step_mode(self): """启用分步模式,允许GUI逐步显示路径""" if not self.path: print("请先运行收集算法") return False self.current_step = 0 return True def next_step(self): """获取下一步(分步模式)""" if self.current_step < len(self.path) - 1: self.current_step += 1 return True return False def load_from_matrix(self, maze_matrix): """从二维数组加载迷宫""" self.maze = copy.deepcopy(maze_matrix) self.rowNums = len(self.maze) self.colNums = len(self.maze[0]) if self.rowNums > 0 else 0 # 重新查找起点和终点 self.start_pos = None self.end_pos = None for i in range(self.rowNums): for j in range(self.colNums): if self.maze[i][j] == 's': self.start_pos = (j, i) elif self.maze[i][j] == 'e': self.end_pos = (j, i) # 重新计算到终点的距离 self.dist_to_end = self.bfs_from_end() self.total_reward = 0 self.marked_map = copy.deepcopy(self.maze)