527 lines
20 KiB
Python
527 lines
20 KiB
Python
from collections import deque
|
||
import copy
|
||
|
||
class GreedyCollector:
|
||
def __init__(self, filename=None, maze=None):
|
||
self.filename = filename
|
||
self.maze = []
|
||
self.path = []
|
||
self.start_pos = None
|
||
self.end_pos = None
|
||
self.visited = set()
|
||
self.rowNums = 0
|
||
self.colNums = 0
|
||
# 添加GUI相关的属性
|
||
self.collected_resources = [] # 收集的资源详情
|
||
self.current_step = 0 # 当前步数
|
||
self.is_running = False # 是否正在运行
|
||
self.total_reward = 0
|
||
self.marked_map = []
|
||
|
||
if filename:
|
||
import csv
|
||
with open(filename, 'r') as f:
|
||
reader = csv.reader(f)
|
||
for idx, row in enumerate(reader):
|
||
self.maze.append(list(row))
|
||
elif maze:
|
||
self.maze = copy.deepcopy(maze)
|
||
|
||
if self.maze:
|
||
self._initialize_maze()
|
||
|
||
def _initialize_maze(self):
|
||
"""初始化迷宫相关数据"""
|
||
self.rowNums = len(self.maze)
|
||
self.colNums = len(self.maze[0]) if self.rowNums > 0 else 0
|
||
|
||
# 查找起点和终点
|
||
self.start_pos = None
|
||
self.end_pos = None
|
||
for i in range(self.rowNums):
|
||
for j in range(self.colNums):
|
||
if self.maze[i][j] == 's':
|
||
self.start_pos = (j, i)
|
||
elif self.maze[i][j] == 'e':
|
||
self.end_pos = (j, i)
|
||
|
||
# 从终点开始BFS预处理到终点的距离
|
||
self.dist_to_end = self.bfs_from_end()
|
||
self.marked_map = copy.deepcopy(self.maze)
|
||
|
||
def bfs_from_end(self):
|
||
"""从终点开始BFS预处理每个点到终点的距离"""
|
||
dist = {}
|
||
if not self.end_pos:
|
||
return dist
|
||
|
||
queue = deque([(self.end_pos[1], self.end_pos[0], 0)])
|
||
visited = set()
|
||
|
||
while queue:
|
||
row, col, d = queue.popleft()
|
||
if (col, row) in visited:
|
||
continue
|
||
visited.add((col, row))
|
||
dist[(col, row)] = d
|
||
|
||
# 上下左右四个方向
|
||
for dr, dc in [(-1,0), (1,0), (0,-1), (0,1)]:
|
||
nr, nc = row + dr, col + dc
|
||
if (0 <= nr < self.rowNums and 0 <= nc < self.colNums and
|
||
self.maze[nr][nc] != '1' and (nc, nr) not in visited):
|
||
queue.append((nr, nc, d + 1))
|
||
|
||
return dist
|
||
|
||
def can_reach_diagonal(self, x, y, dx, dy):
|
||
"""检查是否可以通过两步L形路径到达对角线位置"""
|
||
target_x, target_y = x + dx, y + dy
|
||
|
||
# 检查目标位置是否在迷宫范围内且不是墙壁
|
||
if (not (0 <= target_y < self.rowNums and 0 <= target_x < self.colNums) or
|
||
self.maze[target_y][target_x] == '1'):
|
||
return False
|
||
|
||
# 两种可能的L形路径
|
||
# 路径1: 先水平移动(x, y) -> (target_x, y) -> (target_x, target_y)
|
||
step1_h = (target_x, y)
|
||
# 路径2: 先垂直移动(x, y) -> (x, target_y) -> (target_x, target_y)
|
||
step1_v = (x, target_y)
|
||
|
||
path1_valid = (0 <= step1_h[1] < self.rowNums and 0 <= step1_h[0] < self.colNums and
|
||
self.maze[step1_h[1]][step1_h[0]] != '1')
|
||
path2_valid = (0 <= step1_v[1] < self.rowNums and 0 <= step1_v[0] < self.colNums and
|
||
self.maze[step1_v[1]][step1_v[0]] != '1')
|
||
|
||
return path1_valid or path2_valid
|
||
|
||
def get_diagonal_first_step(self, x, y, dx, dy):
|
||
"""获取到达对角线位置的第一步移动"""
|
||
target_x, target_y = x + dx, y + dy
|
||
|
||
# 两种可能的第一步
|
||
step1_h = (target_x, y) # 先水平移动
|
||
step1_v = (x, target_y) # 先垂直移动
|
||
|
||
valid_steps = []
|
||
|
||
# 检查水平路径
|
||
if (0 <= step1_h[1] < self.rowNums and 0 <= step1_h[0] < self.colNums and
|
||
self.maze[step1_h[1]][step1_h[0]] != '1'):
|
||
valid_steps.append(step1_h)
|
||
|
||
# 检查垂直路径
|
||
if (0 <= step1_v[1] < self.rowNums and 0 <= step1_v[0] < self.colNums and
|
||
self.maze[step1_v[1]][step1_v[0]] != '1'):
|
||
valid_steps.append(step1_v)
|
||
|
||
if not valid_steps:
|
||
return None
|
||
|
||
# 如果两个路径都可行,选择更好的
|
||
if len(valid_steps) == 1:
|
||
return valid_steps[0]
|
||
|
||
# 比较两个第一步的价值
|
||
val1 = self.evaluate_position(step1_h[0], step1_h[1])
|
||
val2 = self.evaluate_position(step1_v[0], step1_v[1])
|
||
|
||
if val1 != val2:
|
||
return step1_h if val1 > val2 else step1_v
|
||
|
||
# 价值相同时选择距离终点更近的
|
||
dist1 = self.dist_to_end.get(step1_h, 9999)
|
||
dist2 = self.dist_to_end.get(step1_v, 9999)
|
||
return step1_h if dist1 <= dist2 else step1_v
|
||
def evaluate_position(self, x, y):
|
||
"""评估位置的价值"""
|
||
cell = self.maze[y][x]
|
||
|
||
# 墙壁不能走
|
||
if cell == '1':
|
||
return -10000
|
||
|
||
# 检查是否已经访问过
|
||
if (x, y) in self.visited:
|
||
# 走过的空地为-1000
|
||
return -1000
|
||
|
||
# 检查是否已经收集/触发过资源
|
||
already_processed = any(r['position'] == (x, y) for r in self.collected_resources)
|
||
|
||
if cell.startswith('g'): # 金币
|
||
if already_processed:
|
||
return -1000 # 已收集过的金币
|
||
return 50 # 统一金币权重为50
|
||
elif cell.startswith('t'): # 陷阱
|
||
if already_processed:
|
||
return -1000 # 已踩过的陷阱
|
||
return -30 # 统一陷阱权重为-30
|
||
elif cell.startswith('b') or cell.startswith('l'): # boss或机关
|
||
if already_processed:
|
||
return -1000 # 已遭遇过的boss/机关
|
||
return -0.5
|
||
else: # 空地
|
||
return 0 # 未走过的空地为0
|
||
|
||
def get_position_priority(self, x, y):
|
||
"""获取位置的优先级,数字越大优先级越高"""
|
||
cell = self.maze[y][x]
|
||
|
||
# 墙壁不能走
|
||
if cell == '1':
|
||
return 0
|
||
|
||
# 已访问过的位置优先级最低
|
||
if (x, y) in self.visited:
|
||
return 1
|
||
|
||
# 检查是否已经处理过资源
|
||
already_processed = any(r['position'] == (x, y) for r in self.collected_resources)
|
||
|
||
if cell.startswith('g') and not already_processed: # 未收集的金币
|
||
return 5
|
||
elif cell == '0' or cell == 's' or cell == 'e': # 空地
|
||
return 4
|
||
elif (cell.startswith('b') or cell.startswith('l')) and not already_processed: # 未遭遇的boss/机关
|
||
return 3
|
||
elif cell.startswith('t') and not already_processed: # 未触发的陷阱
|
||
return 2
|
||
else: # 已处理过的资源
|
||
return 1
|
||
|
||
def greedy_step(self, pos):
|
||
"""贪心策略选择下一步"""
|
||
x, y = pos
|
||
candidates = []
|
||
|
||
# 1. 考虑上下左右四个直接相邻的位置
|
||
for dx, dy in [(-1,0), (1,0), (0,-1), (0,1)]:
|
||
nx, ny = x + dx, y + dy
|
||
if (0 <= ny < self.rowNums and 0 <= nx < self.colNums and
|
||
self.maze[ny][nx] != '1'):
|
||
priority = self.get_position_priority(nx, ny)
|
||
value = self.evaluate_position(nx, ny)
|
||
dist = self.dist_to_end.get((nx, ny), 9999)
|
||
candidates.append(((nx, ny), priority, value, dist, 'direct'))
|
||
|
||
# 2. 考虑对角线位置(通过两步L形路径到达)
|
||
for dx, dy in [(-1,-1), (-1,1), (1,-1), (1,1)]: # 四个对角线方向
|
||
if self.can_reach_diagonal(x, y, dx, dy):
|
||
first_step = self.get_diagonal_first_step(x, y, dx, dy)
|
||
if first_step and first_step not in [c[0] for c in candidates]:
|
||
nx, ny = first_step
|
||
priority = self.get_position_priority(nx, ny)
|
||
value = self.evaluate_position(nx, ny)
|
||
dist = self.dist_to_end.get((nx, ny), 9999)
|
||
candidates.append(((nx, ny), priority, value, dist, 'diagonal'))
|
||
|
||
# 过滤掉不能走的位置(墙壁和超出范围)
|
||
candidates = [c for c in candidates if c[1] > 0]
|
||
|
||
if not candidates:
|
||
return None
|
||
|
||
# 排序候选位置
|
||
def sort_key(item):
|
||
pos, priority, value, dist, move_type = item
|
||
|
||
# 按照优先级排序:金币 > 空地 > boss/机关 > 陷阱
|
||
# 同级别内部:
|
||
# - 对于金币和陷阱:按价值排序(金币越大越好,陷阱损失越小越好)
|
||
# - 对于空地:按距离终点排序(越近越好)
|
||
# - 对于boss/机关:按距离终点排序(越近越好)
|
||
|
||
if priority == 5: # 金币
|
||
return (priority, value, -dist) # 优先级最高,价值越大越好,距离越近越好
|
||
elif priority == 4: # 空地
|
||
return (priority, -dist, value) # 优先距离终点近的
|
||
elif priority == 3: # boss/机关
|
||
return (priority, -dist, value) # 优先距离终点近的
|
||
elif priority == 2: # 陷阱
|
||
return (priority, value, -dist) # 优先损失小的(value是负数,越大损失越小)
|
||
else: # 其他情况
|
||
return (priority, -dist, value)
|
||
|
||
candidates.sort(key=sort_key, reverse=True)
|
||
return candidates[0][0]
|
||
|
||
def run(self):
|
||
"""运行基本的贪心算法"""
|
||
self.path = []
|
||
self.visited = set()
|
||
self.total_reward = 0
|
||
self.collected_resources = []
|
||
|
||
if not self.start_pos or not self.end_pos:
|
||
print("错误:无法找到起点或终点")
|
||
return
|
||
|
||
pos = self.start_pos
|
||
self.path.append(pos)
|
||
self.visited.add(pos)
|
||
step_count = 0
|
||
max_steps = self.rowNums * self.colNums * 3
|
||
|
||
while pos != self.end_pos and step_count < max_steps:
|
||
next_pos = self.greedy_step(pos)
|
||
if not next_pos:
|
||
print(f"步骤{step_count}: 无法找到下一步移动位置")
|
||
break
|
||
|
||
x, y = next_pos
|
||
cell = self.maze[y][x]
|
||
|
||
# 计算奖励并收集资源
|
||
reward = self._process_cell_reward(x, y, cell, step_count + 1)
|
||
|
||
self.total_reward += reward
|
||
self.path.append((x, y))
|
||
self.visited.add((x, y))
|
||
pos = (x, y)
|
||
step_count += 1
|
||
|
||
# 更新标记地图
|
||
self._update_marked_map()
|
||
|
||
def _process_cell_reward(self, x, y, cell, step_num):
|
||
"""处理格子奖励和资源收集"""
|
||
reward = 0
|
||
|
||
if cell.startswith('g'):
|
||
# 检查是否已经收集过这个金币
|
||
already_collected = any(r['position'] == (x, y) for r in self.collected_resources)
|
||
if not already_collected:
|
||
try:
|
||
reward = int(cell[1:])
|
||
except:
|
||
reward = 1
|
||
self.collected_resources.append({
|
||
'position': (x, y),
|
||
'type': cell,
|
||
'value': reward
|
||
})
|
||
print(f"第{step_num}步: 收集金币 位置({x},{y}), 价值+{reward}")
|
||
else:
|
||
print(f"第{step_num}步: 移动到已收集的金币位置({x},{y})")
|
||
elif cell.startswith('t'):
|
||
# 检查是否已经踩过这个陷阱
|
||
already_triggered = any(r['position'] == (x, y) for r in self.collected_resources)
|
||
if not already_triggered:
|
||
try:
|
||
reward = -int(cell[1:])
|
||
except:
|
||
reward = -1
|
||
self.collected_resources.append({
|
||
'position': (x, y),
|
||
'type': cell,
|
||
'value': reward
|
||
})
|
||
print(f"第{step_num}步: 触发陷阱 位置({x},{y}), 价值{reward}")
|
||
else:
|
||
print(f"第{step_num}步: 移动到已触发的陷阱位置({x},{y})")
|
||
elif cell.startswith('b'):
|
||
# 检查是否已经遭遇过这个boss
|
||
already_encountered = any(r['position'] == (x, y) for r in self.collected_resources)
|
||
if not already_encountered:
|
||
# boss不计入总价值,只记录遭遇
|
||
self.collected_resources.append({
|
||
'position': (x, y),
|
||
'type': cell,
|
||
'value': 0 # boss不计入总价值
|
||
})
|
||
print(f"第{step_num}步: 遭遇Boss 位置({x},{y})")
|
||
else:
|
||
print(f"第{step_num}步: 移动到已遭遇的Boss位置({x},{y})")
|
||
elif cell.startswith('l'):
|
||
# 检查是否已经触发过这个机关
|
||
already_triggered = any(r['position'] == (x, y) for r in self.collected_resources)
|
||
if not already_triggered:
|
||
# 机关不计入总价值,只记录触发
|
||
self.collected_resources.append({
|
||
'position': (x, y),
|
||
'type': cell,
|
||
'value': 0 # 机关不计入总价值
|
||
})
|
||
print(f"第{step_num}步: 触发机关 位置({x},{y})")
|
||
else:
|
||
print(f"第{step_num}步: 移动到已触发的机关位置({x},{y})")
|
||
elif (x, y) in self.visited:
|
||
# 重复访问不计入总价值
|
||
print(f"第{step_num}步: 重复访问 位置({x},{y})")
|
||
else:
|
||
print(f"第{step_num}步: 移动到空地 位置({x},{y})")
|
||
|
||
return reward # 只返回金币和陷阱的价值
|
||
|
||
def _update_marked_map(self):
|
||
"""更新标记地图"""
|
||
self.marked_map = copy.deepcopy(self.maze)
|
||
for idx, (x, y) in enumerate(self.path):
|
||
if self.marked_map[y][x] not in ['s', 'e']:
|
||
self.marked_map[y][x] = f'p{idx}'
|
||
|
||
def get_path(self):
|
||
return self.path
|
||
|
||
def get_total_reward(self):
|
||
"""获取总奖励(只包含金币和陷阱)"""
|
||
# 计算只包括金币和陷阱的总价值
|
||
total_value = 0
|
||
for resource in self.collected_resources:
|
||
if resource['type'].startswith('g') or resource['type'].startswith('t'):
|
||
total_value += resource['value']
|
||
return total_value
|
||
|
||
def get_marked_map(self):
|
||
return self.marked_map
|
||
|
||
def run_3x3_greedy_collection(self, max_moves=1000):
|
||
"""
|
||
运行3x3视野贪心资源收集算法 - GUI兼容接口
|
||
|
||
Args:
|
||
max_moves: 最大移动步数,防止无限循环
|
||
|
||
Returns:
|
||
dict: 包含路径、收集的资源等信息
|
||
"""
|
||
print("\\n开始3x3视野贪心资源收集...")
|
||
|
||
# 重置状态
|
||
self.path = []
|
||
self.visited = set()
|
||
self.total_reward = 0
|
||
self.collected_resources = []
|
||
self.current_step = 0
|
||
self.is_running = True
|
||
|
||
if not self.start_pos or not self.end_pos:
|
||
print("错误:无法找到起点或终点")
|
||
return self._get_collection_result()
|
||
|
||
pos = self.start_pos
|
||
self.path.append(pos)
|
||
self.visited.add(pos)
|
||
moves = 0
|
||
stuck_count = 0
|
||
max_stuck = 50 # 连续无进展的最大步数
|
||
|
||
while moves < max_moves and stuck_count < max_stuck and pos != self.end_pos:
|
||
moves += 1
|
||
next_pos = self.greedy_step(pos)
|
||
|
||
if not next_pos:
|
||
print(f"第{moves}步: 无法找到下一步移动位置")
|
||
stuck_count += 1
|
||
if stuck_count >= max_stuck:
|
||
break
|
||
continue
|
||
|
||
# 移动到新位置
|
||
x, y = next_pos
|
||
cell = self.maze[y][x]
|
||
|
||
# 计算奖励并收集资源
|
||
reward = self._process_cell_reward(x, y, cell, moves)
|
||
|
||
self.total_reward += reward
|
||
self.path.append((x, y))
|
||
self.visited.add((x, y))
|
||
pos = (x, y)
|
||
|
||
# 重置卡住计数器(如果有进展)
|
||
resource_collected = any(r['position'] == (x, y) for r in self.collected_resources)
|
||
if resource_collected or pos == self.end_pos:
|
||
stuck_count = 0
|
||
else:
|
||
stuck_count += 1
|
||
|
||
# 检查结束原因
|
||
if pos == self.end_pos:
|
||
print(f"第{moves}步: 到达终点!")
|
||
elif moves >= max_moves:
|
||
print(f"达到最大移动步数 {max_moves},结束收集")
|
||
elif stuck_count >= max_stuck:
|
||
print(f"连续 {max_stuck} 步未找到有效移动,结束收集")
|
||
|
||
# 更新标记地图
|
||
self._update_marked_map()
|
||
|
||
self.is_running = False
|
||
|
||
# 计算只包括金币和陷阱的总价值
|
||
actual_total_value = 0
|
||
for resource in self.collected_resources:
|
||
if resource['type'].startswith('g') or resource['type'].startswith('t'):
|
||
actual_total_value += resource['value']
|
||
|
||
print("3x3视野资源收集完成!")
|
||
print(f"总步数: {len(self.path)-1}, 收集资源数: {len(self.collected_resources)}, 资源总价值: {actual_total_value}")
|
||
|
||
return self._get_collection_result()
|
||
|
||
def _get_collection_result(self):
|
||
"""获取收集结果"""
|
||
# 计算只包括金币和陷阱的总价值
|
||
actual_total_value = 0
|
||
for resource in self.collected_resources:
|
||
if resource['type'].startswith('g') or resource['type'].startswith('t'):
|
||
actual_total_value += resource['value']
|
||
|
||
return {
|
||
'path': [(x, y) for x, y in self.path], # 保持(x, y)格式
|
||
'collected_resources': self.collected_resources.copy(),
|
||
'total_value': actual_total_value, # 只包括金币和陷阱的价值
|
||
'total_moves': len(self.path) - 1,
|
||
'resources_count': len(self.collected_resources),
|
||
'start_pos': self.start_pos,
|
||
'end_pos': self.end_pos,
|
||
'final_pos': self.path[-1] if self.path else self.start_pos,
|
||
'explored_positions': len(self.visited)
|
||
}
|
||
|
||
def get_current_position(self):
|
||
"""获取当前位置(分步模式)"""
|
||
if self.path and 0 <= self.current_step < len(self.path):
|
||
return self.path[self.current_step]
|
||
return None
|
||
|
||
def step_by_step_mode(self):
|
||
"""启用分步模式,允许GUI逐步显示路径"""
|
||
if not self.path:
|
||
print("请先运行收集算法")
|
||
return False
|
||
|
||
self.current_step = 0
|
||
return True
|
||
|
||
def next_step(self):
|
||
"""获取下一步(分步模式)"""
|
||
if self.current_step < len(self.path) - 1:
|
||
self.current_step += 1
|
||
return True
|
||
return False
|
||
|
||
def load_from_matrix(self, maze_matrix):
|
||
"""从二维数组加载迷宫"""
|
||
self.maze = copy.deepcopy(maze_matrix)
|
||
self.rowNums = len(self.maze)
|
||
self.colNums = len(self.maze[0]) if self.rowNums > 0 else 0
|
||
|
||
# 重新查找起点和终点
|
||
self.start_pos = None
|
||
self.end_pos = None
|
||
for i in range(self.rowNums):
|
||
for j in range(self.colNums):
|
||
if self.maze[i][j] == 's':
|
||
self.start_pos = (j, i)
|
||
elif self.maze[i][j] == 'e':
|
||
self.end_pos = (j, i)
|
||
|
||
# 重新计算到终点的距离
|
||
self.dist_to_end = self.bfs_from_end()
|
||
self.total_reward = 0
|
||
self.marked_map = copy.deepcopy(self.maze)
|