登录
注册
开源
企业版
高校版
搜索
帮助中心
使用条款
关于我们
开源
企业版
高校版
私有云
模力方舟
AI 队友
登录
注册
Gitee 2025 年度开源项目评选中
代码拉取完成,页面将自动刷新
开源项目
>
人工智能
>
AI-人工智能
&&
捐赠
捐赠前请先登录
取消
前往登录
扫描微信二维码支付
取消
支付完成
支付提示
将跳转至支付宝完成支付
确定
取消
Watch
不关注
关注所有动态
仅关注版本发行动态
关注但不提醒动态
101
Star
1.4K
Fork
947
GVP
MindSpore
/
mindformers
代码
Issues
130
Pull Requests
87
Wiki
统计
流水线
服务
质量分析
Jenkins for Gitee
腾讯云托管
腾讯云 Serverless
悬镜安全
阿里云 SAE
Codeblitz
SBOM
我知道了,不再自动展开
更新失败,请稍后重试!
移除标识
内容风险标识
本任务被
标识为内容中包含有代码安全 Bug 、隐私泄露等敏感信息,仓库外成员不可访问
新版LLM Datasets DT测试设计文档
TODO
#IDCAVO
RFC
Lin
成员
创建于
2025-12-11 11:21
# MindSpore Transformers LLMDataset DT测试设计说明书 大语言模型数据集(LLMDataset)DT测试设计说明书 **修订记录Version Control** | 日期Date | 修订版本Version | 描述Description | 作者Prepared by | |:------:|:------|:------|:------:| | 2025-11-11 | V1.0 | 初版 | - | 本文档描述 MindSpore Transformers 中 LLMDataset 模块的 DT(Development Test)测试设计,包括单元测试、集成测试和系统测试的详细设计。 --- ## 1. 测试概述 ### 1.1 测试目标 - 验证 LLMDataset 各功能模块的正确性和健壮性 - 确保多种数据加载器类型(MindDataset、HFDataLoader、MegatronDataLoader)正常工作 - 验证分布式场景下数据分片的正确性 - 确保动态批处理、EOD 重置、注意力掩码生成等功能正常 - 验证 Token 统计分析功能的准确性 - 验证异常场景的错误处理和提示信息 ### 1.2 测试范围 | 测试类型 | 测试范围 | | ------ | ------ | | 单元测试(UT) | 核心方法的功能验证、边界条件测试、异常处理测试 | | 集成测试(IT) | 模块间交互验证、配置解析流程、数据管道验证 | | 系统测试(ST) | 端到端数据加载流程、分布式场景、性能验证 | ### 1.3 测试环境 | 环境要素 | 要求 | | ------ | ------ | | 框架版本 | MindSpore >= 2.3.0 | | 硬件环境 | CPU / Ascend 910B(用于分布式测试) | | Python版本 | Python >= 3.8 | | 依赖库 | numpy, transformers (可选) | --- ## 2. 单元测试设计 ### 2.1 LLMDataset 初始化测试 #### 2.1.1 test_init_with_none_config **测试目标**:验证 dataset_config 为 None 时的异常处理 **前置条件**:无 **测试步骤**: 1. 调用 `LLMDataset(dataset_config=None)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "dataset_config cannot be None" **测试代码示例**: ```python def test_init_with_none_config(): with pytest.raises(ValueError, match="dataset_config cannot be None"): LLMDataset(dataset_config=None) ``` #### 2.1.2 test_init_with_invalid_data_loader_config **测试目标**:验证 data_loader 非字典类型时的异常处理 **前置条件**:无 **测试步骤**: 1. 创建配置 `{"data_loader": "invalid_string"}` 2. 调用 `LLMDataset(dataset_config=invalid_config)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "data_loader_config must be a dict" **测试代码示例**: ```python def test_init_with_invalid_data_loader_config(): invalid_config = { "data_loader": "invalid_string" # Should be a dict } with pytest.raises(ValueError, match="data_loader_config must be a dict"): LLMDataset(dataset_config=invalid_config) ``` #### 2.1.3 test_init_with_missing_type **测试目标**:验证缺少 type 字段时的异常处理 **前置条件**:无 **测试步骤**: 1. 创建缺少 type 的配置 2. 调用 `LLMDataset(dataset_config=invalid_config)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "data_loader_config must contain 'type' key" **测试代码示例**: ```python def test_init_with_missing_type(): invalid_config = { "data_loader": { "config": {} } } with pytest.raises(ValueError, match="data_loader_config must contain 'type' key"): LLMDataset(dataset_config=invalid_config) ``` #### 2.1.4 test_init_with_valid_megatron_config **测试目标**:验证有效 Megatron 配置的初始化 **前置条件**: - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建有效的 BlendedMegatronDatasetDataLoader 配置 2. 调用 `LLMDataset(dataset_config=config)` 3. 验证 data_loader_type 属性 **预期结果**: - 初始化成功 - `data_loader_type == "BlendedMegatronDatasetDataLoader"` **测试代码示例**: ```python def test_init_with_valid_megatron_config(setup_context): valid_config = { "data_loader": { "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": { "seq_length": 8192, "eod": 0, "pad": 1, "data_path": ['0.5', '/path/data1', '0.5', '/path/data2'] } }, "input_columns": ["input_ids", "labels", "loss_mask", "position_ids"], "drop_remainder": True } dataset = LLMDataset(dataset_config=valid_config) assert dataset.data_loader_type == "BlendedMegatronDatasetDataLoader" assert dataset.data_loader_config["type"] == "BlendedMegatronDatasetDataLoader" ``` #### 2.1.5 test_init_with_valid_hf_config **测试目标**:验证有效 HuggingFace 配置的初始化 **前置条件**:无 **测试步骤**: 1. 创建有效的 HFDataLoader 配置(use_broadcast_data=False) 2. 调用 `LLMDataset(dataset_config=config)` 3. 验证 data_loader_type 属性 **预期结果**: - 初始化成功 - `data_loader_type == "HFDataLoader"` **测试代码示例**: ```python def test_init_with_valid_hf_config(): valid_config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test_dataset", "use_broadcast_data": False, "create_attention_mask": True, "handler": [] }, "input_columns": ["input_ids", "labels"], "drop_remainder": True } dataset = LLMDataset(dataset_config=valid_config) assert dataset.data_loader_type == "HFDataLoader" assert dataset.data_loader_config["type"] == "HFDataLoader" ``` #### 2.1.6 test_init_hf_with_broadcast_no_opt_level **测试目标**:验证 HFDataLoader 启用 broadcast 但未设置 opt_level 的异常处理 **前置条件**: - dataset_broadcast_opt_level < 3 **测试步骤**: 1. 创建 HFDataLoader 配置,use_broadcast_data=True 2. 调用 `LLMDataset(dataset_config=config)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "please set `dataset_broadcast_opt_level: 3`" **测试代码示例**: ```python def test_init_hf_with_broadcast_no_opt_level(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": True # Requires opt_level >= 3 } } # Assuming opt_level < 3 with pytest.raises(ValueError, match="please set `dataset_broadcast_opt_level: 3`"): LLMDataset(dataset_config=config) ``` #### 2.1.7 test_init_with_packing_handler **测试目标**:验证 PackingHandler 自动启用 attention_mask **前置条件**:无 **测试步骤**: 1. 创建包含 PackingHandler 的配置 2. 调用 `LLMDataset(dataset_config=config)` 3. 验证 create_attention_mask 被自动设置为 True **预期结果**: - `data_loader_config.create_attention_mask == True` **测试代码示例**: ```python def test_init_with_packing_handler(): data_loader_config = DictConfig(**{ "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False, "handler": [ {"type": "PackingHandler", "seq_length": 4096} ] }) config = {"data_loader": data_loader_config} dataset = LLMDataset(dataset_config=config) assert dataset.data_loader_config.create_attention_mask == True ``` ### 2.2 配置方法测试 #### 2.2.1 test_set_megatron_dataset_config_seed **测试目标**:验证 Megatron 数据集种子设置 **前置条件**: - 使用 DictConfig 创建嵌套配置 - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建 BlendedMegatronDatasetDataLoader 配置 2. 调用 `set_megatron_dataset_config_seed(dataset_seed=5678)` 3. 验证 seed 值 **预期结果**: - `data_loader_config.config.seed == 5678` **测试代码示例**: ```python def test_set_megatron_dataset_config_seed(setup_context): inner_config = DictConfig(**{ "seq_length": 8192, "seed": 1234, "data_path": ['1.0', '/path/data'] }) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) config = {"data_loader": data_loader} dataset = LLMDataset(dataset_config=config) dataset.set_megatron_dataset_config_seed(dataset_seed=5678) assert dataset.data_loader_config.config.seed == 5678 ``` #### 2.2.2 test_set_megatron_dataset_config_seed_non_megatron **测试目标**:验证非 Megatron 数据集调用种子设置不报错 **前置条件**:无 **测试步骤**: 1. 创建 HFDataLoader 配置 2. 调用 `set_megatron_dataset_config_seed(dataset_seed=5678)` **预期结果**: - 方法执行不抛出异常 - 配置不变化 **测试代码示例**: ```python def test_set_megatron_dataset_config_seed_non_megatron(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test_dataset", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) # Should not raise error even though it's not Megatron dataset dataset.set_megatron_dataset_config_seed(dataset_seed=5678) ``` #### 2.2.3 test_set_ms_dataset_config **测试目标**:验证 MindSpore 数据集全局配置设置 **前置条件**:无 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `set_ms_dataset_config(dataset_seed=4321, prefetch_size=2, numa_enable=True)` **预期结果**: - MindSpore 数据集配置被正确设置 - 不抛出异常 **测试代码示例**: ```python def test_set_ms_dataset_config(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test_dataset", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) dataset.set_ms_dataset_config( dataset_seed=4321, prefetch_size=2, numa_enable=True ) # Verify configuration is set (no exceptions raised) assert True ``` ### 2.3 列名获取测试 #### 2.3.1 test_get_default_input_columns_compressed_eod **测试目标**:验证压缩 EOD 掩码列名生成 **前置条件**:无 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `get_default_input_columns(create_compressed_eod_mask=True)` **预期结果**: - 返回 `['input_ids', 'labels', 'loss_mask', 'position_ids', 'actual_seq_len']` **测试代码示例**: ```python def test_get_default_input_columns_compressed_eod(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) columns = dataset.get_default_input_columns( create_attention_mask=False, create_compressed_eod_mask=True ) assert columns == ['input_ids', 'labels', 'loss_mask', 'position_ids', 'actual_seq_len'] ``` #### 2.3.2 test_get_default_input_columns_attention_mask **测试目标**:验证注意力掩码列名生成 **前置条件**:无 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `get_default_input_columns(create_attention_mask=True)` **预期结果**: - 返回 `['input_ids', 'labels', 'loss_mask', 'position_ids', 'attention_mask']` **测试代码示例**: ```python def test_get_default_input_columns_attention_mask(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) columns = dataset.get_default_input_columns( create_attention_mask=True, create_compressed_eod_mask=False ) assert columns == ['input_ids', 'labels', 'loss_mask', 'position_ids', 'attention_mask'] ``` #### 2.3.3 test_get_default_input_columns_megatron **测试目标**:验证 Megatron 数据集默认列名 **前置条件**: - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建 BlendedMegatronDatasetDataLoader 配置 2. 调用 `get_default_input_columns()` **预期结果**: - 返回 `['input_ids', 'labels', 'loss_mask', 'position_ids']` **测试代码示例**: ```python def test_get_default_input_columns_megatron(setup_context): config = { "data_loader": { "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": { "seq_length": 8192, "data_path": ['1.0', '/path/data'] } } } dataset = LLMDataset(dataset_config=config) columns = dataset.get_default_input_columns( create_attention_mask=False, create_compressed_eod_mask=False ) assert columns == ['input_ids', 'labels', 'loss_mask', 'position_ids'] ``` #### 2.3.4 test_get_default_input_columns_custom **测试目标**:验证自定义列名返回 **前置条件**:无 **测试步骤**: 1. 创建包含 input_columns 的配置 2. 调用 `get_default_input_columns()` **预期结果**: - 返回配置中指定的 input_columns **测试代码示例**: ```python def test_get_default_input_columns_custom(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False }, "input_columns": ["custom_col1", "custom_col2"] } dataset = LLMDataset(dataset_config=config) columns = dataset.get_default_input_columns( create_attention_mask=False, create_compressed_eod_mask=False ) assert columns == ["custom_col1", "custom_col2"] ``` ### 2.4 分片信息生成测试 #### 2.4.1 test_generate_shard_info_default **测试目标**:验证默认分片信息生成 **前置条件**: - 单设备模式 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `generate_shard_info(data_parallel_size=1)` **预期结果**: - `shard_id == 0` - `num_shards == 1` **测试代码示例**: ```python def test_generate_shard_info_default(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) shard_id, num_shards = dataset.generate_shard_info(data_parallel_size=1) assert shard_id == 0 assert num_shards == 1 ``` #### 2.4.2 test_generate_shard_info_semi_full_batch **测试目标**:验证 Semi Full Batch 模式分片信息 **前置条件**: - 设置 semi_auto_parallel 和 full_batch=True **测试步骤**: 1. 配置 semi_auto_parallel 和 full_batch 2. 调用 `generate_shard_info()` **预期结果**: - `shard_id == None` - `num_shards == None` ### 2.5 数据加载器创建测试 #### 2.5.1 test_create_data_loader_with_none_columns **测试目标**:验证 column_names 为 None 时的异常处理 **前置条件**:无 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `create_data_loader(column_names=None)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "column_names cannot be None" **测试代码示例**: ```python def test_create_data_loader_with_none_columns(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) with pytest.raises(ValueError, match="column_names cannot be None"): dataset.create_data_loader(column_names=None) ``` ### 2.6 掩码配置检查测试 #### 2.6.1 test_is_create_compressed_eod_mask_megatron **测试目标**:验证 Megatron 数据集压缩 EOD 掩码检查 **前置条件**: - 使用 DictConfig 配置 - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建配置 `create_compressed_eod_mask=True` 2. 调用 `is_create_compressed_eod_mask()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_create_compressed_eod_mask_megatron(setup_context): inner_config = DictConfig(**{ "seq_length": 8192, "create_compressed_eod_mask": True, "data_path": ['1.0', '/path/data'] }) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) config = {"data_loader": data_loader} dataset = LLMDataset(dataset_config=config) assert dataset.is_create_compressed_eod_mask() ``` #### 2.6.2 test_is_create_compressed_eod_mask_hf **测试目标**:验证 HF 数据集压缩 EOD 掩码检查 **前置条件**:无 **测试步骤**: 1. 创建配置 `create_compressed_eod_mask=True` 2. 调用 `is_create_compressed_eod_mask()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_create_compressed_eod_mask_hf(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False, "create_compressed_eod_mask": True } } dataset = LLMDataset(dataset_config=config) assert dataset.is_create_compressed_eod_mask() ``` #### 2.6.3 test_is_create_attention_mask_megatron **测试目标**:验证 Megatron 数据集注意力掩码检查 **前置条件**: - 使用 DictConfig 配置 - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建配置 `create_attention_mask=True` 2. 调用 `is_create_attention_mask()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_create_attention_mask_megatron(setup_context): inner_config = DictConfig(**{ "seq_length": 8192, "create_attention_mask": True, "data_path": ['1.0', '/path/data'] }) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) config = {"data_loader": data_loader} dataset = LLMDataset(dataset_config=config) assert dataset.is_create_attention_mask() ``` #### 2.6.4 test_is_create_attention_mask_hf **测试目标**:验证 HF 数据集注意力掩码检查 **前置条件**:无 **测试步骤**: 1. 创建配置 `create_attention_mask=True` 2. 调用 `is_create_attention_mask()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_create_attention_mask_hf(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False, "create_attention_mask": True } } dataset = LLMDataset(dataset_config=config) assert dataset.is_create_attention_mask() ``` ### 2.7 数据加载器类型获取测试 #### 2.7.1 test_get_data_loader_type_megatron **测试目标**:验证获取 Megatron 数据加载器类型 **前置条件**: - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建 BlendedMegatronDatasetDataLoader 配置 2. 调用 `get_data_loader_type()` **预期结果**: - 返回 `"BlendedMegatronDatasetDataLoader"` **测试代码示例**: ```python def test_get_data_loader_type_megatron(setup_context): config = { "data_loader": { "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": { "seq_length": 8192, "data_path": ['1.0', '/path/data'] } } } dataset = LLMDataset(dataset_config=config) assert dataset.get_data_loader_type() == "BlendedMegatronDatasetDataLoader" ``` #### 2.7.2 test_get_data_loader_type_hf **测试目标**:验证获取 HF 数据加载器类型 **前置条件**:无 **测试步骤**: 1. 创建 HFDataLoader 配置 2. 调用 `get_data_loader_type()` **预期结果**: - 返回 `"HFDataLoader"` **测试代码示例**: ```python def test_get_data_loader_type_hf(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) assert dataset.get_data_loader_type() == "HFDataLoader" ``` #### 2.7.3 test_get_data_loader_type_minddataset **测试目标**:验证获取 MindDataset 数据加载器类型 **前置条件**:无 **测试步骤**: 1. 创建 MindDataset 配置 2. 调用 `get_data_loader_type()` **预期结果**: - 返回 `"MindDataset"` **测试代码示例**: ```python def test_get_data_loader_type_minddataset(): config = { "data_loader": { "type": "MindDataset", "dataset_dir": "/path/to/mindrecord" } } dataset = LLMDataset(dataset_config=config) assert dataset.get_data_loader_type() == "MindDataset" ``` ### 2.8 Token 统计功能测试 #### 2.8.1 test_perform_token_counting_default **测试目标**:验证默认参数创建 Token 统计函数 **前置条件**:无 **测试步骤**: 1. 调用 `LLMDataset.perform_token_counting()` **预期结果**: - 返回可调用对象 **测试代码示例**: ```python def test_perform_token_counting_default(): counter_func = LLMDataset.perform_token_counting() assert callable(counter_func) ``` #### 2.8.2 test_perform_token_counting_custom **测试目标**:验证自定义参数创建 Token 统计函数 **前置条件**:无 **测试步骤**: 1. 调用 `LLMDataset.perform_token_counting(top_n=20, min_token_id=10, max_token_id=1000, save_path="./test_output/")` **预期结果**: - 返回可调用对象 **测试代码示例**: ```python def test_perform_token_counting_custom(): counter_func = LLMDataset.perform_token_counting( top_n=20, min_token_id=10, max_token_id=1000, save_path="./test_output/" ) assert callable(counter_func) ``` ### 2.9 并行模式检查测试 #### 2.9.1 test_is_semi_mode **测试目标**:验证半自动并行模式检查 **前置条件**:无 **测试步骤**: 1. 设置 `parallel_mode="semi_auto_parallel"` 2. 调用 `LLMDataset._is_semi()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_semi_mode(): # Set to semi-auto parallel mode context.set_auto_parallel_context(parallel_mode="semi_auto_parallel") is_semi = LLMDataset._is_semi() assert is_semi # Reset to stand-alone mode context.reset_auto_parallel_context() ``` #### 2.9.2 test_is_full_batch_mode **测试目标**:验证全批次模式检查 **前置条件**:无 **测试步骤**: 1. 设置 `full_batch=True` 2. 调用 `LLMDataset._is_full_batch()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_full_batch_mode(): # Set full_batch to True context.set_auto_parallel_context(full_batch=True) is_full = LLMDataset._is_full_batch() assert is_full # Reset context.reset_auto_parallel_context() ``` #### 2.9.3 test_is_data_parallel_mode **测试目标**:验证数据并行模式检查 **前置条件**:无 **测试步骤**: 1. 设置 `parallel_mode="data_parallel"` 2. 调用 `LLMDataset._is_data_parallel()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_data_parallel_mode(): # Set to data parallel mode context.set_auto_parallel_context(parallel_mode="data_parallel") is_dp = LLMDataset._is_data_parallel() assert is_dp # Reset context.reset_auto_parallel_context() ``` ### 2.10 YAML 配置测试 #### 2.10.1 test_init_with_pretrain_yaml_config **测试目标**:验证预训练 YAML 配置结构 **前置条件**: - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建模拟 llm_pretrain_template.yaml 的配置结构 2. 创建 LLMDataset 实例 3. 验证各属性 **预期结果**: - `data_loader_type == "BlendedMegatronDatasetDataLoader"` - `is_create_attention_mask() == True` - `is_create_compressed_eod_mask() == False` **测试代码示例**: ```python def test_init_with_pretrain_yaml_config(setup_context): inner_config = DictConfig(**{ "seq_length": 8192, "eod_mask_loss": True, "reset_position_ids": True, "create_attention_mask": True, "reset_attention_mask": True, "create_compressed_eod_mask": False, "eod_pad_length": 128, "eod": 0, "pad": 1, "data_path": ['0.3', "/path/megatron_data1", '0.7', "/path/megatron_data2"] }) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) config = { "data_loader": data_loader, "input_columns": ["input_ids", "labels", "loss_mask", "position_ids", "attention_mask"], "drop_remainder": True } dataset = LLMDataset(dataset_config=config) assert dataset.data_loader_type == "BlendedMegatronDatasetDataLoader" assert dataset.is_create_attention_mask() assert not dataset.is_create_compressed_eod_mask() ``` #### 2.10.2 test_init_with_finetune_yaml_config **测试目标**:验证微调 YAML 配置结构 **前置条件**:无 **测试步骤**: 1. 创建模拟 llm_finetune_template.yaml 的配置结构(包含 PackingHandler) 2. 创建 LLMDataset 实例 3. 验证各属性 **预期结果**: - `data_loader_type == "HFDataLoader"` - `is_create_attention_mask() == True` - `data_loader_config.create_attention_mask == True`(由 PackingHandler 自动启用) **测试代码示例**: ```python def test_init_with_finetune_yaml_config(): data_loader_config = DictConfig(**{ "type": "HFDataLoader", "load_func": "load_dataset", "path": "llm-wizard/alpaca-gpt4-data-zh", "create_attention_mask": True, "create_compressed_eod_mask": False, "use_broadcast_data": False, "shuffle": True, "handler": [ {"type": "AlpacaInstructDataHandler", "seq_length": 4096}, {"type": "PackingHandler", "seq_length": 4096, "pack_strategy": "pack"} ] }) config = { "data_loader": data_loader_config, "input_columns": ["input_ids", "labels", "loss_mask", "position_ids", "attention_mask"], "drop_remainder": True } dataset = LLMDataset(dataset_config=config) assert dataset.data_loader_type == "HFDataLoader" assert dataset.is_create_attention_mask() assert not dataset.is_create_compressed_eod_mask() # Verify PackingHandler enables attention mask assert dataset.data_loader_config.create_attention_mask ``` ### 2.11 TokenCounter 类测试 #### 2.11.1 test_token_counter_init_valid **测试目标**:验证 TokenCounter 有效初始化 **前置条件**:无 **测试步骤**: 1. 创建 `TokenCounter(top_n=10, min_token_id=0, max_token_id=np.inf)` **预期结果**: - 实例创建成功 - 属性正确设置 **测试代码示例**: ```python def test_token_counter_init_valid(): from mindformers.dataset.llm_dataset import TokenCounter counter = TokenCounter(top_n=10, min_token_id=0, max_token_id=np.inf) assert counter.top_n == 10 assert counter.min_token_id == 0 assert counter.max_token_id == np.inf assert counter.step_num == 1 assert counter.initialized == False ``` #### 2.11.2 test_token_counter_init_invalid_top_n **测试目标**:验证 top_n 无效时的异常处理 **前置条件**:无 **测试步骤**: 1. 创建 `TokenCounter(top_n=0)` 2. 创建 `TokenCounter(top_n=-1)` 3. 创建 `TokenCounter(top_n="10")` **预期结果**: - 抛出 `ValueError`,错误信息包含 "top_n must be a positive integer" **测试代码示例**: ```python def test_token_counter_init_invalid_top_n(): from mindformers.dataset.llm_dataset import TokenCounter with pytest.raises(ValueError, match="top_n must be a positive integer"): TokenCounter(top_n=0) with pytest.raises(ValueError, match="top_n must be a positive integer"): TokenCounter(top_n=-1) with pytest.raises(ValueError, match="top_n must be a positive integer"): TokenCounter(top_n="10") ``` ### 2.12 辅助函数测试 #### 2.12.1 test_dyn_batch_wrapper_basic **测试目标**:验证动态批处理基本功能 **前置条件**:无 **测试步骤**: 1. 创建变长序列数据 2. 调用 `dyn_batch_wrapper(*cols, divisor=16, remainder=1)` **预期结果**: - 所有序列填充到统一长度 - 长度满足 `((max_len - remainder - 1) // divisor + 1) * divisor + remainder` **测试代码示例**: ```python def test_dyn_batch_wrapper_basic(): from mindformers.dataset.llm_dataset import dyn_batch_wrapper # Create variable length sequences col1 = [np.array([1, 2, 3]), np.array([1, 2, 3, 4, 5])] col2 = [np.array([0, 0, 0]), np.array([0, 0, 0, 0, 0])] dummy = np.array([0]) # Last column is dummy result = dyn_batch_wrapper(col1, col2, dummy, divisor=16, remainder=1, pad_token_id=[0, -100]) # Verify all sequences have same length assert len(result[0][0]) == len(result[0][1]) assert len(result[1][0]) == len(result[1][1]) ``` #### 2.12.2 test_dyn_batch_wrapper_invalid_pad_token_id **测试目标**:验证 pad_token_id 类型错误的异常处理 **前置条件**:无 **测试步骤**: 1. 传入非法类型的 pad_token_id(如字符串) 2. 调用 `dyn_batch_wrapper()` **预期结果**: - 抛出 `ValueError`,错误信息包含 "pad_token_id should be list or int" **测试代码示例**: ```python def test_dyn_batch_wrapper_invalid_pad_token_id(): from mindformers.dataset.llm_dataset import dyn_batch_wrapper col1 = [np.array([1, 2, 3])] dummy = np.array([0]) with pytest.raises(ValueError, match="pad_token_id should be list or int"): dyn_batch_wrapper(col1, dummy, divisor=16, remainder=1, pad_token_id="invalid") ``` #### 2.12.3 test_asl_batch_wrapper_basic **测试目标**:验证 ASL 批处理基本功能 **前置条件**:无 **测试步骤**: 1. 创建序列长度数据 2. 调用 `asl_batch_wrapper(*cols, micro_batch_num=2)` **预期结果**: - 各微批次的序列长度偏移正确计算 **测试代码示例**: ```python def test_asl_batch_wrapper_basic(): from mindformers.dataset.llm_dataset import asl_batch_wrapper # Create actual sequence lengths actual_seq_len = [np.array([10, 20]), np.array([10, 20]), np.array([10, 20]), np.array([10, 20])] dummy = np.array([0]) result = asl_batch_wrapper(np.array([1]), np.array([2]), actual_seq_len, dummy, micro_batch_num=2) # Verify offsets are applied assert result is not None ``` #### 2.12.4 test_get_input_data_batch_slice_map **测试目标**:验证 EOD 重置处理 **前置条件**:无 **测试步骤**: 1. 创建包含 EOD token 的 input_ids 2. 调用 `get_input_data_batch_slice_map()` **预期结果**: - position_ids 在 EOD 后重置 - attention_mask 在 EOD 后正确屏蔽 **测试代码示例**: ```python def test_get_input_data_batch_slice_map(): from mindformers.dataset.llm_dataset import get_input_data_batch_slice_map # input_ids with EOD token (id=0) at position 3 input_ids = np.array([[1, 2, 3, 0, 4, 5, 6, 7]]) # 1 batch, 8 tokens batch_input_ids, position_ids, attention_mask = get_input_data_batch_slice_map( input_ids, eod_token_id=0, slice_size=1, rank_id=0 ) # Verify position_ids reset after EOD # Position 4 (after EOD at 3) should start from 0 assert position_ids[0, 4] == 0 # Verify attention_mask blocks attention to tokens before EOD assert attention_mask[0, 4, 3] == 0 # Token 4 shouldn't attend to token 3 (EOD) ``` --- ## 3. 集成测试设计 ### 3.1 配置到数据加载器集成测试 #### 3.1.1 test_config_to_data_loader_integration_hf **测试目标**:验证 HFDataLoader 配置到数据加载器创建的完整流程 **测试步骤**: 1. 创建完整的 HFDataLoader 配置 2. 创建 LLMDataset 实例 3. 调用 `get_default_input_columns()` 4. 调用 `generate_shard_info()` 5. 调用 `create_data_loader()` **预期结果**: - 数据加载器正确创建 - 列名正确设置 - 分片信息正确 #### 3.1.2 test_config_to_data_loader_integration_megatron **测试目标**:验证 Megatron 配置到数据加载器创建的完整流程 **测试步骤**: 1. 创建完整的 BlendedMegatronDatasetDataLoader 配置 2. 创建 LLMDataset 实例 3. 调用 `get_default_input_columns()` 4. 调用 `generate_shard_info()` 5. 调用 `create_data_loader()` **预期结果**: - 数据加载器正确创建 - 列名正确设置 - 分片信息正确 ### 3.2 数据管道集成测试 #### 3.2.1 test_data_pipeline_with_dynamic_batch **测试目标**:验证动态批处理数据管道 **测试步骤**: 1. 创建数据加载器 2. 调用 `create_dataset()` 启用 dynamic_batch 3. 迭代数据集验证批次 **预期结果**: - 批次长度正确对齐 - 填充值正确 #### 3.2.2 test_data_pipeline_with_token_profile **测试目标**:验证 Token 统计数据管道 **测试步骤**: 1. 创建数据加载器 2. 调用 `create_dataset()` 启用 use_llm_token_profile 3. 迭代数据集 **预期结果**: - Token 统计 CSV 文件正确生成 - 统计数据准确 ### 3.3 处理器链集成测试 #### 3.3.1 test_handler_chain_integration **测试目标**:验证多个处理器链式调用 **测试步骤**: 1. 配置 AlpacaInstructDataHandler + PackingHandler 2. 创建数据加载器 3. 验证处理结果 **预期结果**: - 处理器按顺序执行 - 数据格式正确 --- ## 4. 系统测试设计 ### 4.1 端到端数据加载测试 #### 4.1.1 test_e2e_hf_dataset_loading **测试目标**:验证 HuggingFace 数据集端到端加载 **测试环境**: - CPU 环境 **测试步骤**: 1. 准备小规模测试数据 2. 配置 HFDataLoader 3. 创建 LLMDataset 并创建数据管道 4. 迭代验证数据 **预期结果**: - 数据正确加载 - 批次大小正确 - 数据类型正确 **验证点**: - 日志输出 "Data loader created successfully" - 数据形状符合配置 - 列名正确 #### 4.1.2 test_e2e_mindrecord_dataset_loading **测试目标**:验证 MindRecord 数据集端到端加载 **测试环境**: - CPU 环境 **测试步骤**: 1. 准备 MindRecord 测试文件 2. 配置 MindDataset 3. 创建 LLMDataset 并创建数据管道 4. 迭代验证数据 **预期结果**: - 数据正确加载 - 批次大小正确 - EOD 重置正确(如启用) #### 4.1.3 test_e2e_megatron_dataset_loading **测试目标**:验证 Megatron 数据集端到端加载 **测试环境**: - CPU 环境 - dataset_broadcast_opt_level >= 3 **测试步骤**: 1. 准备 Megatron 格式测试数据 2. 配置 BlendedMegatronDatasetDataLoader 3. 创建 LLMDataset 并创建数据管道 4. 迭代验证数据 **预期结果**: - 数据正确加载 - 批次大小正确 - 列数据正确 ### 4.2 分布式数据加载测试 #### 4.2.1 test_distributed_data_loading_dp **测试目标**:验证数据并行模式下的数据分片 **测试环境**: - 多卡(如 8 卡) **测试步骤**: 1. 配置 DP=8 2. 各卡创建 LLMDataset 3. 验证各卡获取的数据分片 **预期结果**: - 各卡获取不同的数据分片 - 分片无重叠 - 数据完整覆盖 #### 4.2.2 test_distributed_data_loading_semi_full_batch **测试目标**:验证 Semi Full Batch 模式下的数据加载 **测试环境**: - 多卡 **测试步骤**: 1. 配置 semi_auto_parallel + full_batch 2. 各卡创建 LLMDataset 3. 验证数据分片策略 **预期结果**: - shard_id 和 num_shards 为 None - 框架自动处理数据分发 ### 4.3 动态批处理测试 #### 4.3.1 test_dynamic_batch_variable_length **测试目标**:验证变长序列的动态批处理 **测试步骤**: 1. 准备变长序列数据 2. 配置 dynamic_batch=True, divisor=16, remainder=1 3. 创建数据管道 4. 验证批次 **预期结果**: - 批次内序列长度统一 - 长度满足对齐要求 - 填充值正确 ### 4.4 Token 统计测试 #### 4.4.1 test_token_counting_csv_output **测试目标**:验证 Token 统计 CSV 输出 **测试步骤**: 1. 配置 use_llm_token_profile=True 2. 创建数据管道并迭代 3. 检查 CSV 文件 **预期结果**: - CSV 文件正确生成 - 包含 step_num, min_id, max_id 等列 - Top N token 统计正确 #### 4.4.2 test_token_counting_multi_rank **测试目标**:验证多 rank Token 统计 **测试环境**: - 多卡 **测试步骤**: 1. 各卡配置 Token 统计 2. 迭代数据 3. 检查各 rank 的 CSV 文件 **预期结果**: - 各 rank 生成独立的 CSV 文件 - 文件名包含 rank_id ### 4.5 EOD 重置测试 #### 4.5.1 test_eod_reset_position_ids **测试目标**:验证 EOD 重置 position_ids **测试步骤**: 1. 准备包含 EOD token 的数据 2. 配置 eod_reset=True 3. 创建 MindDataset 数据管道 4. 验证 position_ids **预期结果**: - EOD 后的 position_ids 从 0 开始 - 位置编码正确重置 #### 4.5.2 test_eod_reset_attention_mask **测试目标**:验证 EOD 重置 attention_mask **测试步骤**: 1. 准备包含 EOD token 的数据 2. 配置 eod_reset=True 3. 创建 MindDataset 数据管道 4. 验证 attention_mask **预期结果**: - EOD 后的 token 不注意 EOD 前的 token - 掩码矩阵正确 ### 4.6 压缩 EOD 掩码测试 #### 4.6.1 test_compressed_eod_mask_with_micro_batch **测试目标**:验证压缩 EOD 掩码与微批次配合 **测试步骤**: 1. 配置 create_compressed_eod_mask=True 2. 配置 micro_batch_num=4 3. 创建数据管道 4. 验证 actual_seq_len 偏移 **预期结果**: - actual_seq_len 列正确生成 - 微批次间偏移正确计算 --- ## 5. 性能测试设计 ### 5.1 数据加载吞吐量测试 #### 5.1.1 test_data_loading_throughput **测试目标**:测量数据加载吞吐量 **测试指标**: - samples/second - batches/second **测试方法**: 1. 固定配置运行数据加载 2. 记录完成时间和样本数 3. 计算吞吐量 ### 5.2 并行工作线程测试 #### 5.2.1 test_parallel_workers_performance **测试目标**:验证 num_parallel_workers 对性能的影响 **测试方法**: 1. 分别测试 1, 2, 4, 8 个并行工作线程 2. 记录数据加载时间 3. 对比性能差异 **预期结果**: - 适当增加工作线程可提升性能 - 存在最佳工作线程数 ### 5.3 预取性能测试 #### 5.3.1 test_prefetch_size_performance **测试目标**:验证 prefetch_size 对性能的影响 **测试方法**: 1. 分别测试 prefetch_size = 1, 2, 4, 8 2. 记录数据加载时间 3. 对比性能差异 --- ## 6. 异常场景测试设计 ### 6.1 配置异常测试 | 测试场景 | 异常配置 | 预期行为 | | ------ | ------ | ------ | | 空配置 | dataset_config = None | ValueError | | data_loader 类型错误 | data_loader = "string" | ValueError | | 缺少 type | data_loader = {} | ValueError | | HF broadcast 未设 opt_level | use_broadcast_data=True, opt_level<3 | ValueError | | Megatron 未设 opt_level | opt_level < 3 | ValueError | ### 6.2 参数异常测试 | 测试场景 | 异常参数 | 预期行为 | | ------ | ------ | ------ | | column_names 为 None | create_data_loader(None) | ValueError | | pad_token_id 类型错误 | pad_token_id="0" | ValueError | | top_n 非正整数 | top_n=0 或 top_n=-1 | ValueError | | eod_reset 非 MindDataset | eod_reset=True, type=HFDataLoader | ValueError | | dynamic_batch 与 compressed_eod_mask 同时使用 | 两者同时为 True | ValueError | ### 6.3 文件异常测试 | 测试场景 | 异常情况 | 预期行为 | | ------ | ------ | ------ | | MindRecord 文件不存在 | dataset_dir 路径无效 | FileNotFoundError | | 数据目录为空 | 目录存在但无文件 | FileNotFoundError | | 无写入权限 | Token 统计输出目录无权限 | IOError | ### 6.4 运行时异常测试 | 测试场景 | 异常情况 | 预期行为 | | ------ | ------ | ------ | | 数据集为空 | 迭代空数据集 | 正常结束,无数据 | | 批次大小不整除 | batch_size % num_shards != 0 | ValueError | | eod_token_id 未设置 | MindDataset EOD reset 无 eod_token_id | ValueError | --- ## 7. 测试覆盖率要求 | 覆盖类型 | 目标覆盖率 | | ------ | ------ | | 行覆盖率 | >= 85% | | 分支覆盖率 | >= 80% | | 函数覆盖率 | >= 90% | ### 7.1 关键路径覆盖 必须覆盖的关键路径: 1. LLMDataset 初始化流程:各种数据加载器类型的初始化 2. 数据加载器创建:MindDataset、HFDataLoader、MegatronDataLoader 3. 数据集创建:批处理、类型转换、Token 统计 4. 分片信息生成:各种并行模式 5. 动态批处理:变长序列填充 6. EOD 重置:position_ids 和 attention_mask 重置 7. 异常处理路径:所有异常分支 --- ## 8. 测试工具和辅助函数 ### 8.1 Fixture 函数 ```python @pytest.fixture(scope="module") def setup_context(): """Setup MindSpore context for testing.""" context.set_context(mode=context.GRAPH_MODE, device_target="CPU") # Set dataset_broadcast_opt_level to 3 for Megatron dataset tests context.set_context(dataset_broadcast_opt_level=3) ms.set_seed(42) yield # Reset context after tests context.reset_auto_parallel_context() ``` ### 8.2 配置生成辅助函数 ```python def create_hf_data_loader_config(**kwargs): """创建 HFDataLoader 测试配置""" default_config = { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } default_config.update(kwargs) return {"data_loader": default_config} def create_megatron_data_loader_config(**kwargs): """创建 Megatron 测试配置""" inner_config = DictConfig(**{ "seq_length": 8192, "data_path": ['1.0', '/path/data'] }) inner_config.update(kwargs.pop("config", {})) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) return {"data_loader": data_loader} def create_minddataset_config(**kwargs): """创建 MindDataset 测试配置""" default_config = { "type": "MindDataset", "dataset_dir": "/path/to/mindrecord" } default_config.update(kwargs) return {"data_loader": default_config} ``` ### 8.3 数据生成辅助函数 ```python def create_mock_variable_length_data(batch_size, max_length): """创建变长序列模拟数据""" data = [] for _ in range(batch_size): length = np.random.randint(1, max_length + 1) data.append(np.random.randint(0, 10000, size=length)) return data def create_mock_dataset_with_eod(batch_size, seq_length, eod_token_id=0): """创建包含 EOD token 的模拟数据""" data = [] for _ in range(batch_size): tokens = np.random.randint(1, 10000, size=seq_length) # 随机插入 EOD token eod_positions = np.random.choice(seq_length // 2, size=2, replace=False) tokens[eod_positions] = eod_token_id data.append(tokens) return np.array(data) ``` ### 8.4 验证辅助函数 ```python def verify_batch_shape(batch, expected_shape): """验证批次形状""" assert batch.shape == expected_shape, f"Expected {expected_shape}, got {batch.shape}" def verify_padding_values(batch, pad_value, original_lengths): """验证填充值""" for i, length in enumerate(original_lengths): padded = batch[i, length:] assert np.all(padded == pad_value), f"Padding values incorrect at sample {i}" def verify_position_ids_reset(position_ids, eod_positions): """验证 position_ids 在 EOD 后重置""" for eod_pos in eod_positions: if eod_pos + 1 < len(position_ids): assert position_ids[eod_pos + 1] == 0, f"Position not reset after EOD at {eod_pos}" def verify_csv_output(csv_path, expected_columns): """验证 CSV 输出""" import csv assert os.path.exists(csv_path), f"CSV file not found: {csv_path}" with open(csv_path, 'r') as f: reader = csv.reader(f) header = next(reader) for col in expected_columns: assert col in header, f"Column {col} not found in CSV" ``` --- ## 9. 测试执行计划 ### 9.1 测试优先级 | 优先级 | 测试类型 | 说明 | | ------ | ------ | ------ | | P0 | 初始化测试 | 必须通过的基础功能测试 | | P0 | 配置方法测试 | 核心配置功能测试 | | P0 | 列名获取测试 | 数据管道必需功能 | | P1 | 数据加载器创建测试 | 主要功能的集成验证 | | P1 | 掩码配置检查测试 | 注意力掩码和 EOD 掩码 | | P1 | 端到端数据加载测试 | 完整数据加载流程 | | P2 | 分布式数据加载测试 | 多卡场景验证 | | P2 | 动态批处理测试 | 变长序列处理 | | P2 | Token 统计测试 | 辅助功能验证 | | P3 | 性能测试 | 性能指标验证 | ### 9.2 测试频率 | 测试类型 | 执行频率 | | ------ | ------ | | 单元测试 | 每次提交 | | 集成测试 | 每日构建 | | 单卡系统测试 | 每日构建 | | 分布式系统测试 | 每周/版本发布前 | | 性能测试 | 版本发布前 | ### 9.3 测试用例统计 | 测试类别 | 用例数量 | | ------ | ------ | | 初始化测试 | 7 | | 配置方法测试 | 3 | | 列名获取测试 | 4 | | 分片信息生成测试 | 2 | | 数据加载器创建测试 | 1 | | 掩码配置检查测试 | 4 | | 数据加载器类型获取测试 | 3 | | Token 统计功能测试 | 2 | | 并行模式检查测试 | 3 | | YAML 配置测试 | 2 | | TokenCounter 类测试 | 2 | | 辅助函数测试 | 4 | | 集成测试 | 5 | | 系统测试 | 10 | | 性能测试 | 3 | | 异常场景测试 | 15+ | | **总计** | **60+** | --- ## 10. 测试代码文件结构 ``` tests/st/test_ut/test_dataset/test_llm_dataset/ ├── __init__.py ├── test_llm_dataset.py # 现有单元测试 ├── test_llm_dataset_init.py # 初始化测试 ├── test_llm_dataset_config.py # 配置方法测试 ├── test_llm_dataset_columns.py # 列名获取测试 ├── test_llm_dataset_shard.py # 分片信息测试 ├── test_llm_dataset_mask.py # 掩码配置测试 ├── test_llm_dataset_parallel.py # 并行模式测试 ├── test_token_counter.py # TokenCounter 测试 ├── test_batch_wrappers.py # 批处理辅助函数测试 ├── conftest.py # 共享 fixture └── integration/ ├── test_data_pipeline.py # 数据管道集成测试 ├── test_handler_chain.py # 处理器链测试 └── test_e2e_loading.py # 端到端加载测试 ```
# MindSpore Transformers LLMDataset DT测试设计说明书 大语言模型数据集(LLMDataset)DT测试设计说明书 **修订记录Version Control** | 日期Date | 修订版本Version | 描述Description | 作者Prepared by | |:------:|:------|:------|:------:| | 2025-11-11 | V1.0 | 初版 | - | 本文档描述 MindSpore Transformers 中 LLMDataset 模块的 DT(Development Test)测试设计,包括单元测试、集成测试和系统测试的详细设计。 --- ## 1. 测试概述 ### 1.1 测试目标 - 验证 LLMDataset 各功能模块的正确性和健壮性 - 确保多种数据加载器类型(MindDataset、HFDataLoader、MegatronDataLoader)正常工作 - 验证分布式场景下数据分片的正确性 - 确保动态批处理、EOD 重置、注意力掩码生成等功能正常 - 验证 Token 统计分析功能的准确性 - 验证异常场景的错误处理和提示信息 ### 1.2 测试范围 | 测试类型 | 测试范围 | | ------ | ------ | | 单元测试(UT) | 核心方法的功能验证、边界条件测试、异常处理测试 | | 集成测试(IT) | 模块间交互验证、配置解析流程、数据管道验证 | | 系统测试(ST) | 端到端数据加载流程、分布式场景、性能验证 | ### 1.3 测试环境 | 环境要素 | 要求 | | ------ | ------ | | 框架版本 | MindSpore >= 2.3.0 | | 硬件环境 | CPU / Ascend 910B(用于分布式测试) | | Python版本 | Python >= 3.8 | | 依赖库 | numpy, transformers (可选) | --- ## 2. 单元测试设计 ### 2.1 LLMDataset 初始化测试 #### 2.1.1 test_init_with_none_config **测试目标**:验证 dataset_config 为 None 时的异常处理 **前置条件**:无 **测试步骤**: 1. 调用 `LLMDataset(dataset_config=None)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "dataset_config cannot be None" **测试代码示例**: ```python def test_init_with_none_config(): with pytest.raises(ValueError, match="dataset_config cannot be None"): LLMDataset(dataset_config=None) ``` #### 2.1.2 test_init_with_invalid_data_loader_config **测试目标**:验证 data_loader 非字典类型时的异常处理 **前置条件**:无 **测试步骤**: 1. 创建配置 `{"data_loader": "invalid_string"}` 2. 调用 `LLMDataset(dataset_config=invalid_config)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "data_loader_config must be a dict" **测试代码示例**: ```python def test_init_with_invalid_data_loader_config(): invalid_config = { "data_loader": "invalid_string" # Should be a dict } with pytest.raises(ValueError, match="data_loader_config must be a dict"): LLMDataset(dataset_config=invalid_config) ``` #### 2.1.3 test_init_with_missing_type **测试目标**:验证缺少 type 字段时的异常处理 **前置条件**:无 **测试步骤**: 1. 创建缺少 type 的配置 2. 调用 `LLMDataset(dataset_config=invalid_config)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "data_loader_config must contain 'type' key" **测试代码示例**: ```python def test_init_with_missing_type(): invalid_config = { "data_loader": { "config": {} } } with pytest.raises(ValueError, match="data_loader_config must contain 'type' key"): LLMDataset(dataset_config=invalid_config) ``` #### 2.1.4 test_init_with_valid_megatron_config **测试目标**:验证有效 Megatron 配置的初始化 **前置条件**: - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建有效的 BlendedMegatronDatasetDataLoader 配置 2. 调用 `LLMDataset(dataset_config=config)` 3. 验证 data_loader_type 属性 **预期结果**: - 初始化成功 - `data_loader_type == "BlendedMegatronDatasetDataLoader"` **测试代码示例**: ```python def test_init_with_valid_megatron_config(setup_context): valid_config = { "data_loader": { "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": { "seq_length": 8192, "eod": 0, "pad": 1, "data_path": ['0.5', '/path/data1', '0.5', '/path/data2'] } }, "input_columns": ["input_ids", "labels", "loss_mask", "position_ids"], "drop_remainder": True } dataset = LLMDataset(dataset_config=valid_config) assert dataset.data_loader_type == "BlendedMegatronDatasetDataLoader" assert dataset.data_loader_config["type"] == "BlendedMegatronDatasetDataLoader" ``` #### 2.1.5 test_init_with_valid_hf_config **测试目标**:验证有效 HuggingFace 配置的初始化 **前置条件**:无 **测试步骤**: 1. 创建有效的 HFDataLoader 配置(use_broadcast_data=False) 2. 调用 `LLMDataset(dataset_config=config)` 3. 验证 data_loader_type 属性 **预期结果**: - 初始化成功 - `data_loader_type == "HFDataLoader"` **测试代码示例**: ```python def test_init_with_valid_hf_config(): valid_config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test_dataset", "use_broadcast_data": False, "create_attention_mask": True, "handler": [] }, "input_columns": ["input_ids", "labels"], "drop_remainder": True } dataset = LLMDataset(dataset_config=valid_config) assert dataset.data_loader_type == "HFDataLoader" assert dataset.data_loader_config["type"] == "HFDataLoader" ``` #### 2.1.6 test_init_hf_with_broadcast_no_opt_level **测试目标**:验证 HFDataLoader 启用 broadcast 但未设置 opt_level 的异常处理 **前置条件**: - dataset_broadcast_opt_level < 3 **测试步骤**: 1. 创建 HFDataLoader 配置,use_broadcast_data=True 2. 调用 `LLMDataset(dataset_config=config)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "please set `dataset_broadcast_opt_level: 3`" **测试代码示例**: ```python def test_init_hf_with_broadcast_no_opt_level(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": True # Requires opt_level >= 3 } } # Assuming opt_level < 3 with pytest.raises(ValueError, match="please set `dataset_broadcast_opt_level: 3`"): LLMDataset(dataset_config=config) ``` #### 2.1.7 test_init_with_packing_handler **测试目标**:验证 PackingHandler 自动启用 attention_mask **前置条件**:无 **测试步骤**: 1. 创建包含 PackingHandler 的配置 2. 调用 `LLMDataset(dataset_config=config)` 3. 验证 create_attention_mask 被自动设置为 True **预期结果**: - `data_loader_config.create_attention_mask == True` **测试代码示例**: ```python def test_init_with_packing_handler(): data_loader_config = DictConfig(**{ "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False, "handler": [ {"type": "PackingHandler", "seq_length": 4096} ] }) config = {"data_loader": data_loader_config} dataset = LLMDataset(dataset_config=config) assert dataset.data_loader_config.create_attention_mask == True ``` ### 2.2 配置方法测试 #### 2.2.1 test_set_megatron_dataset_config_seed **测试目标**:验证 Megatron 数据集种子设置 **前置条件**: - 使用 DictConfig 创建嵌套配置 - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建 BlendedMegatronDatasetDataLoader 配置 2. 调用 `set_megatron_dataset_config_seed(dataset_seed=5678)` 3. 验证 seed 值 **预期结果**: - `data_loader_config.config.seed == 5678` **测试代码示例**: ```python def test_set_megatron_dataset_config_seed(setup_context): inner_config = DictConfig(**{ "seq_length": 8192, "seed": 1234, "data_path": ['1.0', '/path/data'] }) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) config = {"data_loader": data_loader} dataset = LLMDataset(dataset_config=config) dataset.set_megatron_dataset_config_seed(dataset_seed=5678) assert dataset.data_loader_config.config.seed == 5678 ``` #### 2.2.2 test_set_megatron_dataset_config_seed_non_megatron **测试目标**:验证非 Megatron 数据集调用种子设置不报错 **前置条件**:无 **测试步骤**: 1. 创建 HFDataLoader 配置 2. 调用 `set_megatron_dataset_config_seed(dataset_seed=5678)` **预期结果**: - 方法执行不抛出异常 - 配置不变化 **测试代码示例**: ```python def test_set_megatron_dataset_config_seed_non_megatron(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test_dataset", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) # Should not raise error even though it's not Megatron dataset dataset.set_megatron_dataset_config_seed(dataset_seed=5678) ``` #### 2.2.3 test_set_ms_dataset_config **测试目标**:验证 MindSpore 数据集全局配置设置 **前置条件**:无 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `set_ms_dataset_config(dataset_seed=4321, prefetch_size=2, numa_enable=True)` **预期结果**: - MindSpore 数据集配置被正确设置 - 不抛出异常 **测试代码示例**: ```python def test_set_ms_dataset_config(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test_dataset", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) dataset.set_ms_dataset_config( dataset_seed=4321, prefetch_size=2, numa_enable=True ) # Verify configuration is set (no exceptions raised) assert True ``` ### 2.3 列名获取测试 #### 2.3.1 test_get_default_input_columns_compressed_eod **测试目标**:验证压缩 EOD 掩码列名生成 **前置条件**:无 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `get_default_input_columns(create_compressed_eod_mask=True)` **预期结果**: - 返回 `['input_ids', 'labels', 'loss_mask', 'position_ids', 'actual_seq_len']` **测试代码示例**: ```python def test_get_default_input_columns_compressed_eod(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) columns = dataset.get_default_input_columns( create_attention_mask=False, create_compressed_eod_mask=True ) assert columns == ['input_ids', 'labels', 'loss_mask', 'position_ids', 'actual_seq_len'] ``` #### 2.3.2 test_get_default_input_columns_attention_mask **测试目标**:验证注意力掩码列名生成 **前置条件**:无 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `get_default_input_columns(create_attention_mask=True)` **预期结果**: - 返回 `['input_ids', 'labels', 'loss_mask', 'position_ids', 'attention_mask']` **测试代码示例**: ```python def test_get_default_input_columns_attention_mask(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) columns = dataset.get_default_input_columns( create_attention_mask=True, create_compressed_eod_mask=False ) assert columns == ['input_ids', 'labels', 'loss_mask', 'position_ids', 'attention_mask'] ``` #### 2.3.3 test_get_default_input_columns_megatron **测试目标**:验证 Megatron 数据集默认列名 **前置条件**: - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建 BlendedMegatronDatasetDataLoader 配置 2. 调用 `get_default_input_columns()` **预期结果**: - 返回 `['input_ids', 'labels', 'loss_mask', 'position_ids']` **测试代码示例**: ```python def test_get_default_input_columns_megatron(setup_context): config = { "data_loader": { "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": { "seq_length": 8192, "data_path": ['1.0', '/path/data'] } } } dataset = LLMDataset(dataset_config=config) columns = dataset.get_default_input_columns( create_attention_mask=False, create_compressed_eod_mask=False ) assert columns == ['input_ids', 'labels', 'loss_mask', 'position_ids'] ``` #### 2.3.4 test_get_default_input_columns_custom **测试目标**:验证自定义列名返回 **前置条件**:无 **测试步骤**: 1. 创建包含 input_columns 的配置 2. 调用 `get_default_input_columns()` **预期结果**: - 返回配置中指定的 input_columns **测试代码示例**: ```python def test_get_default_input_columns_custom(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False }, "input_columns": ["custom_col1", "custom_col2"] } dataset = LLMDataset(dataset_config=config) columns = dataset.get_default_input_columns( create_attention_mask=False, create_compressed_eod_mask=False ) assert columns == ["custom_col1", "custom_col2"] ``` ### 2.4 分片信息生成测试 #### 2.4.1 test_generate_shard_info_default **测试目标**:验证默认分片信息生成 **前置条件**: - 单设备模式 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `generate_shard_info(data_parallel_size=1)` **预期结果**: - `shard_id == 0` - `num_shards == 1` **测试代码示例**: ```python def test_generate_shard_info_default(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) shard_id, num_shards = dataset.generate_shard_info(data_parallel_size=1) assert shard_id == 0 assert num_shards == 1 ``` #### 2.4.2 test_generate_shard_info_semi_full_batch **测试目标**:验证 Semi Full Batch 模式分片信息 **前置条件**: - 设置 semi_auto_parallel 和 full_batch=True **测试步骤**: 1. 配置 semi_auto_parallel 和 full_batch 2. 调用 `generate_shard_info()` **预期结果**: - `shard_id == None` - `num_shards == None` ### 2.5 数据加载器创建测试 #### 2.5.1 test_create_data_loader_with_none_columns **测试目标**:验证 column_names 为 None 时的异常处理 **前置条件**:无 **测试步骤**: 1. 创建 LLMDataset 实例 2. 调用 `create_data_loader(column_names=None)` **预期结果**: - 抛出 `ValueError`,错误信息包含 "column_names cannot be None" **测试代码示例**: ```python def test_create_data_loader_with_none_columns(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) with pytest.raises(ValueError, match="column_names cannot be None"): dataset.create_data_loader(column_names=None) ``` ### 2.6 掩码配置检查测试 #### 2.6.1 test_is_create_compressed_eod_mask_megatron **测试目标**:验证 Megatron 数据集压缩 EOD 掩码检查 **前置条件**: - 使用 DictConfig 配置 - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建配置 `create_compressed_eod_mask=True` 2. 调用 `is_create_compressed_eod_mask()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_create_compressed_eod_mask_megatron(setup_context): inner_config = DictConfig(**{ "seq_length": 8192, "create_compressed_eod_mask": True, "data_path": ['1.0', '/path/data'] }) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) config = {"data_loader": data_loader} dataset = LLMDataset(dataset_config=config) assert dataset.is_create_compressed_eod_mask() ``` #### 2.6.2 test_is_create_compressed_eod_mask_hf **测试目标**:验证 HF 数据集压缩 EOD 掩码检查 **前置条件**:无 **测试步骤**: 1. 创建配置 `create_compressed_eod_mask=True` 2. 调用 `is_create_compressed_eod_mask()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_create_compressed_eod_mask_hf(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False, "create_compressed_eod_mask": True } } dataset = LLMDataset(dataset_config=config) assert dataset.is_create_compressed_eod_mask() ``` #### 2.6.3 test_is_create_attention_mask_megatron **测试目标**:验证 Megatron 数据集注意力掩码检查 **前置条件**: - 使用 DictConfig 配置 - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建配置 `create_attention_mask=True` 2. 调用 `is_create_attention_mask()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_create_attention_mask_megatron(setup_context): inner_config = DictConfig(**{ "seq_length": 8192, "create_attention_mask": True, "data_path": ['1.0', '/path/data'] }) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) config = {"data_loader": data_loader} dataset = LLMDataset(dataset_config=config) assert dataset.is_create_attention_mask() ``` #### 2.6.4 test_is_create_attention_mask_hf **测试目标**:验证 HF 数据集注意力掩码检查 **前置条件**:无 **测试步骤**: 1. 创建配置 `create_attention_mask=True` 2. 调用 `is_create_attention_mask()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_create_attention_mask_hf(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False, "create_attention_mask": True } } dataset = LLMDataset(dataset_config=config) assert dataset.is_create_attention_mask() ``` ### 2.7 数据加载器类型获取测试 #### 2.7.1 test_get_data_loader_type_megatron **测试目标**:验证获取 Megatron 数据加载器类型 **前置条件**: - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建 BlendedMegatronDatasetDataLoader 配置 2. 调用 `get_data_loader_type()` **预期结果**: - 返回 `"BlendedMegatronDatasetDataLoader"` **测试代码示例**: ```python def test_get_data_loader_type_megatron(setup_context): config = { "data_loader": { "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": { "seq_length": 8192, "data_path": ['1.0', '/path/data'] } } } dataset = LLMDataset(dataset_config=config) assert dataset.get_data_loader_type() == "BlendedMegatronDatasetDataLoader" ``` #### 2.7.2 test_get_data_loader_type_hf **测试目标**:验证获取 HF 数据加载器类型 **前置条件**:无 **测试步骤**: 1. 创建 HFDataLoader 配置 2. 调用 `get_data_loader_type()` **预期结果**: - 返回 `"HFDataLoader"` **测试代码示例**: ```python def test_get_data_loader_type_hf(): config = { "data_loader": { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } } dataset = LLMDataset(dataset_config=config) assert dataset.get_data_loader_type() == "HFDataLoader" ``` #### 2.7.3 test_get_data_loader_type_minddataset **测试目标**:验证获取 MindDataset 数据加载器类型 **前置条件**:无 **测试步骤**: 1. 创建 MindDataset 配置 2. 调用 `get_data_loader_type()` **预期结果**: - 返回 `"MindDataset"` **测试代码示例**: ```python def test_get_data_loader_type_minddataset(): config = { "data_loader": { "type": "MindDataset", "dataset_dir": "/path/to/mindrecord" } } dataset = LLMDataset(dataset_config=config) assert dataset.get_data_loader_type() == "MindDataset" ``` ### 2.8 Token 统计功能测试 #### 2.8.1 test_perform_token_counting_default **测试目标**:验证默认参数创建 Token 统计函数 **前置条件**:无 **测试步骤**: 1. 调用 `LLMDataset.perform_token_counting()` **预期结果**: - 返回可调用对象 **测试代码示例**: ```python def test_perform_token_counting_default(): counter_func = LLMDataset.perform_token_counting() assert callable(counter_func) ``` #### 2.8.2 test_perform_token_counting_custom **测试目标**:验证自定义参数创建 Token 统计函数 **前置条件**:无 **测试步骤**: 1. 调用 `LLMDataset.perform_token_counting(top_n=20, min_token_id=10, max_token_id=1000, save_path="./test_output/")` **预期结果**: - 返回可调用对象 **测试代码示例**: ```python def test_perform_token_counting_custom(): counter_func = LLMDataset.perform_token_counting( top_n=20, min_token_id=10, max_token_id=1000, save_path="./test_output/" ) assert callable(counter_func) ``` ### 2.9 并行模式检查测试 #### 2.9.1 test_is_semi_mode **测试目标**:验证半自动并行模式检查 **前置条件**:无 **测试步骤**: 1. 设置 `parallel_mode="semi_auto_parallel"` 2. 调用 `LLMDataset._is_semi()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_semi_mode(): # Set to semi-auto parallel mode context.set_auto_parallel_context(parallel_mode="semi_auto_parallel") is_semi = LLMDataset._is_semi() assert is_semi # Reset to stand-alone mode context.reset_auto_parallel_context() ``` #### 2.9.2 test_is_full_batch_mode **测试目标**:验证全批次模式检查 **前置条件**:无 **测试步骤**: 1. 设置 `full_batch=True` 2. 调用 `LLMDataset._is_full_batch()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_full_batch_mode(): # Set full_batch to True context.set_auto_parallel_context(full_batch=True) is_full = LLMDataset._is_full_batch() assert is_full # Reset context.reset_auto_parallel_context() ``` #### 2.9.3 test_is_data_parallel_mode **测试目标**:验证数据并行模式检查 **前置条件**:无 **测试步骤**: 1. 设置 `parallel_mode="data_parallel"` 2. 调用 `LLMDataset._is_data_parallel()` **预期结果**: - 返回 `True` **测试代码示例**: ```python def test_is_data_parallel_mode(): # Set to data parallel mode context.set_auto_parallel_context(parallel_mode="data_parallel") is_dp = LLMDataset._is_data_parallel() assert is_dp # Reset context.reset_auto_parallel_context() ``` ### 2.10 YAML 配置测试 #### 2.10.1 test_init_with_pretrain_yaml_config **测试目标**:验证预训练 YAML 配置结构 **前置条件**: - 设置 `dataset_broadcast_opt_level: 3` **测试步骤**: 1. 创建模拟 llm_pretrain_template.yaml 的配置结构 2. 创建 LLMDataset 实例 3. 验证各属性 **预期结果**: - `data_loader_type == "BlendedMegatronDatasetDataLoader"` - `is_create_attention_mask() == True` - `is_create_compressed_eod_mask() == False` **测试代码示例**: ```python def test_init_with_pretrain_yaml_config(setup_context): inner_config = DictConfig(**{ "seq_length": 8192, "eod_mask_loss": True, "reset_position_ids": True, "create_attention_mask": True, "reset_attention_mask": True, "create_compressed_eod_mask": False, "eod_pad_length": 128, "eod": 0, "pad": 1, "data_path": ['0.3', "/path/megatron_data1", '0.7', "/path/megatron_data2"] }) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) config = { "data_loader": data_loader, "input_columns": ["input_ids", "labels", "loss_mask", "position_ids", "attention_mask"], "drop_remainder": True } dataset = LLMDataset(dataset_config=config) assert dataset.data_loader_type == "BlendedMegatronDatasetDataLoader" assert dataset.is_create_attention_mask() assert not dataset.is_create_compressed_eod_mask() ``` #### 2.10.2 test_init_with_finetune_yaml_config **测试目标**:验证微调 YAML 配置结构 **前置条件**:无 **测试步骤**: 1. 创建模拟 llm_finetune_template.yaml 的配置结构(包含 PackingHandler) 2. 创建 LLMDataset 实例 3. 验证各属性 **预期结果**: - `data_loader_type == "HFDataLoader"` - `is_create_attention_mask() == True` - `data_loader_config.create_attention_mask == True`(由 PackingHandler 自动启用) **测试代码示例**: ```python def test_init_with_finetune_yaml_config(): data_loader_config = DictConfig(**{ "type": "HFDataLoader", "load_func": "load_dataset", "path": "llm-wizard/alpaca-gpt4-data-zh", "create_attention_mask": True, "create_compressed_eod_mask": False, "use_broadcast_data": False, "shuffle": True, "handler": [ {"type": "AlpacaInstructDataHandler", "seq_length": 4096}, {"type": "PackingHandler", "seq_length": 4096, "pack_strategy": "pack"} ] }) config = { "data_loader": data_loader_config, "input_columns": ["input_ids", "labels", "loss_mask", "position_ids", "attention_mask"], "drop_remainder": True } dataset = LLMDataset(dataset_config=config) assert dataset.data_loader_type == "HFDataLoader" assert dataset.is_create_attention_mask() assert not dataset.is_create_compressed_eod_mask() # Verify PackingHandler enables attention mask assert dataset.data_loader_config.create_attention_mask ``` ### 2.11 TokenCounter 类测试 #### 2.11.1 test_token_counter_init_valid **测试目标**:验证 TokenCounter 有效初始化 **前置条件**:无 **测试步骤**: 1. 创建 `TokenCounter(top_n=10, min_token_id=0, max_token_id=np.inf)` **预期结果**: - 实例创建成功 - 属性正确设置 **测试代码示例**: ```python def test_token_counter_init_valid(): from mindformers.dataset.llm_dataset import TokenCounter counter = TokenCounter(top_n=10, min_token_id=0, max_token_id=np.inf) assert counter.top_n == 10 assert counter.min_token_id == 0 assert counter.max_token_id == np.inf assert counter.step_num == 1 assert counter.initialized == False ``` #### 2.11.2 test_token_counter_init_invalid_top_n **测试目标**:验证 top_n 无效时的异常处理 **前置条件**:无 **测试步骤**: 1. 创建 `TokenCounter(top_n=0)` 2. 创建 `TokenCounter(top_n=-1)` 3. 创建 `TokenCounter(top_n="10")` **预期结果**: - 抛出 `ValueError`,错误信息包含 "top_n must be a positive integer" **测试代码示例**: ```python def test_token_counter_init_invalid_top_n(): from mindformers.dataset.llm_dataset import TokenCounter with pytest.raises(ValueError, match="top_n must be a positive integer"): TokenCounter(top_n=0) with pytest.raises(ValueError, match="top_n must be a positive integer"): TokenCounter(top_n=-1) with pytest.raises(ValueError, match="top_n must be a positive integer"): TokenCounter(top_n="10") ``` ### 2.12 辅助函数测试 #### 2.12.1 test_dyn_batch_wrapper_basic **测试目标**:验证动态批处理基本功能 **前置条件**:无 **测试步骤**: 1. 创建变长序列数据 2. 调用 `dyn_batch_wrapper(*cols, divisor=16, remainder=1)` **预期结果**: - 所有序列填充到统一长度 - 长度满足 `((max_len - remainder - 1) // divisor + 1) * divisor + remainder` **测试代码示例**: ```python def test_dyn_batch_wrapper_basic(): from mindformers.dataset.llm_dataset import dyn_batch_wrapper # Create variable length sequences col1 = [np.array([1, 2, 3]), np.array([1, 2, 3, 4, 5])] col2 = [np.array([0, 0, 0]), np.array([0, 0, 0, 0, 0])] dummy = np.array([0]) # Last column is dummy result = dyn_batch_wrapper(col1, col2, dummy, divisor=16, remainder=1, pad_token_id=[0, -100]) # Verify all sequences have same length assert len(result[0][0]) == len(result[0][1]) assert len(result[1][0]) == len(result[1][1]) ``` #### 2.12.2 test_dyn_batch_wrapper_invalid_pad_token_id **测试目标**:验证 pad_token_id 类型错误的异常处理 **前置条件**:无 **测试步骤**: 1. 传入非法类型的 pad_token_id(如字符串) 2. 调用 `dyn_batch_wrapper()` **预期结果**: - 抛出 `ValueError`,错误信息包含 "pad_token_id should be list or int" **测试代码示例**: ```python def test_dyn_batch_wrapper_invalid_pad_token_id(): from mindformers.dataset.llm_dataset import dyn_batch_wrapper col1 = [np.array([1, 2, 3])] dummy = np.array([0]) with pytest.raises(ValueError, match="pad_token_id should be list or int"): dyn_batch_wrapper(col1, dummy, divisor=16, remainder=1, pad_token_id="invalid") ``` #### 2.12.3 test_asl_batch_wrapper_basic **测试目标**:验证 ASL 批处理基本功能 **前置条件**:无 **测试步骤**: 1. 创建序列长度数据 2. 调用 `asl_batch_wrapper(*cols, micro_batch_num=2)` **预期结果**: - 各微批次的序列长度偏移正确计算 **测试代码示例**: ```python def test_asl_batch_wrapper_basic(): from mindformers.dataset.llm_dataset import asl_batch_wrapper # Create actual sequence lengths actual_seq_len = [np.array([10, 20]), np.array([10, 20]), np.array([10, 20]), np.array([10, 20])] dummy = np.array([0]) result = asl_batch_wrapper(np.array([1]), np.array([2]), actual_seq_len, dummy, micro_batch_num=2) # Verify offsets are applied assert result is not None ``` #### 2.12.4 test_get_input_data_batch_slice_map **测试目标**:验证 EOD 重置处理 **前置条件**:无 **测试步骤**: 1. 创建包含 EOD token 的 input_ids 2. 调用 `get_input_data_batch_slice_map()` **预期结果**: - position_ids 在 EOD 后重置 - attention_mask 在 EOD 后正确屏蔽 **测试代码示例**: ```python def test_get_input_data_batch_slice_map(): from mindformers.dataset.llm_dataset import get_input_data_batch_slice_map # input_ids with EOD token (id=0) at position 3 input_ids = np.array([[1, 2, 3, 0, 4, 5, 6, 7]]) # 1 batch, 8 tokens batch_input_ids, position_ids, attention_mask = get_input_data_batch_slice_map( input_ids, eod_token_id=0, slice_size=1, rank_id=0 ) # Verify position_ids reset after EOD # Position 4 (after EOD at 3) should start from 0 assert position_ids[0, 4] == 0 # Verify attention_mask blocks attention to tokens before EOD assert attention_mask[0, 4, 3] == 0 # Token 4 shouldn't attend to token 3 (EOD) ``` --- ## 3. 集成测试设计 ### 3.1 配置到数据加载器集成测试 #### 3.1.1 test_config_to_data_loader_integration_hf **测试目标**:验证 HFDataLoader 配置到数据加载器创建的完整流程 **测试步骤**: 1. 创建完整的 HFDataLoader 配置 2. 创建 LLMDataset 实例 3. 调用 `get_default_input_columns()` 4. 调用 `generate_shard_info()` 5. 调用 `create_data_loader()` **预期结果**: - 数据加载器正确创建 - 列名正确设置 - 分片信息正确 #### 3.1.2 test_config_to_data_loader_integration_megatron **测试目标**:验证 Megatron 配置到数据加载器创建的完整流程 **测试步骤**: 1. 创建完整的 BlendedMegatronDatasetDataLoader 配置 2. 创建 LLMDataset 实例 3. 调用 `get_default_input_columns()` 4. 调用 `generate_shard_info()` 5. 调用 `create_data_loader()` **预期结果**: - 数据加载器正确创建 - 列名正确设置 - 分片信息正确 ### 3.2 数据管道集成测试 #### 3.2.1 test_data_pipeline_with_dynamic_batch **测试目标**:验证动态批处理数据管道 **测试步骤**: 1. 创建数据加载器 2. 调用 `create_dataset()` 启用 dynamic_batch 3. 迭代数据集验证批次 **预期结果**: - 批次长度正确对齐 - 填充值正确 #### 3.2.2 test_data_pipeline_with_token_profile **测试目标**:验证 Token 统计数据管道 **测试步骤**: 1. 创建数据加载器 2. 调用 `create_dataset()` 启用 use_llm_token_profile 3. 迭代数据集 **预期结果**: - Token 统计 CSV 文件正确生成 - 统计数据准确 ### 3.3 处理器链集成测试 #### 3.3.1 test_handler_chain_integration **测试目标**:验证多个处理器链式调用 **测试步骤**: 1. 配置 AlpacaInstructDataHandler + PackingHandler 2. 创建数据加载器 3. 验证处理结果 **预期结果**: - 处理器按顺序执行 - 数据格式正确 --- ## 4. 系统测试设计 ### 4.1 端到端数据加载测试 #### 4.1.1 test_e2e_hf_dataset_loading **测试目标**:验证 HuggingFace 数据集端到端加载 **测试环境**: - CPU 环境 **测试步骤**: 1. 准备小规模测试数据 2. 配置 HFDataLoader 3. 创建 LLMDataset 并创建数据管道 4. 迭代验证数据 **预期结果**: - 数据正确加载 - 批次大小正确 - 数据类型正确 **验证点**: - 日志输出 "Data loader created successfully" - 数据形状符合配置 - 列名正确 #### 4.1.2 test_e2e_mindrecord_dataset_loading **测试目标**:验证 MindRecord 数据集端到端加载 **测试环境**: - CPU 环境 **测试步骤**: 1. 准备 MindRecord 测试文件 2. 配置 MindDataset 3. 创建 LLMDataset 并创建数据管道 4. 迭代验证数据 **预期结果**: - 数据正确加载 - 批次大小正确 - EOD 重置正确(如启用) #### 4.1.3 test_e2e_megatron_dataset_loading **测试目标**:验证 Megatron 数据集端到端加载 **测试环境**: - CPU 环境 - dataset_broadcast_opt_level >= 3 **测试步骤**: 1. 准备 Megatron 格式测试数据 2. 配置 BlendedMegatronDatasetDataLoader 3. 创建 LLMDataset 并创建数据管道 4. 迭代验证数据 **预期结果**: - 数据正确加载 - 批次大小正确 - 列数据正确 ### 4.2 分布式数据加载测试 #### 4.2.1 test_distributed_data_loading_dp **测试目标**:验证数据并行模式下的数据分片 **测试环境**: - 多卡(如 8 卡) **测试步骤**: 1. 配置 DP=8 2. 各卡创建 LLMDataset 3. 验证各卡获取的数据分片 **预期结果**: - 各卡获取不同的数据分片 - 分片无重叠 - 数据完整覆盖 #### 4.2.2 test_distributed_data_loading_semi_full_batch **测试目标**:验证 Semi Full Batch 模式下的数据加载 **测试环境**: - 多卡 **测试步骤**: 1. 配置 semi_auto_parallel + full_batch 2. 各卡创建 LLMDataset 3. 验证数据分片策略 **预期结果**: - shard_id 和 num_shards 为 None - 框架自动处理数据分发 ### 4.3 动态批处理测试 #### 4.3.1 test_dynamic_batch_variable_length **测试目标**:验证变长序列的动态批处理 **测试步骤**: 1. 准备变长序列数据 2. 配置 dynamic_batch=True, divisor=16, remainder=1 3. 创建数据管道 4. 验证批次 **预期结果**: - 批次内序列长度统一 - 长度满足对齐要求 - 填充值正确 ### 4.4 Token 统计测试 #### 4.4.1 test_token_counting_csv_output **测试目标**:验证 Token 统计 CSV 输出 **测试步骤**: 1. 配置 use_llm_token_profile=True 2. 创建数据管道并迭代 3. 检查 CSV 文件 **预期结果**: - CSV 文件正确生成 - 包含 step_num, min_id, max_id 等列 - Top N token 统计正确 #### 4.4.2 test_token_counting_multi_rank **测试目标**:验证多 rank Token 统计 **测试环境**: - 多卡 **测试步骤**: 1. 各卡配置 Token 统计 2. 迭代数据 3. 检查各 rank 的 CSV 文件 **预期结果**: - 各 rank 生成独立的 CSV 文件 - 文件名包含 rank_id ### 4.5 EOD 重置测试 #### 4.5.1 test_eod_reset_position_ids **测试目标**:验证 EOD 重置 position_ids **测试步骤**: 1. 准备包含 EOD token 的数据 2. 配置 eod_reset=True 3. 创建 MindDataset 数据管道 4. 验证 position_ids **预期结果**: - EOD 后的 position_ids 从 0 开始 - 位置编码正确重置 #### 4.5.2 test_eod_reset_attention_mask **测试目标**:验证 EOD 重置 attention_mask **测试步骤**: 1. 准备包含 EOD token 的数据 2. 配置 eod_reset=True 3. 创建 MindDataset 数据管道 4. 验证 attention_mask **预期结果**: - EOD 后的 token 不注意 EOD 前的 token - 掩码矩阵正确 ### 4.6 压缩 EOD 掩码测试 #### 4.6.1 test_compressed_eod_mask_with_micro_batch **测试目标**:验证压缩 EOD 掩码与微批次配合 **测试步骤**: 1. 配置 create_compressed_eod_mask=True 2. 配置 micro_batch_num=4 3. 创建数据管道 4. 验证 actual_seq_len 偏移 **预期结果**: - actual_seq_len 列正确生成 - 微批次间偏移正确计算 --- ## 5. 性能测试设计 ### 5.1 数据加载吞吐量测试 #### 5.1.1 test_data_loading_throughput **测试目标**:测量数据加载吞吐量 **测试指标**: - samples/second - batches/second **测试方法**: 1. 固定配置运行数据加载 2. 记录完成时间和样本数 3. 计算吞吐量 ### 5.2 并行工作线程测试 #### 5.2.1 test_parallel_workers_performance **测试目标**:验证 num_parallel_workers 对性能的影响 **测试方法**: 1. 分别测试 1, 2, 4, 8 个并行工作线程 2. 记录数据加载时间 3. 对比性能差异 **预期结果**: - 适当增加工作线程可提升性能 - 存在最佳工作线程数 ### 5.3 预取性能测试 #### 5.3.1 test_prefetch_size_performance **测试目标**:验证 prefetch_size 对性能的影响 **测试方法**: 1. 分别测试 prefetch_size = 1, 2, 4, 8 2. 记录数据加载时间 3. 对比性能差异 --- ## 6. 异常场景测试设计 ### 6.1 配置异常测试 | 测试场景 | 异常配置 | 预期行为 | | ------ | ------ | ------ | | 空配置 | dataset_config = None | ValueError | | data_loader 类型错误 | data_loader = "string" | ValueError | | 缺少 type | data_loader = {} | ValueError | | HF broadcast 未设 opt_level | use_broadcast_data=True, opt_level<3 | ValueError | | Megatron 未设 opt_level | opt_level < 3 | ValueError | ### 6.2 参数异常测试 | 测试场景 | 异常参数 | 预期行为 | | ------ | ------ | ------ | | column_names 为 None | create_data_loader(None) | ValueError | | pad_token_id 类型错误 | pad_token_id="0" | ValueError | | top_n 非正整数 | top_n=0 或 top_n=-1 | ValueError | | eod_reset 非 MindDataset | eod_reset=True, type=HFDataLoader | ValueError | | dynamic_batch 与 compressed_eod_mask 同时使用 | 两者同时为 True | ValueError | ### 6.3 文件异常测试 | 测试场景 | 异常情况 | 预期行为 | | ------ | ------ | ------ | | MindRecord 文件不存在 | dataset_dir 路径无效 | FileNotFoundError | | 数据目录为空 | 目录存在但无文件 | FileNotFoundError | | 无写入权限 | Token 统计输出目录无权限 | IOError | ### 6.4 运行时异常测试 | 测试场景 | 异常情况 | 预期行为 | | ------ | ------ | ------ | | 数据集为空 | 迭代空数据集 | 正常结束,无数据 | | 批次大小不整除 | batch_size % num_shards != 0 | ValueError | | eod_token_id 未设置 | MindDataset EOD reset 无 eod_token_id | ValueError | --- ## 7. 测试覆盖率要求 | 覆盖类型 | 目标覆盖率 | | ------ | ------ | | 行覆盖率 | >= 85% | | 分支覆盖率 | >= 80% | | 函数覆盖率 | >= 90% | ### 7.1 关键路径覆盖 必须覆盖的关键路径: 1. LLMDataset 初始化流程:各种数据加载器类型的初始化 2. 数据加载器创建:MindDataset、HFDataLoader、MegatronDataLoader 3. 数据集创建:批处理、类型转换、Token 统计 4. 分片信息生成:各种并行模式 5. 动态批处理:变长序列填充 6. EOD 重置:position_ids 和 attention_mask 重置 7. 异常处理路径:所有异常分支 --- ## 8. 测试工具和辅助函数 ### 8.1 Fixture 函数 ```python @pytest.fixture(scope="module") def setup_context(): """Setup MindSpore context for testing.""" context.set_context(mode=context.GRAPH_MODE, device_target="CPU") # Set dataset_broadcast_opt_level to 3 for Megatron dataset tests context.set_context(dataset_broadcast_opt_level=3) ms.set_seed(42) yield # Reset context after tests context.reset_auto_parallel_context() ``` ### 8.2 配置生成辅助函数 ```python def create_hf_data_loader_config(**kwargs): """创建 HFDataLoader 测试配置""" default_config = { "type": "HFDataLoader", "load_func": "load_dataset", "path": "test", "use_broadcast_data": False } default_config.update(kwargs) return {"data_loader": default_config} def create_megatron_data_loader_config(**kwargs): """创建 Megatron 测试配置""" inner_config = DictConfig(**{ "seq_length": 8192, "data_path": ['1.0', '/path/data'] }) inner_config.update(kwargs.pop("config", {})) data_loader = DictConfig(**{ "type": "BlendedMegatronDatasetDataLoader", "sizes": [1000, 0, 0], "config": inner_config }) return {"data_loader": data_loader} def create_minddataset_config(**kwargs): """创建 MindDataset 测试配置""" default_config = { "type": "MindDataset", "dataset_dir": "/path/to/mindrecord" } default_config.update(kwargs) return {"data_loader": default_config} ``` ### 8.3 数据生成辅助函数 ```python def create_mock_variable_length_data(batch_size, max_length): """创建变长序列模拟数据""" data = [] for _ in range(batch_size): length = np.random.randint(1, max_length + 1) data.append(np.random.randint(0, 10000, size=length)) return data def create_mock_dataset_with_eod(batch_size, seq_length, eod_token_id=0): """创建包含 EOD token 的模拟数据""" data = [] for _ in range(batch_size): tokens = np.random.randint(1, 10000, size=seq_length) # 随机插入 EOD token eod_positions = np.random.choice(seq_length // 2, size=2, replace=False) tokens[eod_positions] = eod_token_id data.append(tokens) return np.array(data) ``` ### 8.4 验证辅助函数 ```python def verify_batch_shape(batch, expected_shape): """验证批次形状""" assert batch.shape == expected_shape, f"Expected {expected_shape}, got {batch.shape}" def verify_padding_values(batch, pad_value, original_lengths): """验证填充值""" for i, length in enumerate(original_lengths): padded = batch[i, length:] assert np.all(padded == pad_value), f"Padding values incorrect at sample {i}" def verify_position_ids_reset(position_ids, eod_positions): """验证 position_ids 在 EOD 后重置""" for eod_pos in eod_positions: if eod_pos + 1 < len(position_ids): assert position_ids[eod_pos + 1] == 0, f"Position not reset after EOD at {eod_pos}" def verify_csv_output(csv_path, expected_columns): """验证 CSV 输出""" import csv assert os.path.exists(csv_path), f"CSV file not found: {csv_path}" with open(csv_path, 'r') as f: reader = csv.reader(f) header = next(reader) for col in expected_columns: assert col in header, f"Column {col} not found in CSV" ``` --- ## 9. 测试执行计划 ### 9.1 测试优先级 | 优先级 | 测试类型 | 说明 | | ------ | ------ | ------ | | P0 | 初始化测试 | 必须通过的基础功能测试 | | P0 | 配置方法测试 | 核心配置功能测试 | | P0 | 列名获取测试 | 数据管道必需功能 | | P1 | 数据加载器创建测试 | 主要功能的集成验证 | | P1 | 掩码配置检查测试 | 注意力掩码和 EOD 掩码 | | P1 | 端到端数据加载测试 | 完整数据加载流程 | | P2 | 分布式数据加载测试 | 多卡场景验证 | | P2 | 动态批处理测试 | 变长序列处理 | | P2 | Token 统计测试 | 辅助功能验证 | | P3 | 性能测试 | 性能指标验证 | ### 9.2 测试频率 | 测试类型 | 执行频率 | | ------ | ------ | | 单元测试 | 每次提交 | | 集成测试 | 每日构建 | | 单卡系统测试 | 每日构建 | | 分布式系统测试 | 每周/版本发布前 | | 性能测试 | 版本发布前 | ### 9.3 测试用例统计 | 测试类别 | 用例数量 | | ------ | ------ | | 初始化测试 | 7 | | 配置方法测试 | 3 | | 列名获取测试 | 4 | | 分片信息生成测试 | 2 | | 数据加载器创建测试 | 1 | | 掩码配置检查测试 | 4 | | 数据加载器类型获取测试 | 3 | | Token 统计功能测试 | 2 | | 并行模式检查测试 | 3 | | YAML 配置测试 | 2 | | TokenCounter 类测试 | 2 | | 辅助函数测试 | 4 | | 集成测试 | 5 | | 系统测试 | 10 | | 性能测试 | 3 | | 异常场景测试 | 15+ | | **总计** | **60+** | --- ## 10. 测试代码文件结构 ``` tests/st/test_ut/test_dataset/test_llm_dataset/ ├── __init__.py ├── test_llm_dataset.py # 现有单元测试 ├── test_llm_dataset_init.py # 初始化测试 ├── test_llm_dataset_config.py # 配置方法测试 ├── test_llm_dataset_columns.py # 列名获取测试 ├── test_llm_dataset_shard.py # 分片信息测试 ├── test_llm_dataset_mask.py # 掩码配置测试 ├── test_llm_dataset_parallel.py # 并行模式测试 ├── test_token_counter.py # TokenCounter 测试 ├── test_batch_wrappers.py # 批处理辅助函数测试 ├── conftest.py # 共享 fixture └── integration/ ├── test_data_pipeline.py # 数据管道集成测试 ├── test_handler_chain.py # 处理器链测试 └── test_e2e_loading.py # 端到端加载测试 ```
评论 (
0
)
登录
后才可以发表评论
状态
TODO
VALIDATION
CLOSED
TODO
ACCEPTED
WIP
DONE
REJECTED
负责人
未设置
标签
未设置
项目
未立项任务
未立项任务
里程碑
未关联里程碑
未关联里程碑
Pull Requests
未关联
未关联
关联的 Pull Requests 被合并后可能会关闭此 issue
分支
未关联
分支 (42)
标签 (23)
master
br_feature_llm_trainer
r1.7.0
r1.6.0
br_infer_boom_1115
bugfix/r1.7.0/value_issue
br_feature_pynative
r1.7.0-beta3
br_feature_infer
r1.7.0-beta1
br_infer_boom
dev
br_infer_deepseek_os
r1.5.0
br_feature_checkpoint
br_feature_infer_300iduo
br_feature_mcore
r1.6.0-beta1
br_infer_deepseek_ep
br_feature_rl_dpo
r1.3.0
r1.3.1
r1.4.0-beta2
r1.4.0-beta1
r1.5.0-beta1
r1.2.0
r1.1.0
r1.1.0-infer
r1.1.rc1
r1.0
kbk-infer
r1.0.a
r0.8
r0.7
r0.6.1_demo
r0.6
0.6rc1
r0.3
r0.2
v0.1.2
v0.1.1
v0.1.0
v1.7.0
v1.7.0-beta3
v1.7.0-beta2
v1.6.0
v1.6.0-beta1
v1.5.0
v1.5.0-beta2
v1.5.0-beta1
v1.4.0-beta2
v1.3.2
v1.3.1-beta1
v1.4.0-beta1
v1.3.0
v1.2.0
v1.1.0
v1.0.2
v1.0.1
v1.0.0
v0.6.0
v0.3
v0.2_rc
v0.1.1
v0.1.0
开始日期   -   截止日期
-
置顶选项
不置顶
置顶等级:高
置顶等级:中
置顶等级:低
优先级
不指定
严重
主要
次要
不重要
预计工期
(小时)
参与者(1)
Python
1
https://gitee.com/mindspore/mindformers.git
git@gitee.com:mindspore/mindformers.git
mindspore
mindformers
mindformers
点此查找更多帮助
搜索帮助
Git 命令在线学习
如何在 Gitee 导入 GitHub 仓库
Git 仓库基础操作
企业版和社区版功能对比
SSH 公钥设置
如何处理代码冲突
仓库体积过大,如何减小?
如何找回被删除的仓库数据
Gitee 产品配额说明
GitHub仓库快速导入Gitee及同步更新
什么是 Release(发行版)
将 PHP 项目自动发布到 packagist.org
评论
仓库举报
回到顶部
登录提示
该操作需登录 Gitee 帐号,请先登录后再操作。
立即登录
没有帐号,去注册