Source code for quest.api.manage

import os
import shutil

from .tasks import add_async
from .projects import _get_project_dir
from .collections import get_collections
from .metadata import get_metadata, update_metadata
from ..static import UriType, DatasetSource
from ..util import logger, classify_uris, uuid, parse_service_uri
from ..database.database import get_db, db_session, select_datasets


@add_async
def delete(uris):
    """Delete metadata for resource(s).

    WARNING: deleting a collection deletes all associated datasets.

    Args:
        uris (string, comma-separated string or list of strings, Required):
            URI(s) of the collection(s) and/or dataset(s) to delete

    Returns:
        status (bool):
            True on success
    """
    # if the uri list is empty do nothing
    if not uris:
        return True

    # group uris by type
    grouped_uris = classify_uris(uris, as_dataframe=False,
                                 exclude=[UriType.SERVICE, UriType.PUBLISHER],
                                 require_same_type=True)
    resource = list(grouped_uris)[0]
    uris = grouped_uris[resource]

    db = get_db()
    for uri in uris:
        if resource == UriType.COLLECTION:
            if uri not in get_collections():
                logger.error('Collection does not exist: %s', uri)
                raise ValueError('Collection does not exist')

            # delete the collection's datasets and the collection itself from the database
            with db_session:
                datasets = db.Dataset.select(lambda d: d.collection.name == uri)
                if datasets.count() > 0:
                    datasets.delete()
                db.Collection[uri].delete()

            # remove the collection's directory (and all data under it) from disk
            path = _get_project_dir()
            path = os.path.join(path, uri)
            if os.path.exists(path):
                logger.info('deleting all data under path: %s', path)
                shutil.rmtree(path)

        if resource == UriType.DATASET:
            with db_session:
                dataset = db.Dataset[uri]
                if dataset.source == DatasetSource.DERIVED:
                    # if this is the last dataset tied to its catalog entry,
                    # drop the associated quest catalog record as well
                    catalog_entry_datasets = select_datasets(
                        lambda d: d.catalog_entry == dataset.catalog_entry)
                    if len(catalog_entry_datasets) == 1:
                        _, _, catalog_id = parse_service_uri(dataset.catalog_entry)
                        db.QuestCatalog[catalog_id].delete()
                try:
                    os.remove(dataset.file_path)
                except (OSError, TypeError):
                    pass
                dataset.delete()

    return True
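A minimal usage sketch for delete (the collection name and dataset URI below are hypothetical; an active project containing them is assumed):

    from quest.api.manage import delete

    # deleting a dataset removes its database record and, if present, its file on disk
    delete('some_dataset_uri')

    # deleting a collection also removes every dataset it contains (see WARNING above)
    delete('my_collection')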
@add_async
def move(uris, destination_collection, as_dataframe=None, expand=None):
    """Move dataset(s) to another collection.

    Args:
        uris (string, comma-separated string or list of strings, Required):
            URI(s) of the dataset(s) to move
        destination_collection (string, Required):
            name of the collection to move the dataset(s) into
        as_dataframe (bool, Optional, Default=None):
            return metadata for the moved datasets as a pandas DataFrame
        expand (bool, Optional, Default=None):
            return metadata for the moved datasets instead of bare URIs

    Returns:
        URIs (or metadata) of the moved dataset(s)
    """
    if not uris:
        return {}

    grouped_uris = classify_uris(uris, as_dataframe=False,
                                 exclude=[UriType.SERVICE, UriType.PUBLISHER, UriType.COLLECTION],
                                 require_same_type=True)
    resource = list(grouped_uris)[0]
    uris = grouped_uris[resource]

    project_path = _get_project_dir()
    destination_collection_path = os.path.join(project_path, destination_collection)

    new_datasets = []
    for uri in uris:
        if resource == UriType.DATASET:
            dataset_metadata = get_metadata(uri)[uri]
            collection_path = os.path.join(project_path, dataset_metadata['collection'])
            _move_dataset(dataset_metadata, collection_path, destination_collection_path)
            new_datasets.append(uri)

    if expand or as_dataframe:
        new_datasets = get_metadata(new_datasets, as_dataframe=as_dataframe)
        if as_dataframe:
            new_datasets['quest_type'] = 'dataset'
        return new_datasets

    return new_datasets
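A short sketch of move under the same assumptions (hypothetical dataset URIs and collection names; the destination collection must already exist):

    from quest.api.manage import move

    # move two datasets into another collection; by default the same URIs are returned
    moved = move(['dataset_uri_1', 'dataset_uri_2'], destination_collection='destination_collection')

    # with expand=True the moved datasets' metadata is returned instead of bare URIs
    moved_metadata = move('dataset_uri_1', 'destination_collection', expand=True)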
@add_async
def copy(uris, destination_collection, as_dataframe=None, expand=None):
    """Copy dataset(s) into another collection.

    A new dataset (with a newly generated URI) is created for each copy.

    Args:
        uris (string, comma-separated string or list of strings, Required):
            URI(s) of the dataset(s) to copy
        destination_collection (string, Required):
            name of the collection to copy the dataset(s) into
        as_dataframe (bool, Optional, Default=None):
            return metadata for the new datasets as a pandas DataFrame
        expand (bool, Optional, Default=None):
            return metadata for the new datasets instead of bare URIs

    Returns:
        URIs (or metadata) of the newly created dataset(s)
    """
    if not uris:
        return {}

    grouped_uris = classify_uris(uris, as_dataframe=False,
                                 exclude=[UriType.SERVICE, UriType.PUBLISHER, UriType.COLLECTION],
                                 require_same_type=True)
    resource = list(grouped_uris)[0]
    uris = grouped_uris[resource]

    project_path = _get_project_dir()
    destination_collection_path = os.path.join(project_path, destination_collection)

    new_datasets = []
    for uri in uris:
        if resource == UriType.DATASET:
            dataset_metadata = get_metadata(uri)[uri]
            collection_path = os.path.join(project_path, dataset_metadata['collection'])
            new_datasets.append(_copy_dataset(dataset_metadata, collection_path, destination_collection_path))

    if expand or as_dataframe:
        new_datasets = get_metadata(new_datasets, as_dataframe=as_dataframe)
        if as_dataframe:
            new_datasets['quest_type'] = 'dataset'
        return new_datasets

    return new_datasets
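copy follows the same pattern as move, except each copy is registered under a freshly generated dataset URI (names below are hypothetical):

    from quest.api.manage import copy

    # copy a dataset; the returned list holds the new dataset URI(s), not the originals
    new_uris = copy('dataset_uri_1', destination_collection='destination_collection')

    # as_dataframe=True returns a pandas DataFrame of metadata tagged with quest_type == 'dataset'
    df = copy(['dataset_uri_1', 'dataset_uri_2'], 'destination_collection', as_dataframe=True)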
def _copy_dataset(dataset_metadata, collection_path, destination_collection_path):
    # register a copy of the dataset's database record under a newly generated uri
    new_name = uuid('dataset')
    db = get_db()
    with db_session:
        db_metadata = db.Dataset[dataset_metadata['name']].to_dict()
        db_metadata.update(name=new_name)
        db.Dataset(**db_metadata)

    # copy the dataset's file into the destination collection
    _update_dataset_file_location(shutil.copy2, db_metadata, collection_path, destination_collection_path)

    return new_name


def _move_dataset(dataset_metadata, collection_path, destination_collection_path):
    _update_dataset_file_location(shutil.move, dataset_metadata, collection_path, destination_collection_path)


def _update_dataset_file_location(func, dataset_metadata, collection_path, destination_collection_path):
    quest_metadata = {}
    file_path = dataset_metadata['file_path']
    if file_path is not None:
        # rebuild the file path relative to the destination collection,
        # preserving any sub-directory structure below the source collection
        rel_path = os.path.relpath(file_path, collection_path)
        new_file_path = os.path.normpath(os.path.join(destination_collection_path, rel_path))
        quest_metadata['file_path'] = new_file_path
        os.makedirs(os.path.split(new_file_path)[0], exist_ok=True)
        func(file_path, new_file_path)

    update_metadata(dataset_metadata['name'], quest_metadata=quest_metadata)
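For reference, the path rewrite in _update_dataset_file_location keeps any sub-directory structure below the source collection intact; a small, self-contained illustration with made-up POSIX paths:

    import os

    file_path = '/projects/default/col_a/nwis/site01/data.h5'    # hypothetical
    collection_path = '/projects/default/col_a'
    destination_collection_path = '/projects/default/col_b'

    rel_path = os.path.relpath(file_path, collection_path)        # 'nwis/site01/data.h5'
    new_file_path = os.path.normpath(os.path.join(destination_collection_path, rel_path))
    # new_file_path == '/projects/default/col_b/nwis/site01/data.h5'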