1. 多模态地理空间感知引擎
跨模态地理对齐网络
import torch
import torch.nn as nn
from transformers import CLIPVisionModel, BertModel


class GeoAlignmentNetwork(nn.Module):
    """Cross-modal geo-alignment network.

    Encodes three modalities -- imagery (satellite + street view), free text,
    and spatial features (coordinates + topology) -- into a shared embedding
    space, fuses them with a transformer encoder layer, and optionally applies
    temporal attention when timestamps are provided.
    """

    def __init__(self, embed_dim=512):
        super().__init__()
        self.embed_dim = embed_dim
        # Visual encoder (satellite + street view).
        self.visual_encoder = CLIPVisionModel.from_pretrained("openai/clip-vit-base-patch32")
        self.visual_proj = nn.Linear(768, embed_dim)
        # Text encoder.
        self.text_encoder = BertModel.from_pretrained("bert-base-uncased")
        self.text_proj = nn.Linear(768, embed_dim)
        # Spatial encoder (coordinates + topology).
        self.spatial_encoder = nn.Sequential(
            nn.Linear(6, 128),  # x, y, z + three topology features
            nn.GELU(),
            nn.Linear(128, embed_dim),
        )
        # Multimodal fusion. BUG FIX: the fused tensor below is (batch, 3, embed),
        # i.e. batch-first, but these modules default to sequence-first and would
        # have attended across the batch axis. batch_first=True corrects that.
        self.fusion = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=8, dim_feedforward=2048, batch_first=True
        )
        # Spatio-temporal attention (same batch_first fix).
        self.temporal_attn = nn.MultiheadAttention(embed_dim, 8, batch_first=True)

    def forward(self, images, texts, spatial_feats, timestamps=None):
        """Fuse the modalities into a single (batch, embed_dim) feature.

        Args:
            images: pixel tensor accepted by the CLIP vision model.
            texts: tokenized batch (dict of tensors) accepted by BERT.
            spatial_feats: (batch, 6) coordinate + topology features.
            timestamps: optional per-sample timestamps for temporal attention.

        Returns:
            (batch, embed_dim) fused feature, mean-pooled over modalities.
        """
        # Visual features: CLS token of the vision transformer.
        visual_out = self.visual_encoder(images).last_hidden_state[:, 0]
        visual_feats = self.visual_proj(visual_out)
        # Text features: BERT [CLS] token.
        text_feats = self.text_encoder(**texts).last_hidden_state[:, 0]
        text_feats = self.text_proj(text_feats)
        # Spatial features.
        spatial_feats = self.spatial_encoder(spatial_feats)
        # Stack modalities along a length-3 "sequence" axis and fuse.
        fused = torch.stack([visual_feats, text_feats, spatial_feats], dim=1)
        fused = self.fusion(fused)
        # Temporal fusion when timestamps are available.
        if timestamps is not None:
            # NOTE(review): _time_encoding returns (*timestamps.shape, embed_dim);
            # for this sum to broadcast against (batch, 3, embed), timestamps
            # presumably has shape (batch, 3) -- confirm against callers.
            time_embed = self._time_encoding(timestamps)
            fused, _ = self.temporal_attn(
                fused + time_embed,
                fused + time_embed,
                fused,
            )
        return fused.mean(dim=1)  # mean-pool over the modality axis

    def _time_encoding(self, timestamps):
        """Sinusoidal encoding of timestamps, shape (*timestamps.shape, embed_dim)."""
        # Use the configured width; the original hard-coded 512 even when
        # embed_dim differed, which would break the addition in forward().
        d = self.embed_dim
        position = timestamps.float().unsqueeze(-1)
        # torch.log replaces the original's math.log: `math` was never imported,
        # so this method raised NameError on first use.
        div_term = torch.exp(
            torch.arange(0, d, 2).float() * -(torch.log(torch.tensor(10000.0)) / d)
        ).to(timestamps.device)
        pe = torch.zeros(*timestamps.shape, d).to(timestamps.device)
        pe[..., 0::2] = torch.sin(position * div_term)
        pe[..., 1::2] = torch.cos(position * div_term)
        return pe
实时地理空间特征提取
import numpy as np
from torchvision.ops import roi_align
from torchgeo.models import ResNet50_Weights


