import param
from .collections import new_collection
from .datasets import stage_for_download, download_datasets, open_dataset, get_datasets, add_datasets
from .catalog import search_catalog
from .metadata import get_metadata
from .tools import run_tool
from ..database import get_db, db_session
from ..util import logger as log
from ..static import DatasetStatus
def get_data(
        service_uri,
        search_filters=None,
        search_queries=None,
        download_options=None,
        collection_name='default',
        expand=False,
        use_cache=True,
        max_catalog_entries=10,
        as_open_datasets=True,
        raise_on_error=False,
):
    """
    Downloads data from source uri and adds to a quest collection.

    Args:
        service_uri (string, required):
            uri for service to get data from
        search_filters (dict, optional, default=None):
            dictionary of search filters to filter the catalog search. At least one of `search_filters` or
            `search_queries` should be specified. (see docs for :func:`quest.api.search_catalog`)
        search_queries (list, optional, default=None):
            list of string arguments to pass to :obj:`pandas.DataFrame.query` to filter the catalog search. At least one
            of `search_filters` or `search_queries` should be specified. (see docs for :func:`quest.api.search_catalog`)
        download_options (dict or Parameterized, optional, default=None):
            dictionary or Parameterized object with download options for service
            (see docs for quest.api.download_datasets)
        collection_name (string, optional, default='default'):
            name of collection to add downloaded data to. If collection doesn't exist it will be created.
        expand (bool, optional, default=False):
            include dataset details and format as dict (ignored when `as_open_datasets` is True)
        use_cache (bool, optional, default=True):
            if True then previously downloaded datasets with the same download options will be returned
            rather than downloading new datasets
        max_catalog_entries (int, optional, default=10):
            the maximum number of datasets to allow in the search. If exceeded a RuntimeError is raised.
        as_open_datasets (bool, optional, default=True):
            if True return datasets as Python data structures rather than as dataset ids
            (see docs for quest.api.open_dataset)
        raise_on_error (bool, optional, default=False):
            if True then raise an exception if no datasets are returned in the search,
            or if there is an error while downloading any of the datasets.

    Returns:
        list of quest dataset names, or of Python data structures if `as_open_datasets` is True.

    Raises:
        RuntimeError: if the search is empty (when `raise_on_error` is True), or if the
            search returns more than `max_catalog_entries` entries.
    """
    new_collection(collection_name, exists_ok=True)

    # Download the features (i.e. locations) and metadata for the given web service.
    # The first time this is run it will take longer since it downloads
    # and caches all metadata for the given service.
    catalog_entries = search_catalog(
        uris=service_uri,
        filters=search_filters,
        queries=search_queries,
    )

    if not catalog_entries:
        # `warn` is a deprecated alias of `warning` in the stdlib logging API.
        log.warning('Search was empty. No datasets will be downloaded.')
        if raise_on_error:
            raise RuntimeError('Search was empty. No datasets will be downloaded.')

    # Ensure number of features is reasonable before downloading.
    # (Message names `max_catalog_entries`, the actual keyword argument.)
    if len(catalog_entries) > max_catalog_entries:
        raise RuntimeError('The number of service features found was {} which exceeds the '
                           '`max_catalog_entries` of {}.'
                           .format(len(catalog_entries), max_catalog_entries))

    if isinstance(download_options, param.Parameterized):
        # Normalize Parameterized download options to a plain dict.
        download_options = dict(download_options.get_param_values())

    datasets = list()
    cached_datasets = list()

    if use_cache:
        # Reuse previously downloaded datasets whose options match, and only
        # download the catalog entries that have no cached counterpart.
        matched = _get_cached_data(catalog_entries, download_options, collection_name)
        catalog_entries = set(catalog_entries) - set(matched.keys())
        cached_datasets = list(matched.values())

    # Add the selected features to a quest collection and download them.
    if catalog_entries:
        datasets = add_datasets(collection_name, list(catalog_entries))
        stage_for_download(uris=datasets, options=download_options)
        # Download one at a time so a single failure doesn't abort the rest
        # (unless raise_on_error is set).
        for dataset in datasets:
            try:
                download_datasets(datasets=dataset, raise_on_error=True)
            except Exception as e:
                log.exception('The following error was raised while downloading dataset {}'.format(dataset),
                              exc_info=e)
                if raise_on_error:
                    raise  # bare raise preserves the original traceback

    datasets.extend(cached_datasets)

    if as_open_datasets:
        datasets = [open_dataset(dataset) for dataset in datasets]
    elif expand:
        datasets = list(get_datasets(expand=True, queries=[f'name in {datasets}']).values())

    return datasets
