diff --git a/backend/artifacts/methods/mcp_methods.py b/backend/artifacts/methods/mcp_methods.py index 0ddcbb6e8a2ed962ef9b6d3de4d7557f0056435e..5e87006515d30aefed53d8dcb6cc987df3c5a8f5 100644 --- a/backend/artifacts/methods/mcp_methods.py +++ b/backend/artifacts/methods/mcp_methods.py @@ -11,20 +11,28 @@ # See the Mulan PSL v2 for more details. # Create: 2025-07-30 # ====================================================================================================================== - +import os import glob import gzip +import json import shutil +import tempfile +import zstandard as zstd +import subprocess from xml.etree import ElementTree +from constants.paths import CACHE_DIR from artifacts.models import MCPServer from artifacts.utils import clear_table from utils.cmd_executor import CommandExecutor from utils.logger import init_log from utils.time import timestamp2local - +from artifacts.serializers import MCPBulkCreateSerializer +from django.db import transaction logger = init_log('run.log') +CACHE_META_FILE = os.path.join(CACHE_DIR, "cache_metadata.json") + class MCPMethods: @@ -36,25 +44,46 @@ class MCPMethods: if not update_result: return { 'is_success': False, 'message': msg } # 读取 MCP 服务的信息 - mcp_data, msg = MCPMethods._read_mcp_info() + mcp_data, update_data, msg = MCPMethods._read_mcp_info() if not mcp_data: - return { 'is_success': False, 'message': msg } - # 将 MCP 服务的信息存入数据库中 - from artifacts.serializers import MCPBulkCreateSerializer - serializer = MCPBulkCreateSerializer(data=mcp_data, many=True) - clear_table(MCPServer._meta.db_table) - if not serializer.is_valid(): - logger.error(f"Failed to validate MCP data, errors: {serializer.errors}") - return { 'is_success': False, 'message': serializer.errors } - mcps = serializer.save() - msg = "Sync MCP data successfully." + return {'is_success': False, 'message': msg} + + # 如果表为空,直接批量创建 + if MCPServer.objects.count() == 0: + clear_table(MCPServer._meta.db_table) + serializer = MCPBulkCreateSerializer(data=mcp_data, many=True) + if not serializer.is_valid(): + logger.error(f"Failed to validate MCP data, errors: {serializer.errors}") + return {'is_success': False, 'message': serializer.errors} + serializer.save() + msg = "Bulk created MCP data successfully." + logger.info(msg) + return {'is_success': True, 'message': msg} + + # 增量更新:只处理update_data中的变更 + if update_data: + # 找出需要更新的记录名称 + update_names = [item.get('name') for item in update_data] + # 删除这些记录 + MCPServer.objects.filter(name__in=update_names).delete() + # 重新创建这些记录 + serializer = MCPBulkCreateSerializer(data=update_data, many=True) + if serializer.is_valid(): + serializer.save() + msg = f"Updated {len(update_data)} MCP records successfully." + else: + logger.error(f"Failed to update MCP data: {serializer.errors}") + return {'is_success': False, 'message': serializer.errors} + else: + msg = "No updates needed." + logger.info(msg) - return { 'is_success': True, 'message': msg } + return {'is_success': True, 'message': msg} @staticmethod def _update_mcp_info(): """更新MCP仓库信息""" - cmd = ['yum', 'makecache', '--disablerepo=*', '--enablerepo=mcp'] + cmd = ['yum', 'makecache'] logger.info(f"Start to execute command [{' '.join(cmd)}].") cmd_executor = CommandExecutor(cmd) _, stderr, code = cmd_executor.run() @@ -69,63 +98,320 @@ class MCPMethods: @staticmethod def _read_mcp_info(): """读取MCP信息并生成数据结构""" - # 匹配 primary.xml.gz 文件 + # 1. 加载本地缓存元数据 + cache_meta = MCPMethods._load_cache_metadata() + + # 2. 
解析所有primary.xml文件,收集远程包信息 + remote_packages = MCPMethods._parse_all_primary_xml() + if not remote_packages: + return [], [],"No packages found in primary.xml" + + # 3. 分类:缓存命中的包 vs 需要更新的包 + cached_packages = [] + packages_to_update = [] + + for pkg in remote_packages: + if MCPMethods._is_package_cached(pkg, cache_meta): + # 从缓存加载 + cached_data = MCPMethods._load_from_cache(pkg) + cached_packages.append(cached_data) + logger.info(f"Cache HIT for {pkg['name']} v{pkg['version']}") + else: + packages_to_update.append(pkg) + logger.info(f"Cache MISS for {pkg['name']} v{pkg['version']}") + + # 4. 批量处理需要更新的包 + if packages_to_update: + logger.info(f"Processing {len(packages_to_update)} packages...") + new_packages = MCPMethods._process_packages_batch(packages_to_update, cache_meta) + cached_packages.extend(new_packages) + + # 5. 保存更新后的缓存元数据 + MCPMethods._save_cache_metadata(cache_meta) + + logger.info(f"Total packages: {len(cached_packages)} (Cached: {len(cached_packages) - len(packages_to_update)}, Updated: {len(packages_to_update)})") + #return cached_packages, 'Generate MCP service data successfully.' + return cached_packages, packages_to_update, 'Generate MCP service data successfully.' + + @staticmethod + def _load_cache_metadata(): + """加载缓存元数据""" + if os.path.exists(CACHE_META_FILE): + try: + with open(CACHE_META_FILE, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, FileNotFoundError): + pass + return {"packages": {}} + + @staticmethod + def _save_cache_metadata(cache_meta): + """保存缓存元数据""" + os.makedirs(CACHE_DIR, exist_ok=True) + with open(CACHE_META_FILE, 'w') as f: + json.dump(cache_meta, f, indent=2) + + @staticmethod + def _parse_all_primary_xml(): + """解析所有primary.xml文件,返回远程包信息列表""" + # 匹配 primary.xml 文件 logger.info('Start to match MCP meta file.') - pattern = "/var/cache/dnf/mcp-*/repodata/*-primary.xml.gz" + pattern = "/var/cache/dnf/mcp-*/repodata/*-primary.xml.*" matches = glob.glob(pattern) if not matches: - msg = "No match for *-primary.xml.gz." - logger.error(msg) - return [], msg - primary_file = matches[0] - logger.info(f"The MCP meta file: {primary_file}") + logger.error("No match for *-primary.xml.*") + return [] + + all_packages = [] + for primary_file in matches: + logger.info(f"The MCP meta file: {primary_file}") + packages = MCPMethods._parse_single_primary_xml(primary_file) + all_packages.extend(packages) - # 开始解压 primary.xml.gz + return all_packages + + @staticmethod + def _parse_single_primary_xml(primary_file): + """解析单个primary.xml文件""" + # 开始解压 primary.xml logger.info(f'Start to extract {primary_file}') - output_file = primary_file.rstrip('.gz') + output_file = primary_file.replace('.gz', '').replace('.zst', '') + try: - with gzip.open(primary_file, 'rb') as f_in: - with open(output_file, 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) + if primary_file.endswith('.gz'): + with gzip.open(primary_file, 'rb') as f_in: + with open(output_file, 'wb') as f_out: + shutil.copyfileobj(f_in, f_out) + elif primary_file.endswith('.zst'): + dctx = zstd.ZstdDecompressor() + with open(primary_file, 'rb') as f_in: + with open(output_file, 'wb') as f_out: + dctx.copy_stream(f_in, f_out) logger.info(f"extract successfully: {primary_file} -> {output_file}") - except PermissionError: - msg = "No permission." - logger.error(msg) - return [], msg - except gzip.BadGzipFile: - msg = f"{primary_file} is not a valid gzip format." 
- logger.error(msg) - return [], msg + except Exception as e: + logger.error(f"Failed to extract {primary_file}: {e}") + return [] # 读取 primary.xml 文件,生成 MCP 服务信息列表 - logger.info("Start to read MCP information files and generate MCP data.") + packages = [] tree = ElementTree.parse(output_file) root = tree.getroot() namespace = {'common': 'http://linux.duke.edu/metadata/common'} - mcp_data = [] + for package in root.findall('common:package', namespace): - mcp_info = {} package_name = package.find('common:name', namespace).text.strip() if package_name == 'mcp-servers': continue - else: - mcp_info['package_name'] = package_name - mcp_info['name'] = package_name.removeprefix('mcp-servers-') + + mcp_info = {} + mcp_info['package_name'] = package_name + mcp_info['name'] = package_name.removeprefix('mcp-servers-') version = package.find('common:version', namespace) - mcp_info['version'] = f"{version.get('ver')}-{version.get('rel')}" # 暂未考虑epoch + mcp_info['version'] = f"{version.get('ver')}-{version.get('rel')}" timestamp = package.find('common:time', namespace).get('file') mcp_info['updated_at'] = timestamp2local(int(timestamp)) mcp_info['key'] = mcp_info['name'] + '_' + mcp_info['version'] mcp_info['description'] = dict() mcp_info['description']['default'] = package.find('common:description', namespace).text - mcp_info['size'] = int(package.find('common:size', namespace).get('package')) - mcp_info['repo'] = package.find('common:url', namespace).text - mcp_data.append(mcp_info) + mcp_info['url'] = package.find('common:url', namespace).text + packages.append(mcp_info) - if not mcp_data: - msg = f"Failed to read mcp information." - logger.error(msg) - return [], msg - msg = 'Generate MCP service data successfully.' - logger.info(msg) - return mcp_data, msg + return packages + + @staticmethod + def _is_package_cached(pkg, cache_meta): + """检查包是否已缓存且版本一致""" + pkg_name = pkg['name'] + if pkg_name not in cache_meta['packages']: + return False + + cached_pkg = cache_meta['packages'][pkg_name] + cache_dir = os.path.join(CACHE_DIR, pkg['key']) + + return (cached_pkg['version'] == pkg['version'] and + cached_pkg['updated_at'] == pkg['updated_at'] and + os.path.exists(cache_dir)) + + @staticmethod + def _load_from_cache(pkg): + """从缓存加载包数据""" + pkg['readme'] = MCPMethods._get_mcp_readme(pkg['key']) + pkg['icon'] = MCPMethods._get_mcp_icon(pkg['key']) + pkg['mcp_config'] = MCPMethods._get_mcp_config(pkg['key']) + return pkg + + @staticmethod + def _process_packages_batch(packages_to_update, cache_meta): + """批量下载和串行解包""" + # 批量下载所有包 + package_names = [pkg['package_name'] for pkg in packages_to_update] + download_dir = MCPMethods._batch_download(package_names) + if not download_dir: + logger.error("Batch download failed") + return [] + + try: + processed_packages = [] + for pkg in packages_to_update: + try: + result = MCPMethods._extract_single_package(pkg, download_dir) + if result: + processed_packages.append(pkg) + # 更新缓存元数据 + cache_meta['packages'][pkg['name']] = { + 'version': pkg['version'], + 'updated_at': pkg['updated_at'] + } + except Exception as e: + logger.error(f"Failed to process {pkg['name']}: {e}") + + return processed_packages + + finally: + # 清理下载目录 + shutil.rmtree(download_dir, ignore_errors=True) + + @staticmethod + def _batch_download(package_names): + """批量下载RPM包""" + temp_dir = tempfile.mkdtemp(prefix='mcp_batch_download_') + cmd = ['dnf', 'download', '--downloaddir', temp_dir] + package_names + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: 
+ logger.error("Batch download failed: %s", result.stderr) + shutil.rmtree(temp_dir, ignore_errors=True) + return None + + return temp_dir + + @staticmethod + def _extract_single_package(pkg_info, download_dir): + """从下载目录找到RPM并解压单个包""" + mcp_key = pkg_info['key'] + package_name = pkg_info['package_name'] + + # 在下载目录中找到对应的RPM文件 + rpm_file = None + for fname in os.listdir(download_dir): + if fname.startswith(package_name) and fname.endswith('.rpm'): + rpm_file = os.path.join(download_dir, fname) + break + + if not rpm_file: + logger.error("No RPM file found for %s in download directory", package_name) + return False + + cache_dir = os.path.join(CACHE_DIR, mcp_key) + + # 如果缓存目录已存在,先清理 + if os.path.exists(cache_dir): + shutil.rmtree(cache_dir) + os.makedirs(cache_dir, exist_ok=True) + + # 解压到缓存目录 - 使用定向解包优化 + rpm2cpio_cmd = ['rpm2cpio', rpm_file] + target_files = [ + "./opt/mcp-servers/servers*/src/readme.md", + "./opt/mcp-servers/servers*/src/icon.png", + "./opt/mcp-servers/servers*/mcp_config.json", + ] + cpio_cmd = ['cpio', '-idm', '--quiet'] + target_files + + rpm2cpio_proc = subprocess.Popen(rpm2cpio_cmd, stdout=subprocess.PIPE) + cpio_proc = subprocess.Popen( + cpio_cmd, + stdin=rpm2cpio_proc.stdout, + cwd=cache_dir, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + rpm2cpio_proc.stdout.close() + _, stderr = cpio_proc.communicate() + + if cpio_proc.returncode == 0: + logger.info("Successfully extracted MCP package: %s", mcp_key) + # 读取资源文件 + pkg_info['readme'] = MCPMethods._get_mcp_readme(mcp_key) + pkg_info['icon'] = MCPMethods._get_mcp_icon(mcp_key) + pkg_info['mcp_config'] = MCPMethods._get_mcp_config(mcp_key) + return True + else: + logger.error("Failed to extract %s: %s", package_name, stderr.decode()) + shutil.rmtree(cache_dir, ignore_errors=True) + return False + + @staticmethod + def _get_mcp_readme(mcp_key: str = ""): + """获取MCP服务的README内容""" + if not mcp_key: + return "" + + # 修改为从缓存目录读取 + base_path = os.path.join(CACHE_DIR, mcp_key, 'opt', 'mcp-servers', 'servers') + + try: + # 遍历servers目录找到readme文件 + for server_name in os.listdir(base_path): + readme_path = os.path.join(base_path, server_name, 'src', 'readme.md') + if os.path.exists(readme_path): + with open(readme_path, 'r', encoding='utf-8') as f: + return f.read() + except Exception as e: + logger.error(f"Failed to read readme for {mcp_key}: {e}") + + return "" + + @staticmethod + def _get_mcp_icon(mcp_key: str = ""): + """获取MCP服务的Icon内容(Base64编码)""" + if not mcp_key: + return "" + + # 修改为从缓存目录读取 + base_path = os.path.join(CACHE_DIR, mcp_key, 'opt', 'mcp-servers', 'servers') + + try: + import base64 + # 遍历servers目录找到icon文件 + for server_name in os.listdir(base_path): + icon_path = os.path.join(base_path, server_name, 'src', 'icon.png') + if os.path.exists(icon_path): + with open(icon_path, 'rb') as f: + return base64.b64encode(f.read()).decode('utf-8') + except Exception as e: + logger.error(f"Failed to read icon for {mcp_key}: {e}") + + return "" + + @staticmethod + def _get_mcp_config(mcp_key=""): + """ + 读取 mcp_config.json 并返回字典。 + 目录结构示例: + {CACHE_DIR}/{mcp_key}/opt/mcp-servers/servers/{server_name}/ + ├─ mcp_config.json ← 读取这个文件 + └─ src/icon.png ← _get_mcp_icon 使用的文件 + """ + if not mcp_key: + return {} + + # 修改为从缓存目录读取 + base_path = os.path.join(CACHE_DIR, mcp_key, "opt", "mcp-servers", "servers") + + try: + import json + for server_name in os.listdir(base_path): + cfg_path = os.path.join(base_path, server_name, "mcp_config.json") + if os.path.exists(cfg_path): + with open(cfg_path, "r") as f: + try: + 
return json.load(f) + except ValueError as e: + logger.error("JSON decode error in {0}: {1}".format(cfg_path, e)) + return {} + except Exception as e: + logger.error("Failed to read mcp_config.json for {0}: {1}".format(mcp_key, e)) + + return {} diff --git a/backend/artifacts/models.py b/backend/artifacts/models.py index de553cef0a12e2f41927a1539769812efd502336..d68c86ec2c8f3ae50f02de8c62dd8463f23e0207 100644 --- a/backend/artifacts/models.py +++ b/backend/artifacts/models.py @@ -66,6 +66,8 @@ class MCPServer(models.Model): url = models.CharField("代码仓url", max_length=2048, blank=True, null=True) author = models.CharField("发布者", max_length=256, blank=True, null=True) description = JSONField("简介", default=dict, help_text="字典格式,key:语言,value:文本") + readme = models.TextField("README文本", blank=True, null=True) icon = models.TextField("图标数据", blank=True, null=True) - app_list = JSONField("智能体应用列表", default=list, help_text="列表,每个元素包含name,status") + mcp_config = JSONField("MCP配置内容",default=dict, help_text="完整的 mcp_config.json 内容",blank=True, null=True) + #app_list = JSONField("智能体应用列表", default=list, help_text="列表,每个元素包含name,status") diff --git a/backend/artifacts/serializers.py b/backend/artifacts/serializers.py index 6346e27d9a02945d817eba5c3d4d8f3aa3ae8789..b3c474845fa75dc78d27d2680a137cfacc27a14a 100644 --- a/backend/artifacts/serializers.py +++ b/backend/artifacts/serializers.py @@ -111,7 +111,7 @@ class MCPDetailSerializer(serializers.ModelSerializer): 'icon', 'cmd_list', 'installed_status', - 'app_list', + #'app_list', ) @staticmethod @@ -199,7 +199,7 @@ class MCPBulkCreateSerializer(serializers.ModelSerializer): 'description', 'readme', 'icon', - 'app_list', + #'app_list', ) list_serializer_class = MCPListSerializer diff --git a/backend/artifacts/views.py b/backend/artifacts/views.py index 49cab0d43eb510c0afc927f7cb70e92cd5e8382d..e6727b3a6d6e545163b2ebc0dd18bf3768b9ec59 100644 --- a/backend/artifacts/views.py +++ b/backend/artifacts/views.py @@ -63,6 +63,20 @@ class ArtifactViewSet(viewsets.GenericViewSet): logger.info(msg) return Response({'is_success': True, 'message': msg, 'time': data_time}, status=status.HTTP_200_OK) + @action(methods=['POST'], detail=False, url_path='sync-mcp') + def sync_mcp_only(self, request): + """单独同步MCP服务信息(新增接口)""" + logger.info("==== API: [POST] /v1.0/artifacts/sync-mcp/ ====") + + # 只同步MCP服务信息 + result = MCPMethods.sync_mcps() + if not result['is_success']: + return Response(result, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + msg = "Sync MCP data successfully." + logger.info(msg) + return Response({'is_success': True, 'message': msg}, status=status.HTTP_200_OK) + def list(self, request): """获取插件和MCP服务列表 """ diff --git a/backend/constants/paths.py b/backend/constants/paths.py index f1b85ee99b372b4604128a4dfc6d8e2e485e0ebf..f2b18d7b1de6e04a07f48c5a2b0127e90c55bce0 100644 --- a/backend/constants/paths.py +++ b/backend/constants/paths.py @@ -32,6 +32,8 @@ PLUGIN_REPO_DIR = '/etc/oedp/config/repo/cache' REPO_DETAILS_DIR = '/var/oedp/details' # MCP 服务 repo 文件 MCP_REPO_FILE = '/etc/yum.repos.d/mcp.repo' +# MCP 服务缓存地址 +CACHE_DIR = "/var/cache/mcp-assets" # 家目录 HOME_DIR = os.path.expanduser('~') # 插件包缓存目录
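
For reference, the caching scheme introduced in mcp_methods.py keeps one extracted directory per package key under CACHE_DIR plus a small cache_metadata.json index written by _save_cache_metadata. The snippet below is a minimal, self-contained sketch (not part of the patch) of that on-disk layout and of the version/timestamp comparison performed by _is_package_cached; the package name and timestamp in the example are made up purely for illustration.

# Sketch of the cache layout assumed by _is_package_cached (illustrative only):
#
#   /var/cache/mcp-assets/
#   ├── cache_metadata.json                      # written by _save_cache_metadata
#   └── <name>_<version>/                        # one directory per package "key"
#       └── opt/mcp-servers/servers/<server>/    # extracted via rpm2cpio | cpio
#           ├── mcp_config.json
#           └── src/readme.md, src/icon.png
#
# cache_metadata.json shape:
#   {"packages": {"<name>": {"version": "<ver>-<rel>", "updated_at": "<local time>"}}}

import json
import os

CACHE_DIR = "/var/cache/mcp-assets"
CACHE_META_FILE = os.path.join(CACHE_DIR, "cache_metadata.json")


def is_package_cached(pkg: dict, cache_meta: dict) -> bool:
    """Same comparison as MCPMethods._is_package_cached: a package is a cache hit
    only if its name is indexed, version and file timestamp match, and the
    extracted directory for its key still exists on disk."""
    cached = cache_meta.get("packages", {}).get(pkg["name"])
    if cached is None:
        return False
    cache_dir = os.path.join(CACHE_DIR, pkg["key"])
    return (cached["version"] == pkg["version"]
            and cached["updated_at"] == pkg["updated_at"]
            and os.path.exists(cache_dir))


if __name__ == "__main__":
    # "example-server" and its version/timestamp are hypothetical sample values.
    pkg = {"name": "example-server", "version": "1.0.0-1",
           "updated_at": "2025-07-30 12:00:00", "key": "example-server_1.0.0-1"}
    try:
        with open(CACHE_META_FILE) as f:
            cache_meta = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # Mirrors _load_cache_metadata's fallback when the index is missing or corrupt.
        cache_meta = {"packages": {}}
    print("cache hit" if is_package_cached(pkg, cache_meta) else "cache miss")

A cache miss in this check is what routes a package into packages_to_update and hence through the batch dnf download and targeted rpm2cpio/cpio extraction path added in _process_packages_batch.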