class RealTimeFeatureExtractor(nn.Module):
    """Real-time multi-scale feature extractor: ResNet-50 backbone + FPN + spatial attention."""

    def __init__(self):
        super().__init__()
        # Sentinel-2 pretrained ResNet-50 backbone.
        # NOTE(review): passing a torchgeo weight enum to the torchvision hub
        # resnet50 entry point is unusual -- confirm this loads as intended.
        self.backbone = torch.hub.load('pytorch/vision:v0.10.0', 'resnet50',
                                       weights=ResNet50_Weights.SENTINEL2_RGB_MOCO)
        # Feature pyramid network. Lateral 1x1 convs are sized for ResNet-50
        # stage outputs: layer2 -> 512, layer3 -> 1024, layer4 -> 2048 channels.
        self.fpn = nn.ModuleDict({
            'c3': nn.Conv2d(512, 256, 1),
            'c4': nn.Conv2d(1024, 256, 1),
            'c5': nn.Conv2d(2048, 256, 1),
            'p3': nn.Conv2d(256, 256, 3, padding=1),
            'p4': nn.Conv2d(256, 256, 3, padding=1),
            'p5': nn.Conv2d(256, 256, 3, padding=1),
        })
        # Spatial attention gate applied to the finest pyramid level.
        self.spatial_attn = nn.Sequential(
            nn.Conv2d(256, 64, 1),
            nn.ReLU(),
            nn.Conv2d(64, 1, 1),
            nn.Sigmoid(),
        )

    def forward(self, x, rois=None):
        """Extract pyramid features; optionally pool per-ROI features.

        Returns a dict {'p3','p4','p5'} of pyramid maps, or a stacked tensor of
        ROI-aligned 7x7 features when `rois` is given.
        """
        # BUG FIX: the original fed the raw image straight into layer1 and used
        # layer1/layer2/layer3 outputs (256/512/1024 channels), which does not
        # match the FPN lateral convs above (512/1024/2048) and would crash.
        # Run the stem first, then take layer2/layer3/layer4 as c3/c4/c5
        # (strides 8/16/32, matching spatial_scale=1/2**level below).
        x = self.backbone.conv1(x)
        x = self.backbone.bn1(x)
        x = self.backbone.relu(x)
        x = self.backbone.maxpool(x)
        c2 = self.backbone.layer1(x)
        c3 = self.backbone.layer2(c2)
        c4 = self.backbone.layer3(c3)
        c5 = self.backbone.layer4(c4)
        # Build the feature pyramid (top-down pathway with lateral connections).
        p5 = self.fpn['c5'](c5)
        p4 = self.fpn['c4'](c4) + F.interpolate(p5, scale_factor=2)
        p3 = self.fpn['c3'](c3) + F.interpolate(p4, scale_factor=2)
        p3 = self.fpn['p3'](p3)
        p4 = self.fpn['p4'](p4)
        p5 = self.fpn['p5'](p5)
        # Apply spatial attention to the finest level.
        attn = self.spatial_attn(p3)
        p3 = p3 * attn
        if rois is not None:
            # ROI-aligned region features.
            # NOTE(review): each `roi` is passed as boxes for the whole batch
            # feature map -- confirm the expected ROI tensor layout upstream.
            features = []
            for i, roi in enumerate(rois):
                level = self._map_roi_to_level(roi)
                if level == 3:
                    feature_map = p3
                elif level == 4:
                    feature_map = p4
                else:
                    feature_map = p5
                pooled = roi_align(feature_map, [roi], output_size=(7, 7),
                                   spatial_scale=1.0 / 2 ** level)
                features.append(pooled)
            return torch.stack(features)
        return {'p3': p3, 'p4': p4, 'p5': p5}

    def _map_roi_to_level(self, roi):
        """Map an ROI to a pyramid level (3/4/5) by its pixel area."""
        area = (roi[2] - roi[0]) * (roi[3] - roi[1])
        if area < 32 * 32:
            return 3
        elif area < 64 * 64:
            return 4
        else:
            return 5
2. 自主空间认知系统
空间记忆与推理模块
import numpy as np
from sklearn.cluster import DBSCAN
from collections import defaultdict