def get_seamless_data(
        service_uri,
        bbox,
        search_filters=None,
        search_queries=None,
        download_options=None,
        collection_name='default',
        expand=False,
        use_cache=True,
        max_catalog_entries=10,
        as_open_dataset=True,
        raise_on_error=False,
):
    """
    Downloads raster data from source uri and adds to a quest collection.

    If multiple raster tiles are retrieved for the given bounds it calls a quest
    tool to merge the tiles into a single raster.

    Args:
        service_uri (string, required):
            uri for service to get data from
        bbox (list, required):
            list of lat/lon coordinates representing the bounds of the data in the form
            [lon_min, lat_min, lon_max, lat_max].
        search_filters (dict, optional, default=None):
            dictionary of search filters to filter the catalog search (see docs for
            :func:`quest.api.search_catalog`). The passed-in dict is not modified;
            `bbox` is added to a copy.
        search_queries (list, optional, default=None):
            list of string arguments to pass to :obj:`pandas.DataFrame.query` to filter the catalog search
            (see docs for :func:`quest.api.search_catalog`)
        download_options (dict or Parameterized, optional, default=None):
            dictionary or Parameterized object with download options for service
            (see docs for quest.api.download_datasets)
        collection_name (string, optional, default='default'):
            name of collection to add downloaded data to. If collection doesn't exist it will be created.
        expand (bool, optional, default=False):
            include dataset details and format as dict (ignored when `as_open_dataset` is True)
        use_cache (bool, optional, default=True):
            if True then previously downloaded datasets with the same download options will be returned
            rather than downloading new datasets
        max_catalog_entries (int, optional, default=10):
            the maximum number of datasets to allow in the search. If exceeded a RuntimeError is raised.
        as_open_dataset (bool, optional, default=True):
            if True return dataset as Python data structure rather than as a dataset id
            (see docs for quest.api.open_dataset)
        raise_on_error (bool, optional, default=False):
            if True then raise an exception if no datasets are returned in the search,
            or if there is an error while downloading.

    Returns:
        the merged quest dataset (name, dict, or open data structure depending on flags),
        or None if the search returned no datasets.

    Raises:
        ValueError: if `service_uri` is not a raster tile service.
    """
    if not _is_tile_service(service_uri):
        raise ValueError('The service {} does not support seamless data. '
                         'Only raster tile services are supported.'.format(service_uri))

    # Copy before injecting the bbox so the caller's dict is never mutated.
    search_filters = dict(search_filters or {})
    search_filters.update(bbox=bbox)

    datasets = get_data(
        service_uri=service_uri,
        search_filters=search_filters,
        search_queries=search_queries,
        download_options=download_options,
        collection_name=collection_name,
        use_cache=use_cache,
        max_catalog_entries=max_catalog_entries,
        as_open_datasets=False,
        raise_on_error=raise_on_error,
    )

    if not datasets:
        # Don't try to merge an empty list of datasets
        return None

    tool_name = 'raster-merge'
    tool_options = {'bbox': bbox, 'datasets': datasets}

    merged_dataset = None
    if use_cache:
        # Look for an identical previous merge before running the tool again.
        merged_dataset = _get_cached_derived_data(tool_name, tool_options)
        if as_open_dataset and merged_dataset is not None:
            merged_dataset = [open_dataset(dataset) for dataset in merged_dataset] or None

    if merged_dataset is None:
        # merge and clip into a single raster tile
        merged_dataset = run_tool(
            name=tool_name,
            options=tool_options,
            as_open_datasets=as_open_dataset,
        )['datasets']

    dataset = merged_dataset[0]

    if expand and not as_open_dataset:
        dataset = get_metadata(dataset)[dataset]

    return dataset
def _get_cached_data(catalog_entries, download_options, collection=None):
    """Returns datasets that have been successfully downloaded
    where `catalog_entry` and `options` match `catalog_entries` and `download_options`.

    Args:
        catalog_entries (list, required):
            list of catalog_entry ids to check for matching downloaded data
        download_options (dict, required):
            dictionary of download options to match against previously downloaded data
        collection (string, optional, default=None):
            Optionally only match cached data from within given collection.

    Returns:
        a dictionary of dataset ids mapped to provided catalog entries
        (keys are catalog entries, values are dataset names)
    """
    db = get_db()
    with db_session:
        # NOTE: d.options will not evaluate to None in the lambda expression.
        # It therefore must be compared outside of it.
        # (The lambda is translated into a database query by the ORM, so the
        # options comparison is done in Python on the fetched rows instead.)
        datasets = [d for d in db.Dataset.select(
            lambda d: d.catalog_entry in catalog_entries and d.status == DatasetStatus.DOWNLOADED
        ) if d.options == download_options]
        if collection is not None:
            # Restrict the matches to the requested collection, if any.
            datasets = [d for d in datasets if d.collection.name == collection]
        # Attribute access on the entities happens inside the db session.
        return {d.catalog_entry: d.name for d in datasets}
def _get_cached_derived_data(tool_name, tool_options):
    """Look up datasets previously generated by a Quest tool run.

    A dataset matches when its stored `options` record the same tool name
    (under 'tool_applied') and the same tool options (under 'tool_options')
    as the arguments given here.

    Args:
        tool_name (string, required):
            name of quest tool that was run to generate data
        tool_options (dict, required):
            dictionary of tool options used to generate data

    Returns:
        a list of matching dataset ids, or None when nothing matches
    """
    wanted = {
        'tool_applied': tool_name,
        'tool_options': tool_options,
    }
    database = get_db()
    with db_session:
        matches = [entry.name for entry in database.Dataset.select() if entry.options == wanted]
    return matches or None
def _is_tile_service(service_uri):
"""Checks that `service_uri` is a data service that provides tiled raster data.
Args:
service_uri (string, required):
string of a service to verify that it provides tiles
Returns:
True if service provides tiles, False otherwise
"""
# TODO check to make sure the service provides raster tiles that can be merged.
return True