# streamLearn **Repository Path**: WNJXYK/stream-learn ## Basic Information - **Project Name**: streamLearn - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 30 - **Created**: 2024-06-09 - **Last Updated**: 2024-06-10 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # StreamLearn 流数据项目代码框架 ## StreamEstimator Stream learn算法主要需要实现以下接口,具体实现参考StreamLearn/Base/DeepModelMixin.py 注意以下几点 1. 每个算法单独为一个文件夹或者python文件,放在Algorithm文件夹下面(实现StreamEstimator)。 2. 每个数据集单独作为一个文件夹或者python文件,放在Dataset文件夹(实现StreamDataset)。 3. 每个算法需要提供一个测试入口,作为算法调用的样例。放在tests文件夹。 4. 提供对应的代码说明,可以参考下面的“流数据分布鲁棒学习算法” 5. 数据集中的数据如果不太大(1MB以下)可以将数据放在Dataset/data文件夹下,新建一个项目对应的子目录。如果数据集较大,应当在国内的数据平台(例如百度网盘)上传并在README中提供对应的链接和使用方法。 ```python class StreamEstimator(ABC, BaseEstimator): @abstractmethod def fit(self, stream_dataset): """ Train a stream model. :param stream_dataset: Instances of a stream dataset. """ raise NotImplementedError("The fit() method of StreamEstimator must be implemented.") @abstractmethod def predict(self, X): """ Predict y for input X. :param X: input. """ raise NotImplementedError("The predict() method of StreamEstimator must be implemented.") @abstractmethod def evaluate(self, y_pred, y_true): """ Evaluate stream algorithm on a stream dataset. :param y_pred: predict y. :param y_true: ground-truth y. """ raise NotImplementedError("The evaluate() method of StreamEstimator must be implemented.") ``` ## StreamDataset ```python class StreamDataset(Dataset): pass ``` --- ## 流数据分布鲁棒学习算法 针对多源异质流数据分布场景,建立吞吐自适流数据算法,实现对分布变化的鲁棒性。具体来说,针对多数据分布吞吐量相同和不同的两个场景,分别提出GDRO算法和加权GDRO算法,建立理论最优的样本复杂度,同时实验验证了两种方法的有效性。 相关算法工具开源至本项目仓库中,包含`多分布数据集构造`,`GDRO算法和加权GDRO算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Datast/AdultDataset.py - StreamLearn/Algorithm/GDRO/GDRO.py - StreamLearn/Algorithm/GDRO/WGDRO.py - StreamLearn/tests/test_GDRO.py 首先,按照以下方式构造流式数据集adult_dataset,其中参数 args.dataset_mode='balance' 表明流数据吞吐量相同场景;args.dataset_mode='imbalance' 表明吞吐量不同场景。 ```python adult_dataset = AdultDataset(args) ``` AdultData源数据链接([Link][https://archive.ics.uci.edu/dataset/2/adult]),预处理后数据([Link][https://pan.baidu.com/s/1sQu_o9qiaXUTvHFEeswZtA],提取码:jhxx) 其次,分别调用GDRO算法和加权GDRO算法,在adult_dataset上进行训练 ```python # GDRO算法训练 GDRO1 = GDRO(args) GDRO1.fit(adult_dataset) ``` ```python # 加权GDRO算法训练 WGDRO1 = WGDRO(args) WGDRO1.fit(adult_dataset) ``` 最后,分别对两个模型进行性能测试 ```python # GDRO算法测试 for i in range(adult_dataset.get_m()): data = adult_dataset.get_testdata(i) predict = GDRO1.predict(data[:, :-1]) loss = GDRO1.evaluate(predict, data[:, -1]).mean() print('GDRO', i, loss) ``` ```python # 加权GDRO算法测试 for i in range(adult_dataset.get_m()): data = adult_dataset.get_testdata(i) predict = WGDRO1.predict(data[:, :-1]) loss = WGDRO1.evaluate(predict, data[:, -1]).mean() print('WGDRO', i, loss) ``` ## 流数据增量学习算法 该算法主要包含`增量学习数据集构造`,`MEMO算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Datast/CILDataset.py - StreamLearn/Algorithm/ClassIncrementalLearning/MEMO.py - StreamLearn/tests/test_CIL.py 首先,按照以下方式构造流式数据集CIFAR100。 ```python dataset = CIFAR100_CIL(root='/home/anony/DATASETS', download=True) ``` 其次,调用MEMO算法,在流数据上进行训练 ```python # MEMO算法训练 model = get_CIL_method(parser.parse_args().alg) model.fit(dataset) ``` 最后,分别对两个模型进行性能测试 ```python # MEMO算法测试 y = model.predict(X=dataset._test_data) print(model.evaluate(y, dataset._test_targets)) ``` ## 流数据分布自适应学习算法 该算法主要包含`分布偏移数据集构造`,`ODS算法实现`,`性能测试`三部分,相关代码参见目录: - StreamLearn/Datast/TTADataset.py - StreamLearn/Algorithm/TTA/ODS.py - StreamLearn/tests/test_ODS.py 首先,测试代码的配置文件为:StreamLearn/Config/ODS.py,用户可修改测试文件进行不同复合数据分布变化的测试。 数据集下载地址:[CIFAR10-C](https://zenodo.org/records/2535967),训练参数下载地址:[百度网盘](https://pan.baidu.com/s/1mxADnKpv73X-Tu1uR8fkXg),提取码: qaj2。 其次,按照以下方式构造流式包含复合数据分布变化(包含协变量分布和标记分布偏移)的 CIFAR10 数据集。 ```python import StreamLearn.Dataset.TTADataset as datasets dataset = datasets.CIFAR10CB( root=args.stream.dataset_dir, batch_size=args.stream.batch_size, severities=args.stream.severities, corruptions=args.stream.corruptions, bind_class=args.stream.bind_class, bind_ratio=args.stream.bind_ratio, seed=args.seed, ) ``` 其中,`root`为数据根目录、`batch_size`控制数据流批大小、`severities`与`corruptions`控制协变量分布偏移的程度与类型、`bind_class`与`bind_ratio`控制标记分布变化的类别与比例、`seed`为数据集生成随机种子。 继而,调用 ODS 算法,复用已训练完毕的模型在复合分布偏移的数据流中进行自适应学习。 ```python args.method.model = net estimator = ODS.ODS(args.method) ``` 其中,`net`保存了深度学习模型,`args.method`中保存了 ODS 算法所需的超参数。 最后,将数据流中的样本输入算法中进行预测。 ```python # ODS算法测试 pred = estimator.predict(X).detach().cpu() ``` 测试具体代码详见 StreamLearn/tests/test_ODS.py 文件,使用方法为: ```bash cd stream-learn python StreamLearn/tests/test_ODS.py --data PATH_TO_DATA --checkpoint PATH_TO_CHECKPOINT ```