class SpatialMemory:
    """Spatial memory holding decaying observations and a lightweight semantic graph."""

    def __init__(self, decay_factor=0.95):
        # Each entry: {'features', 'position', 'label', 'weight'}.
        self.feature_memory = []
        # Kept for interface compatibility; nothing in this class populates it.
        self.position_memory = []
        # node_id -> node dict (see _update_semantic_graph).
        self.semantic_graph = defaultdict(list)
        self.decay_factor = decay_factor
        self.current_pose = None

    def update_pose(self, pose):
        """Record the current pose."""
        self.current_pose = pose

    def add_observation(self, features, position, semantic_label=None):
        """Store a spatial observation; labeled observations also update the semantic graph."""
        self.feature_memory.append({
            'features': features,
            'position': position,
            'label': semantic_label,
            'weight': 1.0,
        })
        if semantic_label:
            self._update_semantic_graph(features, position, semantic_label)

    def _update_semantic_graph(self, features, position, label):
        """Add a graph node for this observation and link same-label nodes within 5 m."""
        related_nodes = []
        for node_id, node in self.semantic_graph.items():
            if node['label'] == label:
                dist = np.linalg.norm(np.array(position) - np.array(node['position']))
                if dist < 5.0:  # within 5 meters counts as related
                    related_nodes.append(node_id)
        # Create the new node and connect it both ways.
        node_id = f"{label}_{len(self.semantic_graph)}"
        self.semantic_graph[node_id] = {
            'features': features,
            'position': position,
            'label': label,
            'connections': related_nodes,
        }
        for related_id in related_nodes:
            if node_id not in self.semantic_graph[related_id]['connections']:
                self.semantic_graph[related_id]['connections'].append(node_id)

    def recall_semantic_context(self, position, radius=10.0):
        """Return labels, distances and connections of graph nodes within `radius`."""
        context = []
        for node_id, node in self.semantic_graph.items():
            dist = np.linalg.norm(np.array(position) - np.array(node['position']))
            if dist <= radius:
                context.append({
                    'label': node['label'],
                    'distance': dist,
                    'connections': [self.semantic_graph[conn_id]['label']
                                    for conn_id in node['connections']],
                })
        return context

    def predict_objects(self, position, radius=5.0):
        """Predict labels likely present near `position` as a normalized probability dict."""
        # Evidence from spatial proximity of stored observations.
        nearby_objects = defaultdict(float)
        for mem in self.feature_memory:
            dist = np.linalg.norm(np.array(position) - np.array(mem['position']))
            if dist <= radius and mem['label']:
                nearby_objects[mem['label']] += mem['weight'] / (1 + dist)
        # Evidence from semantic-graph connections.
        semantic_context = self.recall_semantic_context(position, radius)
        for item in semantic_context:
            for conn_label in item['connections']:
                nearby_objects[conn_label] += 0.5 / (1 + item['distance'])
        # Normalize to probabilities.
        total = sum(nearby_objects.values())
        if total > 0:
            return {k: v / total for k, v in nearby_objects.items()}
        return {}

    def decay_memory(self):
        """Decay observation weights and drop entries below the 0.1 threshold."""
        for mem in self.feature_memory:
            mem['weight'] *= self.decay_factor
        self.feature_memory = [m for m in self.feature_memory if m['weight'] > 0.1]

    def cluster_environment(self, eps=2.0, min_samples=3):
        """Cluster stored observations by 2-D position with DBSCAN.

        Returns a list of {'center', 'features', 'labels'} cluster summaries.
        """
        # BUG FIX: the original guarded on self.position_memory, which is never
        # written, so this method always returned []. Guard on feature_memory,
        # the collection actually clustered below.
        if not self.feature_memory:
            return []
        positions = np.array([[m['position']['x'], m['position']['y']]
                              for m in self.feature_memory])
        clustering = DBSCAN(eps=eps, min_samples=min_samples).fit(positions)
        clusters = defaultdict(list)
        for i, label in enumerate(clustering.labels_):
            if label != -1:  # -1 is DBSCAN noise
                clusters[label].append(self.feature_memory[i])
        return [
            {
                'center': np.mean([[m['position']['x'], m['position']['y']]
                                   for m in cluster], axis=0),
                'features': np.mean([m['features'] for m in cluster], axis=0),
                'labels': [m['label'] for m in cluster if m['label']],
            }
            for cluster in clusters.values()
        ]
