diff --git a/examples/lifelong_learning/RFNet/basemodel.py b/examples/lifelong_learning/RFNet/basemodel.py index dba4cfdf2..98583cef1 100644 --- a/examples/lifelong_learning/RFNet/basemodel.py +++ b/examples/lifelong_learning/RFNet/basemodel.py @@ -3,20 +3,20 @@ import torch from PIL import Image import argparse +from tqdm import tqdm + +from torchvision import transforms +from torch.utils.data import DataLoader +from sedna.common.config import Context +from sedna.common.file_ops import FileOps +from sedna.common.log import LOGGER + +from utils.metrics import Evaluator from train import Trainer from eval import Validator -from tqdm import tqdm from eval import load_my_state_dict -from utils.metrics import Evaluator from dataloaders import make_data_loader from dataloaders import custom_transforms as tr -from torchvision import transforms -from sedna.common.class_factory import ClassType, ClassFactory -from sedna.common.config import Context -from sedna.datasources import TxtDataParse -from torch.utils.data import DataLoader -from sedna.common.file_ops import FileOps -from utils.lr_scheduler import LR_Scheduler def preprocess(image_urls): transformed_images = [] @@ -34,7 +34,10 @@ def preprocess(image_urls): composed_transforms = transforms.Compose([ # tr.CropBlackArea(), # tr.FixedResize(size=self.args.crop_size), - tr.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)), + tr.Normalize( + mean=( + 0.485, 0.456, 0.406), std=( + 0.229, 0.224, 0.225)), tr.ToTensor()]) transformed_images.append((composed_transforms(sample), img_path)) @@ -52,33 +55,45 @@ def __init__(self, **kwargs): self.train_args.no_val = kwargs.get("no_val", True) # self.train_args.resume = Context.get_parameters("PRETRAINED_MODEL_URL", None) self.trainer = None - - label_save_dir = Context.get_parameters("INFERENCE_RESULT_DIR", "./inference_results") - self.val_args.color_label_save_path = os.path.join(label_save_dir, "color") - self.val_args.merge_label_save_path = os.path.join(label_save_dir, "merge") + self.train_model_url = None + + label_save_dir = Context.get_parameters( + "INFERENCE_RESULT_DIR", "./inference_results") + self.val_args.color_label_save_path = os.path.join( + label_save_dir, "color") + self.val_args.merge_label_save_path = os.path.join( + label_save_dir, "merge") self.val_args.label_save_path = os.path.join(label_save_dir, "label") + self.val_args.save_predicted_image = kwargs.get( + "save_predicted_image", "true").lower() self.validator = Validator(self.val_args) - def train(self, train_data, valid_data=None, **kwargs): + def train(self, train_data, valid_data=None, **kwargs): self.trainer = Trainer(self.train_args, train_data=train_data) print("Total epoches:", self.trainer.args.epochs) - for epoch in range(self.trainer.args.start_epoch, self.trainer.args.epochs): + for epoch in range( + self.trainer.args.start_epoch, + self.trainer.args.epochs): if epoch == 0 and self.trainer.val_loader: self.trainer.validation(epoch) self.trainer.training(epoch) - if self.trainer.args.no_val and \ - (epoch % self.trainer.args.eval_interval == (self.trainer.args.eval_interval - 1) - or epoch == self.trainer.args.epochs - 1): - # save checkpoint when it meets eval_interval or the training finished + if self.trainer.args.no_val and ( + epoch % + self.trainer.args.eval_interval == ( + self.trainer.args.eval_interval - + 1) or epoch == self.trainer.args.epochs - + 1): + # save checkpoint when it meets eval_interval or the training + # finished is_best = False - checkpoint_path = self.trainer.saver.save_checkpoint({ + self.train_model_url = self.trainer.saver.save_checkpoint({ 'epoch': epoch + 1, 'state_dict': self.trainer.model.state_dict(), 'optimizer': self.trainer.optimizer.state_dict(), 'best_pred': self.trainer.best_pred, }, is_best) - + # if not self.trainer.args.no_val and \ # epoch % self.train_args.eval_interval == (self.train_args.eval_interval - 1) \ # and self.trainer.val_loader: @@ -86,44 +101,67 @@ def train(self, train_data, valid_data=None, **kwargs): self.trainer.writer.close() - return checkpoint_path + return self.train_model_url def predict(self, data, **kwargs): if not isinstance(data[0][0], dict): data = preprocess(data) - if type(data) is np.ndarray: + if isinstance(data, np.ndarray): data = data.tolist() - self.validator.test_loader = DataLoader(data, batch_size=self.val_args.test_batch_size, shuffle=False, - pin_memory=True) + self.validator.test_loader = DataLoader( + data, + batch_size=self.val_args.test_batch_size, + shuffle=False, + pin_memory=True) return self.validator.validate() def evaluate(self, data, **kwargs): - self.val_args.save_predicted_image = kwargs.get("save_predicted_image", True) samples = preprocess(data.x) predictions = self.predict(samples) return accuracy(data.y, predictions) def load(self, model_url, **kwargs): if model_url: - self.validator.new_state_dict = torch.load(model_url, map_location=torch.device("cpu")) + self.validator.new_state_dict = torch.load( + model_url, map_location=torch.device("cpu")) + self.validator.model = load_my_state_dict( + self.validator.model, self.validator.new_state_dict['state_dict']) + self.train_args.resume = model_url else: raise Exception("model url does not exist.") - self.validator.model = load_my_state_dict(self.validator.model, self.validator.new_state_dict['state_dict']) def save(self, model_path=None): - # TODO: how to save unstructured data model - pass + # TODO: save unstructured data model + if not model_path: + LOGGER.warning(f"Not specify model path.") + return self.train_model_url + + return FileOps.upload(self.train_model_url, model_path) + def train_args(): parser = argparse.ArgumentParser(description="PyTorch RFNet Training") - parser.add_argument('--depth', action="store_true", default=False, - help='training with depth image or not (default: False)') - parser.add_argument('--dataset', type=str, default='cityscapes', - choices=['citylostfound', 'cityscapes', 'cityrand', 'target', 'xrlab', 'e1', 'mapillary'], - help='dataset name (default: cityscapes)') + parser.add_argument( + '--depth', + action="store_true", + default=False, + help='training with depth image or not (default: False)') + parser.add_argument( + '--dataset', + type=str, + default='cityscapes', + choices=[ + 'citylostfound', + 'cityscapes', + 'cityrand', + 'target', + 'xrlab', + 'e1', + 'mapillary'], + help='dataset name (default: cityscapes)') parser.add_argument('--workers', type=int, default=4, metavar='N', help='dataloader threads') parser.add_argument('--base-size', type=int, default=1024, @@ -149,8 +187,11 @@ def train_args(): parser.add_argument('--test-batch-size', type=int, default=1, metavar='N', help='input batch size for \ testing (default: auto)') - parser.add_argument('--use-balanced-weights', action='store_true', default=False, - help='whether to use balanced weights (default: True)') + parser.add_argument( + '--use-balanced-weights', + action='store_true', + default=False, + help='whether to use balanced weights (default: True)') parser.add_argument('--num-class', type=int, default=24, help='number of training classes (default: 24') # optimizer params @@ -164,8 +205,11 @@ def train_args(): parser.add_argument('--weight-decay', type=float, default=2.5e-5, metavar='M', help='w-decay (default: 5e-4)') # cuda, seed and logging - parser.add_argument('--no-cuda', action='store_true', default= - False, help='disables CUDA training') + parser.add_argument( + '--no-cuda', + action='store_true', + default=False, + help='disables CUDA training') parser.add_argument('--gpu-ids', type=str, default='0', help='use which gpu to train, must be a \ comma-separated list of integers only (default=0)') @@ -193,7 +237,8 @@ def train_args(): try: args.gpu_ids = [int(s) for s in args.gpu_ids.split(',')] except ValueError: - raise ValueError('Argument --gpu_ids must be a comma-separated list of integers only') + raise ValueError( + 'Argument --gpu_ids must be a comma-separated list of integers only') if args.epochs is None: epoches = { @@ -214,7 +259,8 @@ def train_args(): 'citylostfound': 0.0001, 'cityrand': 0.0001 } - args.lr = lrs[args.dataset.lower()] / (4 * len(args.gpu_ids)) * args.batch_size + args.lr = lrs[args.dataset.lower()] / \ + (4 * len(args.gpu_ids)) * args.batch_size if args.checkname is None: args.checkname = 'RFNet' @@ -223,11 +269,19 @@ def train_args(): return args + def val_args(): parser = argparse.ArgumentParser(description="PyTorch RFNet validation") - parser.add_argument('--dataset', type=str, default='cityscapes', - choices=['citylostfound', 'cityscapes', 'xrlab', 'mapillary'], - help='dataset name (default: cityscapes)') + parser.add_argument( + '--dataset', + type=str, + default='cityscapes', + choices=[ + 'citylostfound', + 'cityscapes', + 'xrlab', + 'mapillary'], + help='dataset name (default: cityscapes)') parser.add_argument('--workers', type=int, default=4, metavar='N', help='dataloader threads') parser.add_argument('--base-size', type=int, default=1024, @@ -243,17 +297,27 @@ def val_args(): metavar='N', help='input batch size for \ testing (default: auto)') parser.add_argument('--num-class', type=int, default=24, - help='number of training classes (default: 24') - parser.add_argument('--no-cuda', action='store_true', default=False, help='disables CUDA training') + help='number of training classes (default: 24)') + parser.add_argument( + '--no-cuda', + action='store_true', + default=False, + help='disables CUDA training') parser.add_argument('--gpu-ids', type=str, default='0', help='use which gpu to train, must be a \ comma-separated list of integers only (default=0)') parser.add_argument('--checkname', type=str, default=None, help='set the checkpoint name') - parser.add_argument('--weight-path', type=str, default="./models/530_exp3_2.pth", - help='enter your path of the weight') - parser.add_argument('--save-predicted-image', action='store_true', default=False, - help='save predicted images') + parser.add_argument( + '--weight-path', + type=str, + default="./models/530_exp3_2.pth", + help='enter your path of the weight') + parser.add_argument( + '--save-predicted-image', + action='store_true', + default=False, + help='save predicted images') parser.add_argument('--color-label-save-path', type=str, default='./test/color/', help='path to save label') @@ -262,8 +326,16 @@ def val_args(): help='path to save merged label') parser.add_argument('--label-save-path', type=str, default='./test/label/', help='path to save merged label') - parser.add_argument('--merge', action='store_true', default=True, help='merge image and label') - parser.add_argument('--depth', action='store_true', default=False, help='add depth image or not') + parser.add_argument( + '--merge', + action='store_true', + default=False, + help='merge image and label') + parser.add_argument( + '--depth', + action='store_true', + default=False, + help='add depth image or not') args = parser.parse_args() args.cuda = not args.no_cuda and torch.cuda.is_available() @@ -271,10 +343,12 @@ def val_args(): try: args.gpu_ids = [int(s) for s in args.gpu_ids.split(',')] except ValueError: - raise ValueError('Argument --gpu_ids must be a comma-separated list of integers only') + raise ValueError( + 'Argument --gpu_ids must be a comma-separated list of integers only') return args + def accuracy(y_true, y_pred, **kwargs): args = val_args() _, _, test_loader, num_class = make_data_loader(args, test_data=y_true) @@ -291,7 +365,7 @@ def accuracy(y_true, y_pred, **kwargs): if args.depth: depth = depth.cuda() - target[target > evaluator.num_class-1] = 255 + target[target > evaluator.num_class - 1] = 255 target = target.cpu().numpy() # Add batch sample into evaluator evaluator.add_batch(target, y_pred[i]) @@ -305,6 +379,7 @@ def accuracy(y_true, y_pred, **kwargs): print("CPA:{}, mIoU:{}, fwIoU: {}".format(CPA, mIoU, FWIoU)) return CPA + if __name__ == '__main__': model_path = "/tmp/RFNet/" if not os.path.exists(model_path): diff --git a/examples/lifelong_learning/RFNet/dataloaders/utils.py b/examples/lifelong_learning/RFNet/dataloaders/utils.py index ef572332a..514cbe607 100644 --- a/examples/lifelong_learning/RFNet/dataloaders/utils.py +++ b/examples/lifelong_learning/RFNet/dataloaders/utils.py @@ -25,7 +25,7 @@ def decode_segmap(label_mask, dataset, plot=False): n_classes = 21 label_colours = get_pascal_labels() elif dataset == 'cityscapes': - n_classes = 19 + n_classes = 24 label_colours = get_cityscapes_labels() elif dataset == 'target': n_classes = 24 diff --git a/examples/lifelong_learning/RFNet/eval.py b/examples/lifelong_learning/RFNet/eval.py index 482315c92..4963fd379 100644 --- a/examples/lifelong_learning/RFNet/eval.py +++ b/examples/lifelong_learning/RFNet/eval.py @@ -60,8 +60,7 @@ def validate(self): if self.args.depth: image, depth, target = sample['image'], sample['depth'], sample['label'] else: - # spec = time.time() - image, target = sample['image'], sample['label'] + image, target = sample['image'], sample['label'] if self.args.cuda: image = image.cuda() @@ -82,7 +81,7 @@ def validate(self): pred = np.argmax(pred, axis=1) predictions.append(pred) - if not self.args.save_predicted_image: + if self.args.save_predicted_image != "true": continue pre_colors = Colorize()(torch.max(output, 1)[1].detach().cpu().byte()) diff --git a/examples/lifelong_learning/RFNet/predict.py b/examples/lifelong_learning/RFNet/predict.py index 82b527a20..6fbc57608 100644 --- a/examples/lifelong_learning/RFNet/predict.py +++ b/examples/lifelong_learning/RFNet/predict.py @@ -1,30 +1,15 @@ -import os -os.environ['BACKEND_TYPE'] = 'PYTORCH' -# set at yaml -# os.environ["PREDICT_RESULT_DIR"] = "./inference_results" -# os.environ["EDGE_OUTPUT_URL"] = "./edge_kb" -# os.environ["video_url"] = "./video/radio.mp4" -# os.environ["MODEL_URLS"] = "./cloud_next_kb/index.pkl" - - import cv2 import time -import torch import numpy as np from PIL import Image -import base64 -import tempfile import warnings -from io import BytesIO from sedna.datasources import BaseDataSource from sedna.core.lifelong_learning import LifelongLearning from sedna.common.config import Context - -from dataloaders import custom_transforms as tr from torchvision import transforms -from accuracy import accuracy +from dataloaders import custom_transforms as tr from basemodel import preprocess, val_args, Model def preprocess(samples): @@ -42,7 +27,6 @@ def init_ll_job(): task_allocation = { "method": "TaskAllocationByOrigin", "param": { - "origins": ["real", "sim"], "default": "real" } } diff --git a/examples/lifelong_learning/RFNet/sedna_evaluate.py b/examples/lifelong_learning/RFNet/sedna_evaluate.py index 566333472..bfb46bc41 100644 --- a/examples/lifelong_learning/RFNet/sedna_evaluate.py +++ b/examples/lifelong_learning/RFNet/sedna_evaluate.py @@ -1,10 +1,4 @@ import os -os.environ['BACKEND_TYPE'] = 'PYTORCH' -# os.environ["KB_SERVER"] = "http://0.0.0.0:9020" -# os.environ["test_dataset_url"] = "./data_txt/sedna_data.txt" -# os.environ["MODEL_URLS"] = "./cloud_next_kb/index.pkl" -# os.environ["operator"] = "<" -# os.environ["model_threshold"] = "0" from sedna.core.lifelong_learning import LifelongLearning from sedna.datasources import IndexDataParse @@ -25,10 +19,7 @@ def eval(): eval_data.parse(eval_dataset_url, use_raw=False) task_allocation = { - "method": "TaskAllocationByOrigin", - "param": { - "origins": ["real", "sim"] - } + "method": "TaskAllocationByOrigin" } ll_job = LifelongLearning(estimator, diff --git a/examples/lifelong_learning/RFNet/sedna_predict.py b/examples/lifelong_learning/RFNet/sedna_predict.py index b32c01d2d..c919cc92a 100644 --- a/examples/lifelong_learning/RFNet/sedna_predict.py +++ b/examples/lifelong_learning/RFNet/sedna_predict.py @@ -1,13 +1,5 @@ import os -os.environ['BACKEND_TYPE'] = 'PYTORCH' -# os.environ["UNSEEN_SAVE_URL"] = "s3://kubeedge/sedna-robo/unseen_samples/" -# set at yaml -# os.environ["PREDICT_RESULT_DIR"] = "./inference_results" -os.environ["TEST_DATASET_URL"] = "./data_txt/door_test.txt" -os.environ["EDGE_OUTPUT_URL"] = "./edge_kb" -os.environ["ORIGINAL_DATASET_URL"] = "/tmp" - import torch import numpy as np from PIL import Image @@ -30,8 +22,7 @@ from dataloaders.datasets.cityscapes import CityscapesSegmentation def _load_txt_dataset(dataset_url): - # use original dataset url, - # see https://github.com/kubeedge/sedna/issues/35 + # use original dataset url original_dataset_url = Context.get_parameters('original_dataset_url') return os.path.join(os.path.dirname(original_dataset_url), dataset_url) diff --git a/examples/lifelong_learning/RFNet/sedna_train.py b/examples/lifelong_learning/RFNet/sedna_train.py index 1c99361aa..051f97d77 100644 --- a/examples/lifelong_learning/RFNet/sedna_train.py +++ b/examples/lifelong_learning/RFNet/sedna_train.py @@ -1,12 +1,5 @@ import os -os.environ['BACKEND_TYPE'] = 'PYTORCH' -os.environ["OUTPUT_URL"] = "./cloud_kb/" -# os.environ['CLOUD_KB_INDEX'] = "./cloud_kb/index.pkl" -os.environ["TRAIN_DATASET_URL"] = "./data_txt/sedna_data.txt" -os.environ["KB_SERVER"] = "http://0.0.0.0:9020" -os.environ["HAS_COMPLETED_INITIAL_TRAINING"] = "false" -from sedna.common.file_ops import FileOps from sedna.datasources import IndexDataParse from sedna.common.config import Context, BaseConfig from sedna.core.lifelong_learning import LifelongLearning @@ -20,17 +13,11 @@ def _load_txt_dataset(dataset_url): def train(estimator, train_data): task_definition = { - "method": "TaskDefinitionByOrigin", - "param": { - "origins": ["real", "sim"] - } + "method": "TaskDefinitionByOrigin" } task_allocation = { - "method": "TaskAllocationByOrigin", - "param": { - "origins": ["real", "sim"] - } + "method": "TaskAllocationByOrigin" } ll_job = LifelongLearning(estimator, @@ -65,7 +52,7 @@ def update(estimator, train_data): def run(): estimator = Model() train_dataset_url = BaseConfig.train_dataset_url - train_data = IndexDataParse(data_type="train") + train_data = IndexDataParse(data_type="train", func=_load_txt_dataset) train_data.parse(train_dataset_url, use_raw=False) is_completed_initilization = str(Context.get_parameters("HAS_COMPLETED_INITIAL_TRAINING", "false")).lower() diff --git a/examples/lifelong_learning/RFNet/train.py b/examples/lifelong_learning/RFNet/train.py index ca6c21949..6bdf85552 100644 --- a/examples/lifelong_learning/RFNet/train.py +++ b/examples/lifelong_learning/RFNet/train.py @@ -2,20 +2,21 @@ import os import numpy as np from tqdm import tqdm -import torch +from sedna.datasources import BaseDataSource + +import torch from mypath import Path -from dataloaders import make_data_loader from models.rfnet import RFNet from models.resnet.resnet_single_scale_single_attention import * -from utils.loss import SegmentationLosses from models.replicate import patch_replication_callback +from dataloaders import make_data_loader +from utils.loss import SegmentationLosses from utils.calculate_weights import calculate_weigths_labels from utils.lr_scheduler import LR_Scheduler from utils.saver import Saver from utils.summaries import TensorboardSummary from utils.metrics import Evaluator -from sedna.datasources import BaseDataSource class Trainer(object): def __init__(self, args, train_data=None, valid_data=None): diff --git a/lib/sedna/algorithms/knowledge_management/edge_knowledge_management.py b/lib/sedna/algorithms/knowledge_management/edge_knowledge_management.py index 120412c16..5dee7d5cf 100644 --- a/lib/sedna/algorithms/knowledge_management/edge_knowledge_management.py +++ b/lib/sedna/algorithms/knowledge_management/edge_knowledge_management.py @@ -40,7 +40,7 @@ def __init__(self, config, estimator, **kwargs): self.task_group_key = KBResourceConstant.TASK_GROUPS.value self.extractor_key = KBResourceConstant.EXTRACTOR.value - ModelLoadingThread(self.task_index).start() + ModelLoadingThread(self, self.task_index).start() def update_kb(self, task_index_url): if isinstance(task_index_url, str): @@ -131,11 +131,13 @@ def save_unseen_samples(self, samples, post_process): f"unseen_samples_{time.time()}.pkl") return FileOps.upload(name, unseen_save_url) + class ModelLoadingThread(threading.Thread): """Hot task index loading with multithread support""" MODEL_MANIPULATION_SEM = threading.Semaphore(1) def __init__(self, + edge_knowledge_management, task_index, callback=None ): @@ -148,12 +150,14 @@ def __init__(self, LOGGER.error("As local task index has not been loaded, skipped") self.run_flag = False model_check_time = int(Context.get_parameters( - "MODEL_POLL_PERIOD_SECONDS", "60") + "MODEL_POLL_PERIOD_SECONDS", "30") ) if model_check_time < 1: LOGGER.warning("Catch an abnormal value in " "`MODEL_POLL_PERIOD_SECONDS`, fallback with 60") - model_check_time = 60 + model_check_time = 30 + + self.edge_knowledge_management = edge_knowledge_management self.hot_update_task_index = hot_update_task_index self.check_time = model_check_time self.task_index = task_index @@ -170,9 +174,13 @@ def run(self): continue current_version = latest_version with self.MODEL_MANIPULATION_SEM: - LOGGER.info(f"Update model start with version {current_version}") + LOGGER.info( + f"Update model start with version {current_version}") try: FileOps.dump(tmp_task_index, self.task_index) + # TODO: update local kb with the latest index.pkl + self.edge_knowledge_management.update_kb(self.task_index) + status = K8sResourceKindStatus.COMPLETED.value LOGGER.info(f"Update task index complete " f"with version {self.version}") diff --git a/lib/sedna/algorithms/seen_task_learning/seen_task_learning.py b/lib/sedna/algorithms/seen_task_learning/seen_task_learning.py index d90f00cdc..cdf29a9e6 100644 --- a/lib/sedna/algorithms/seen_task_learning/seen_task_learning.py +++ b/lib/sedna/algorithms/seen_task_learning/seen_task_learning.py @@ -276,12 +276,14 @@ def _task_process( if callback: res = callback(model_obj, res) if isinstance(res, str): - model_path = res + model_path = model_obj.save(model_name=f"{task.entry}.pth") + model = Model(index=i, entry=task.entry, + model=model_path, result={}) else: model_path = model_obj.save( model_name=f"{task.entry}.model") - model = Model(index=i, entry=task.entry, - model=model_path, result=res) + model = Model(index=i, entry=task.entry, + model=model_path, result=res) model.meta_attr = [t.meta_attr for t in task.tasks] task.model = model @@ -388,18 +390,18 @@ def update(self, tasks, task_update_strategies, **kwargs): LOGGER.info(f"MTL Train start {i} : {task.entry}") for _task in task.tasks: model_obj = set_backend(estimator=self.base_model) - model_obj.load(_task.model, phase="train") + model_obj.load(_task.model) res = model_obj.train(train_data=task.samples) if isinstance(res, str): - model_path = res + model_path = model_obj.save( + model_name=f"{task.entry}.pth") model = Model(index=i, entry=task.entry, model=model_path, result={}) else: model_path = model_obj.save( - model_name=f"{task.entry}_{time.time()}.model") + model_name=f"{task.entry}.model") model = Model(index=i, entry=task.entry, model=model_path, result=res) - break model.meta_attr = [t.meta_attr for t in task.tasks] diff --git a/lib/sedna/algorithms/seen_task_learning/task_allocation/task_allocation_by_origin.py b/lib/sedna/algorithms/seen_task_learning/task_allocation/task_allocation_by_origin.py index 7b86ebfbb..eafac6e83 100644 --- a/lib/sedna/algorithms/seen_task_learning/task_allocation/task_allocation_by_origin.py +++ b/lib/sedna/algorithms/seen_task_learning/task_allocation/task_allocation_by_origin.py @@ -1,6 +1,7 @@ from sedna.datasources import BaseDataSource from sedna.common.class_factory import ClassFactory, ClassType + @ClassFactory.register(ClassType.STP) class TaskAllocationByOrigin: """ @@ -17,19 +18,46 @@ class TaskAllocationByOrigin: def __init__(self, task_extractor, **kwargs): self.task_extractor = task_extractor - self.origins = kwargs.get("origins", []) self.default_origin = kwargs.get("default", None) def __call__(self, samples: BaseDataSource): if self.default_origin: - return samples, [int(self.task_extractor.get(self.default_origin))] * len(samples.x) + return samples, [int(self.task_extractor.get( + self.default_origin))] * len(samples.x) + + cities = [ + "aachen", + "berlin", + "bochum", + "bremen", + "cologne", + "darmstadt", + "dusseldorf", + "erfurt", + "hamburg", + "hanover", + "jena", + "krefeld", + "monchengladbach", + "strasbourg", + "stuttgart", + "tubingen", + "ulm", + "weimar", + "zurich"] sample_origins = [] for _x in samples.x: - for origin in self.origins: - if origin in _x[0]: - sample_origins.append(origin) + is_real = False + for city in cities: + if city in _x[0]: + is_real = True + sample_origins.append("real") + break + if not is_real: + sample_origins.append("sim") - allocations = [int(self.task_extractor.get(sample_origin)) for sample_origin in sample_origins] + allocations = [int(self.task_extractor.get(sample_origin)) + for sample_origin in sample_origins] - return samples, allocations \ No newline at end of file + return samples, allocations diff --git a/lib/sedna/algorithms/seen_task_learning/task_definition/task_definition_by_origin.py b/lib/sedna/algorithms/seen_task_learning/task_definition/task_definition_by_origin.py index 460aff5f9..2f843ac03 100644 --- a/lib/sedna/algorithms/seen_task_learning/task_definition/task_definition_by_origin.py +++ b/lib/sedna/algorithms/seen_task_learning/task_definition/task_definition_by_origin.py @@ -5,6 +5,7 @@ from ..artifact import Task + @ClassFactory.register(ClassType.STP) class TaskDefinitionByOrigin: """ @@ -12,17 +13,38 @@ class TaskDefinitionByOrigin: Parameters ---------- - attribute: List[Metadata] + origins: List[Metadata] metadata is usually a class feature label with a finite values. """ def __init__(self, **kwargs): - self.origins = kwargs.get("origins", []) + self.origins = kwargs.get("origins", ["real", "sim"]) def __call__(self, samples: BaseDataSource, **kwargs) -> Tuple[List[Task], Any, BaseDataSource]: + cities = [ + "aachen", + "berlin", + "bochum", + "bremen", + "cologne", + "darmstadt", + "dusseldorf", + "erfurt", + "hamburg", + "hanover", + "jena", + "krefeld", + "monchengladbach", + "strasbourg", + "stuttgart", + "tubingen", + "ulm", + "weimar", + "zurich"] + tasks = [] d_type = samples.data_type x_data = samples.x @@ -30,20 +52,29 @@ def __call__(self, task_index = dict(zip(self.origins, range(len(self.origins)))) - for k, v in task_index.items(): - _x = [x for x in x_data if k in x[0]] - _y = [y for y in y_data if k in y] + real_df = BaseDataSource(data_type=d_type) + real_df.x, real_df.y = [], [] + sim_df = BaseDataSource(data_type=d_type) + sim_df.x, sim_df.y = [], [] - task_df = BaseDataSource(data_type=d_type) - task_df.x = _x - task_df.y = _y + for i in range(samples.num_examples()): + is_real = False + for city in cities: + if city in x_data[i][0]: + is_real = True + real_df.x.append(x_data[i]) + real_df.y.append(y_data[i]) + break + if not is_real: + sim_df.x.append(x_data[i]) + sim_df.y.append(y_data[i]) - g_attr = f"{k}_semantic_segamentation_model" - task_obj = Task(entry=g_attr, samples=task_df, meta_attr=k) - tasks.append(task_obj) + g_attr = "real_semantic_segamentation_model" + task_obj = Task(entry=g_attr, samples=real_df, meta_attr="real") + tasks.append(task_obj) - samples = BaseDataSource(data_type=d_type) - samples.x = x_data - samples.y = y_data + g_attr = "sim_semantic_segamentation_model" + task_obj = Task(entry=g_attr, samples=sim_df, meta_attr="sim") + tasks.append(task_obj) - return tasks, task_index, samples \ No newline at end of file + return tasks, task_index, samples diff --git a/lib/sedna/core/lifelong_learning/lifelong_learning.py b/lib/sedna/core/lifelong_learning/lifelong_learning.py index 1cf231d78..0e8984536 100644 --- a/lib/sedna/core/lifelong_learning/lifelong_learning.py +++ b/lib/sedna/core/lifelong_learning/lifelong_learning.py @@ -294,8 +294,10 @@ def update(self, train_data, valid_data=None, post_process=None, **kwargs): # seen_samples.y = np.concatenate( # (historical_data.y, seen_samples.y, unseen_samples.y), axis=0) - seen_samples.x = np.concatenate((seen_samples.x, unseen_samples.x), axis=0) - seen_samples.y = np.concatenate((seen_samples.y, unseen_samples.y), axis=0) + seen_samples.x = np.concatenate( + (seen_samples.x, unseen_samples.x), axis=0) + seen_samples.y = np.concatenate( + (seen_samples.y, unseen_samples.y), axis=0) task_update_decision = ClassFactory.get_cls( ClassType.KM, self.task_update_decision["method"])( @@ -412,7 +414,8 @@ def inference(self, data=None, post_process=None, **kwargs): self.edge_knowledge_management.task_index, **self.unseen_sample_recognition_param) - seen_samples, unseen_samples = unseen_sample_recognition(data, **kwargs) + seen_samples, unseen_samples = unseen_sample_recognition( + data, **kwargs) if unseen_samples.x is not None and len(unseen_samples.x) > 0: self.edge_knowledge_management.log.info( f"Unseen task is detected.") diff --git a/lib/sedna/datasources/__init__.py b/lib/sedna/datasources/__init__.py index cc0d2a36c..37927d5d1 100644 --- a/lib/sedna/datasources/__init__.py +++ b/lib/sedna/datasources/__init__.py @@ -169,8 +169,7 @@ def parse(self, *args, **kwargs): if self.process_func: res = [] for line in fin.readlines(): - lines = line.strip().split() - lines = [self.process_func(data) for data in lines] + lines = list(map(self.process_func, line.strip().split())) res.append(lines) else: res = [line.strip().split() for line in fin.readlines()] diff --git a/lib/sedna/service/server/knowledgeBase/server.py b/lib/sedna/service/server/knowledgeBase/server.py index 850ea0c2a..3f5994b00 100644 --- a/lib/sedna/service/server/knowledgeBase/server.py +++ b/lib/sedna/service/server/knowledgeBase/server.py @@ -28,6 +28,7 @@ from sedna.service.server.base import BaseServer from sedna.common.file_ops import FileOps from sedna.common.constant import KBResourceConstant +from sedna.common.config import Context from .model import * @@ -135,12 +136,7 @@ def update_status(self, data: KBUpdateResult = Body(...)): # todo: get from kb _index_path = FileOps.join_path(self.save_dir, self.kb_index) - try: - task_info = joblib.load(_index_path) - except Exception as err: - print(f"{err} And return None.") - return None - + task_info = FileOps.load(_index_path) new_task_group = [] # TODO: to fit seen tasks and unseen tasks @@ -153,6 +149,7 @@ def update_status(self, data: KBUpdateResult = Body(...)): new_task_group.append(task_group) task_info[self.seen_task_key][self.task_group_key] = new_task_group + _index_path = FileOps.join_path(self.save_dir, self.kb_index) FileOps.dump(task_info, _index_path) return f"/file/download?files={self.kb_index}&name={self.kb_index}"