空间决策引擎
import networkx as nx
from sklearn.preprocessing import normalize


class SpatialDecisionEngine:
    """Goal-driven spatial decision engine over a cognitive graph.

    Maintains a networkx graph of known places, folds in clusters from a
    spatial memory, and plans paths: goal-directed, frontier-exploring, or
    random walk as a fallback.
    """

    def __init__(self, map_data=None):
        self.cognitive_map = nx.Graph()
        self.current_goal = None
        self.known_areas = set()
        if map_data:
            self._build_initial_map(map_data)

    def _build_initial_map(self, map_data):
        """Seed the cognitive graph from pre-built map data."""
        for entry in map_data['nodes']:
            self.cognitive_map.add_node(entry['id'],
                                        pos=(entry['x'], entry['y']),
                                        type=entry['type'])
        for edge in map_data['links']:
            self.cognitive_map.add_edge(edge['source'], edge['target'],
                                        weight=edge['cost'], type=edge['type'])

    def update_from_memory(self, spatial_memory):
        """Add clustered observations from spatial memory as graph nodes."""
        for cluster in spatial_memory.cluster_environment():
            cluster_id = f"cluster_{len(self.known_areas)}"
            self.known_areas.add(cluster_id)
            self.cognitive_map.add_node(cluster_id,
                                        pos=cluster['center'],
                                        features=cluster['features'],
                                        labels=cluster['labels'])
            if not self.cognitive_map.nodes:
                continue
            # Attach the cluster to the closest existing node.
            closest_id, closest_data = min(
                self.cognitive_map.nodes(data=True),
                key=lambda item: np.linalg.norm(np.array(item[1]['pos']) - cluster['center']),
            )
            self.cognitive_map.add_edge(
                cluster_id,
                closest_id,
                weight=np.linalg.norm(np.array(closest_data['pos']) - cluster['center']),
                type='inferred',
            )

    def set_goal(self, goal_description, spatial_memory):
        """Set the current goal to the node whose labels best match the description."""
        best_match = None
        max_similarity = -1
        for node_id, data in self.cognitive_map.nodes(data=True):
            for label in data.get('labels', []):
                sim = self._semantic_similarity(goal_description, label)
                if sim > max_similarity:
                    max_similarity = sim
                    best_match = node_id
        self.current_goal = best_match
        return best_match

    def plan_path(self, current_position, strategy='explore'):
        """Plan a waypoint sequence starting from the node nearest `current_position`."""
        if not self.cognitive_map.nodes:
            return None
        here = np.array([current_position['x'], current_position['y']])
        current_node = min(
            self.cognitive_map.nodes(data=True),
            key=lambda item: np.linalg.norm(np.array(item[1]['pos']) - here),
        )[0]
        # Goal-directed routing when requested and a goal is set.
        if strategy == 'goal' and self.current_goal:
            try:
                route = nx.shortest_path(self.cognitive_map,
                                         source=current_node,
                                         target=self.current_goal,
                                         weight='weight')
                return self._path_to_waypoints(route)
            except nx.NetworkXNoPath:
                pass  # fall through to exploration
        # Exploration: head for the cheapest-to-reach frontier node.
        frontier_nodes = self._find_frontier_nodes(current_node)
        if frontier_nodes:
            target = min(
                frontier_nodes,
                key=lambda node: nx.shortest_path_length(self.cognitive_map,
                                                         source=current_node,
                                                         target=node,
                                                         weight='weight'),
            )
            route = nx.shortest_path(self.cognitive_map,
                                     source=current_node,
                                     target=target,
                                     weight='weight')
            return self._path_to_waypoints(route)
        # Fallback: random walk.
        return self._random_walk(current_node)

    def _find_frontier_nodes(self, current_node):
        """Known nodes bordering at least one unknown neighbor."""
        return [
            node for node in self.cognitive_map.nodes()
            if node in self.known_areas
            and any(neighbor not in self.known_areas
                    for neighbor in self.cognitive_map.neighbors(node))
        ]

    def _path_to_waypoints(self, path):
        """Convert a node path into a list of {'x', 'y'} waypoints."""
        waypoints = []
        for node in path:
            pos = self.cognitive_map.nodes[node]['pos']
            waypoints.append({'x': pos[0], 'y': pos[1]})
        return waypoints

    def _random_walk(self, start_node, steps=5):
        """Random walk of up to `steps` hops from `start_node`."""
        path = [start_node]
        current = start_node
        for _ in range(steps):
            neighbors = list(self.cognitive_map.neighbors(current))
            if not neighbors:
                break
            current = np.random.choice(neighbors)
            path.append(current)
        return self._path_to_waypoints(path)

    def _semantic_similarity(self, text1, text2):
        """Jaccard similarity of whitespace tokens (simplified semantics)."""
        tokens_a = set(text1.lower().split())
        tokens_b = set(text2.lower().split())
        return len(tokens_a & tokens_b) / (len(tokens_a | tokens_b) + 1e-6)
3. 地理空间生成模型
城市形态生成网络
import torch
import torch.nn as nn
from torch.nn import functional as Fclass UrbanGenerator(nn.Module):def __init__(self, latent_dim=256):super().__init__()# 条件编码器 (地形+规划约束)self.condition_encoder = nn.Sequential(nn.Conv2d(4, 64, 4, 2, 1), # 输入通道:高程+坡度+土地利用+交通nn.LeakyReLU(0.2),nn.Conv2d(64, 128, 4, 2, 1),nn.BatchNorm2d(128),nn.LeakyReLU(0.2),nn.Conv2d(128, 256, 4, 2, 1),nn.BatchNorm2d(256),nn.LeakyReLU(0.2),nn.Flatten())# 生成器主干self.generator = nn.Sequential(# 输入: latent_dim + condition_featuresnn.ConvTranspose2d(256 + 256, 512, 4, 1, 0),nn.BatchNorm2d(512),nn.ReLU(),nn.ConvTranspose2d(512, 256, 4, 2, 1),nn.BatchNorm2d(256),nn.ReLU(),nn.ConvTranspose2d(256, 128, 4, 2, 1),nn.BatchNorm2d(128),nn.ReLU(),nn.ConvTranspose2d(128, 64, 4, 2, 1),nn.BatchNorm2d(64),nn.ReLU(),nn.ConvTranspose2d(64, 3, 3, 1, 1),nn.Tanh() # 输出[-1,1]范围)# 空间注意力模块self.attention = nn.Sequential(nn.Conv2d(256, 128, 1),nn.ReLU(),nn.Conv2d(128, 1, 1),nn.Sigmoid())def forward(self, z, conditions):"""生成城市布局"""# 编码条件cond_feat = self.condition_encoder(conditions)cond_feat = cond_feat.view(-1, 256, 1, 1)# 合并噪声和条件z = z.view(-1, 256, 1, 1)x = torch.cat([z, cond_feat], dim=1)# 通过生成器x = self.generator(x)# 应用空间注意力attn = self.attention(cond_feat)attn = F.interpolate(attn, size=x.shape[2:], mode='bilinear')x = x * attnreturn xdef generate_city_layout(self, conditions, num_samples=1):"""生成城市布局样本"""with torch.no_grad():z = torch.randn(num_samples, 256).to(conditions.device)generated = self(z, conditions)# 后处理输出generated = (generated + 1) / 2 # 转换到[0,1]范围return generated.cpu().numpy().transpose(0, 2, 3, 1)
城市生成对抗训练框架
class UrbanGAN:def __init__(self, lr=0.0002, beta1=0.5):self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")self.generator = UrbanGenerator().to(self.device)self.discriminator = UrbanDiscriminator().to(self.device)# 损失函数self.adv_loss = nn.BCEWithLogitsLoss()self.perceptual_loss = nn.L1Loss()# 优化器self.opt_g = torch.optim.Adam(self.generator.parameters(), lr=lr, betas=(beta1, 0.999))self.opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=lr, betas=(beta1, 0.999))def train(self, dataloader, epochs):for epoch in range(epochs):for i, batch in enumerate(dataloader):real_images = batch['image'].to(self.device)conditions = batch['condition'].to(self.device)# ---------------------# 训练判别器# ---------------------self.opt_d.zero_grad()# 真实样本real_pred = self.discriminator(real_images, conditions)d_real_loss = self.adv_loss(real_pred, torch.ones_like(real_pred))# 生成样本z = torch.randn(real_images.size(0), 256).to(self.device)fake_images = self.generator(z, conditions)fake_pred = self.discriminator(fake_images.detach(), conditions)d_fake_loss = self.adv_loss(fake_pred, torch.zeros_like(fake_pred))d_loss = (d_real_loss + d_fake_loss) / 2d_loss.backward()self.opt_d.step()# ---------------------# 训练生成器# ---------------------self.opt_g.zero_grad()# 对抗损失fake_pred = self.discriminator(fake_images, conditions)g_adv_loss = self.adv_loss(fake_pred, torch.ones_like(fake_pred))# 感知损失g_perc_loss = self.perceptual_loss(fake_images, real_images)g_loss = g_adv_loss + 10 * g_perc_lossg_loss.backward()self.opt_g.step()# 打印训练状态if i % 100 == 0:print(f"[Epoch {epoch}/{epochs}] [Batch {i}/{len(dataloader)}] "f"[D loss: {d_loss.item()}] [G loss: {g_loss.item()}]")def evaluate(self, test_loader):"""评估生成质量"""with torch.no_grad():real_images = next(iter(test_loader))['image'].to(self.device)conditions = next(iter(test_loader))['condition'].to(self.device)z = torch.randn(real_images.size(0), 256).to(self.device)fake_images = self.generator(z, conditions)# 计算FID分数 
(需要预计算真实数据统计量)fid_score = calculate_fid(real_images, fake_images)return {'fid': fid_score,'samples': fake_images.cpu()[:5]}class UrbanDiscriminator(nn.Module):def __init__(self):super().__init__()# 图像特征提取self.image_encoder = nn.Sequential(nn.Conv2d(3, 64, 4, 2, 1),nn.LeakyReLU(0.2),nn.Conv2d(64, 128, 4, 2, 1),nn.BatchNorm2d(128),nn.LeakyReLU(0.2),nn.Conv2d(128, 256, 4, 2, 1),nn.BatchNorm2d(256),nn.LeakyReLU(0.2))# 条件编码self.cond_encoder = nn.Sequential(nn.Conv2d(4, 64, 4, 2, 1),nn.LeakyReLU(0.2),nn.Conv2d(64, 128, 4, 2, 1),nn.BatchNorm2d(128),nn.LeakyReLU(0.2),nn.Conv2d(128, 256, 4, 2, 1),nn.BatchNorm2d(256),nn.LeakyReLU(0.2))# 判别头self.discriminator = nn.Sequential(nn.Conv2d(512, 256, 1),nn.LeakyReLU(0.2),nn.Conv2d(256, 1, 4),nn.Flatten())def forward(self, x, c):# 提取图像特征img_feat = self.image_encoder(x)# 提取条件特征cond_feat = self.cond_encoder(c)cond_feat = F.interpolate(cond_feat, size=img_feat.shape[2:], mode='bilinear')# 合并特征combined = torch.cat([img_feat, cond_feat], dim=1)return self.discriminator(combined)
4. 地理空间增强现实系统
AR空间注册与标注
import ARKit
import RealityKit
import CoreLocation

/// AR geospatial annotator: places text annotations at geographic coordinates
/// and persists/restores the ARKit world map.
class ARGeospatialAnnotator: NSObject, ARSessionDelegate {
    private let arView = ARView()
    private let locationManager = CLLocationManager()
    // Last world map restored via loadWorldMap (saveWorldMap does not update it).
    private var worldMap: ARWorldMap?
    // Anchor identifier -> entity, so annotations can be looked up later.
    private var anchorEntities = [UUID: AnchorEntity]()

    override init() {
        super.init()
        setupAR()
        setupLocation()
    }

    /// Configure and start the AR session with world tracking.
    /// NOTE(review): addGeoAnnotation below uses ARGeoAnchor, which per Apple's
    /// docs resolves under ARGeoTrackingConfiguration rather than
    /// ARWorldTrackingConfiguration -- confirm this configuration choice.
    private func setupAR() {
        let config = ARWorldTrackingConfiguration()
        config.planeDetection = [.horizontal, .vertical]
        config.worldAlignment = .gravityAndHeading
        config.environmentTexturing = .automatic
        arView.session.run(config)
        arView.session.delegate = self
    }

    /// Start high-accuracy location and heading updates.
    /// NOTE(review): no authorization request or CLLocationManagerDelegate is
    /// set up here -- presumably handled elsewhere; verify.
    private func setupLocation() {
        locationManager.desiredAccuracy = kCLLocationAccuracyBestForNavigation
        locationManager.startUpdatingLocation()
        locationManager.startUpdatingHeading()
    }

    /// Add a 3D text annotation anchored at a geographic coordinate.
    /// - Parameters:
    ///   - text: label to render.
    ///   - coordinate: latitude/longitude of the annotation.
    ///   - altitude: optional altitude; defaults to 0 when nil.
    func addGeoAnnotation(text: String, coordinate: CLLocationCoordinate2D, altitude: CLLocationDistance? = nil) {
        let geoAnchor = ARGeoAnchor(coordinate: coordinate, altitude: altitude ?? 0)
        arView.session.add(anchor: geoAnchor)
        let anchorEntity = AnchorEntity(anchor: geoAnchor)
        let textModel = self.createTextModel(text: text)
        anchorEntity.addChild(textModel)
        arView.scene.addAnchor(anchorEntity)
        anchorEntities[geoAnchor.identifier] = anchorEntity
    }

    /// Build a small extruded-text model entity for an annotation.
    private func createTextModel(text: String) -> ModelEntity {
        let mesh = MeshResource.generateText(
            text,
            extrusionDepth: 0.05,
            font: .systemFont(ofSize: 0.5),
            containerFrame: .zero,
            alignment: .center,
            lineBreakMode: .byWordWrapping
        )
        let material = SimpleMaterial(color: .white, isMetallic: false)
        let model = ModelEntity(mesh: mesh, materials: [material])
        // Scale the generated text mesh down to annotation size.
        model.scale = SIMD3<Float>(0.02, 0.02, 0.02)
        return model
    }

    /// Archive the current world map to a temporary file and hand back its URL.
    func saveWorldMap(completion: @escaping (Result<URL, Error>) -> Void) {
        arView.session.getCurrentWorldMap { worldMap, error in
            if let error = error {
                completion(.failure(error))
                return
            }
            guard let worldMap = worldMap else {
                completion(.failure(ARError(.invalidWorldMap)))
                return
            }
            do {
                // Secure-coded archive, as required for ARWorldMap persistence.
                let data = try NSKeyedArchiver.archivedData(
                    withRootObject: worldMap,
                    requiringSecureCoding: true
                )
                let tempDir = FileManager.default.temporaryDirectory
                let mapFile = tempDir.appendingPathComponent("worldmap.arexperience")
                try data.write(to: mapFile)
                completion(.success(mapFile))
            } catch {
                completion(.failure(error))
            }
        }
    }

    /// Restore a previously saved world map and relaunch the session from it.
    func loadWorldMap(from url: URL, completion: @escaping (Bool) -> Void) {
        do {
            let data = try Data(contentsOf: url)
            guard let worldMap = try NSKeyedUnarchiver.unarchivedObject(ofClass: ARWorldMap.self, from: data) else {
                completion(false)
                return
            }
            let config = ARWorldTrackingConfiguration()
            config.initialWorldMap = worldMap
            // Reset tracking so relocalization starts from the loaded map.
            arView.session.run(config, options: [.resetTracking, .removeExistingAnchors])
            self.worldMap = worldMap
            completion(true)
        } catch {
            print("Failed to load world map: \(error)")
            completion(false)
        }
    }

    /// ARSessionDelegate: monitor world-mapping status.
    /// NOTE(review): this fires on every frame while status stays .mapped, so
    /// the map is re-archived repeatedly -- consider debouncing; verify intent.
    func session(_ session: ARSession, didUpdate frame: ARFrame) {
        if frame.worldMappingStatus == .mapped {
            // Auto-save the world map once mapping is complete.
            self.saveWorldMap { result in
                if case .success(let url) = result {
                    print("World map auto-saved to \(url)")
                }
            }
        }
    }
}
空间认知增强服务
import numpy as np
from sklearn.neighbors import NearestNeighborsclass SpatialAugmentationService:def __init__(self, reference_data):"""初始化增强服务Args:reference_data: 参考数据列表,每个元素为字典格式:{'position': {'x': float, 'y': float, 'z': float},'features': np.array,'semantic_labels': list[str]}"""self.data = reference_dataself.feature_dim = len(reference_data[0]['features'])# 构建空间索引self.spatial_index = NearestNeighbors(n_neighbors=5, metric='euclidean')positions = np.array([[d['position']['x'], d['position']['y'], d['position']['z']] for d in reference_data])self.spatial_index.fit(positions)# 构建语义索引self.semantic_index = {}for item in reference_data:for label in item['semantic_labels']:if label not in self.semantic_index:self.semantic_index[label] = []self.semantic_index[label].append(item)def enhance_observation(self, position, observed_features, k=3):"""增强当前观察Args:position: 当前位置字典 {'x': float, 'y': float, 'z': float}observed_features: 观察到的特征向量k: 用于增强的最近邻数量Returns:增强后的特征向量和语义上下文"""# 空间最近邻查询pos_array = np.array([position['x'], position['y'], position['z']])_, spatial_indices = self.spatial_index.kneighbors([pos_array], n_neighbors=k)spatial_neighbors = [self.data[i] for i in spatial_indices[0]]# 特征最近邻查询feature_distances = [np.linalg.norm(observed_features - d['features'])for d in self.data]feature_indices = np.argpartition(feature_distances, k)[:k]feature_neighbors = [self.data[i] for i in feature_indices]# 合并结果all_neighbors = spatial_neighbors + feature_neighborsunique_neighbors = {tuple(d['features']): d for d in all_neighbors}.values()# 计算增强特征features = []weights = []semantic_context = defaultdict(float)for neighbor in unique_neighbors:# 空间权重spatial_dist = np.linalg.norm(pos_array - np.array([neighbor['position']['x'],neighbor['position']['y'],neighbor['position']['z']]))spatial_weight = 1.0 / (1.0 + spatial_dist)# 特征相似度权重feature_sim = np.dot(observed_features, neighbor['features'])# 组合权重weight = spatial_weight * feature_simweights.append(weight)features.append(neighbor['features'])# 收集语义上下文for 
label in neighbor['semantic_labels']:semantic_context[label] += weight# 归一化权重weights = np.array(weights)weights = weights / (weights.sum() + 1e-6)# 计算加权特征features = np.array(features)enhanced_features = np.sum(features * weights[:, np.newaxis], axis=0)enhanced_features /= np.linalg.norm(enhanced_features) + 1e-6# 归一化语义上下文total = sum(semantic_context.values())semantic_context = {k: v/total for k, v in semantic_context.items()}return {'enhanced_features': enhanced_features,'semantic_context': semantic_context}def predict_objects(self, position, radius=5.0):"""预测区域内可能存在的物体"""pos_array = np.array([position['x'], position['y'], position['z']])# 查找空间邻近点distances = [np.linalg.norm(pos_array - np.array([d['position']['x'],d['position']['y'],d['position']['z']]))for d in self.data]nearby = [(i, dist) for i, dist in enumerate(distances) if dist <= radius]if not nearby:return {}# 统计物体出现频率object_counts = defaultdict(float)total_weight = 0.0for i, dist in nearby:weight = 1.0 / (1.0 + dist)for label in self.data[i]['semantic_labels']:object_counts[label] += weighttotal_weight += weight# 计算概率return {label: count / total_weightfor label, count in object_counts.items()}def suggest_exploration(self, position, explored_positions):"""建议探索目标"""unexplored = [d for d in self.data if not self._is_explored(d['position'], explored_positions)]if not unexplored:return None# 计算信息增益 (基于多样性和距离)pos_array = np.array([position['x'], position['y'], position['z']])scores = []for item in unexplored:item_pos = np.array([item['position']['x'],item['position']['y'],item['position']['z']])# 距离因子distance = np.linalg.norm(pos_array - item_pos)dist_score = 1.0 / (1.0 + distance)# 多样性因子 (基于新语义标签)new_labels = [label for label in item['semantic_labels']if not any(label in explored.get('labels', []) for explored in explored_positions)]div_score = len(new_labels) / max(1, len(item['semantic_labels']))scores.append(dist_score * div_score)best_idx = np.argmax(scores)return unexplored[best_idx]['position']def 
_is_explored(self, position, explored_pos