# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
======================
ISLA Astroquery Module
======================
European Space Astronomy Centre (ESAC)
European Space Agency (ESA)
"""
from astropy.table import Table
from astroquery import log
from astroquery.utils import commons
from requests import HTTPError
from . import conf
import astroquery.esa.utils.utils as esautils
from astroquery.esa.utils import EsaTap
from datetime import datetime
__all__ = ['Integral', 'IntegralClass']
[docs]
class IntegralClass(EsaTap):
"""
This module connects with ESA Integral TAP
"""
ESA_ARCHIVE_NAME = "ISLA"
TAP_URL = conf.ISLA_TAP_SERVER
LOGIN_URL = conf.ISLA_LOGIN_SERVER
LOGOUT_URL = conf.ISLA_LOGOUT_SERVER
def __init__(self, auth_session=None, tap_url=None):
super().__init__(auth_session=auth_session, tap_url=tap_url)
self.instruments = []
self.bands = []
self.instrument_band_map = {}
[docs]
def get_sources(self, target_name, *, async_job=False, output_file=None, output_format=None):
"""Retrieve the coordinates of an INTEGRAL source
Parameters
----------
target_name : str, mandatory
target name to be requested, mandatory
async_job : bool, optional, default 'False'
executes the query (job) in asynchronous/synchronous mode (default
synchronous)
output_file : str, optional, default None
file name where the results are saved if dumpToFile is True.
If this parameter is not provided, the jobid is used instead
output_format : str, optional, default 'votable'
results format
Returns
-------
An astropy.table object containing the results
"""
# First attempt, resolve the name in the source catalogue
query = conf.ISLA_TARGET_CONDITION.format(target_name)
result = self.query_tap(query=query, async_job=async_job, output_file=output_file, output_format=output_format)
if len(result) > 0:
return result
# Second attempt, resolve using a Resolver Service and cone search to the source catalogue
try:
coordinates = esautils.resolve_target(conf.ISLA_TARGET_RESOLVER, self.tap._session, target_name, 'ALL')
if coordinates:
query = conf.ISLA_CONE_TARGET_CONDITION.format(coordinates.ra.degree, coordinates.dec.degree, 0.0833)
result = self.query_tap(query=query, async_job=async_job, output_file=output_file,
output_format=output_format)
if len(result) > 0:
return result[0]
raise ValueError(f"Target {target_name} cannot be resolved for ISLA")
except ValueError:
raise ValueError(f"Target {target_name} cannot be resolved for ISLA")
[docs]
def get_observations(self, *, target_name=None, coordinates=None, radius=14.0, start_time=None, end_time=None,
start_revno=None, end_revno=None, async_job=False, output_file=None, output_format=None,
verbose=False):
"""Retrieve the INTEGRAL observations associated to target name, time range and/or revolution
Parameters
----------
target_name: str, optional
target name to be requested
coordinates: str or SkyCoord, optional
coordinates of the center in the cone search
radius: float or quantity, optional, default value 14 degrees
radius in degrees (int, float) or quantity of the cone_search
start_time: str in UTC or datetime, optional
start time of the observation
end_time: str in UTC or datetime, optional
end time of the observation
start_revno: string, optional
start revolution number, as a four-digit string with leading zeros
e.g. 0352
end_revno: string, optional
end revolution number, as a four-digit string with leading zeros
e.g. 0353
async_job : bool, optional, default 'False'
executes the query (job) in asynchronous/synchronous mode (default
synchronous)
output_file : str, optional, default None
file name where the results are saved if dumpToFile is True.
If this parameter is not provided, the jobid is used instead
output_format : str, optional, default 'votable'
results format
verbose : bool, optional, default 'False'
flag to display information about the process
Returns
-------
An astropy.table object containing the results
"""
base_query = conf.ISLA_OBSERVATION_BASE_QUERY
query = base_query
conditions = []
# Target name/Coordinates + radius condition
if target_name and coordinates:
raise TypeError("Please use only target or coordinates as "
"parameter.")
# Radius in degrees
if radius is not None:
radius = esautils.get_degree_radius(radius)
# Resolve target or coordinates to get coordinates
if target_name:
coord = self.get_sources(target_name=target_name)
ra = coord['ra'][0]
dec = coord['dec'][0]
conditions.append(conf.ISLA_COORDINATE_CONDITION.format(ra, dec, radius))
elif coordinates:
coord = commons.parse_coordinates(coordinates=coordinates)
ra = coord.ra.degree
dec = coord.dec.degree
conditions.append(conf.ISLA_COORDINATE_CONDITION.format(ra, dec, radius))
# Start/End time conditions
if start_time:
parsed_start = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
conditions.append(f"endtime >= '{parsed_start}'")
if end_time:
parsed_end = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
conditions.append(f"starttime <= '{parsed_end}'")
# Revolution Number conditions
if start_revno and self.__validate_revno(start_revno):
conditions.append(f"end_revno >= '{start_revno}'")
if end_revno and self.__validate_revno(end_revno):
conditions.append(f"start_revno <= '{end_revno}'")
# Create final query
if conditions:
query = f"{query} where {' AND '.join(conditions)}"
query = f"{query} order by obsid"
if verbose:
return query
else:
return self.query_tap(query=query, async_job=async_job, output_file=output_file,
output_format=output_format)
[docs]
def download_science_windows(self, *, science_windows=None, observation_id=None, revolution=None, proposal=None,
output_file=None, cache=False, read_fits=True):
"""Method to download science windows associated to one of these parameters:
science_windows, observation_id, revolution or proposal
Parameters
----------
science_windows : list of str, optional
Science Windows to download
observation_id: str, optional
Observation ID associated to science windows
revolution: str, optional
Revolution associated to science windows
proposal: str, optional
Proposal ID associated to science windows
output_file: str, optional
File name and path for the downloaded file
cache: bool, optional, default False
Flag to determine if the file is stored in the cache or not
read_fits: bool, optional, default True
Open the downloaded file and parse the existing FITS files
Returns
-------
If read_fits=True, a list with objects containing filename, path and FITS file opened with the
science windows. If read_fits=False, the path of the downloaded file
"""
# Validate and retrieve the correct value
params = self.__get_science_window_parameter(science_windows, observation_id, revolution, proposal)
params['RETRIEVAL_TYPE'] = 'SCW'
try:
downloaded_file = esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
filename=output_file, params=params,
cache=cache, cache_folder=self.cache_location, verbose=True)
if read_fits:
return esautils.read_downloaded_fits([downloaded_file])
else:
return downloaded_file
except Exception as e:
log.error('No science windows have been found with these inputs. {}'.format(e))
[docs]
def get_timeline(self, coordinates, *, radius=14):
"""Retrieve the INTEGRAL timeline associated to coordinates and radius
Parameters
----------
coordinates: str or SkyCoord, mandatory
RA and Dec of the source
radius: float or quantity, optional, default value 14 degrees
radius in degrees (int, float) or quantity of the cone_search
Returns
-------
An object containing:
totalItems: a counter for the number of items retrieved
fraFC:
totEffExpo:
timeline: An astropy.table object containing the results for scwExpo, scwRevs, scwTimes and scwOffAxis
"""
if radius is not None:
radius = esautils.get_degree_radius(radius)
c = commons.parse_coordinates(coordinates=coordinates)
query_params = {
'REQUEST': 'timelines',
"ra": c.ra.degree,
"dec": c.dec.degree,
"radius": radius
}
try:
# Execute the request to the servlet
request_result = esautils.execute_servlet_request(url=conf.ISLA_SERVLET,
tap=self.tap,
query_params=query_params)
total_items = request_result['totalItems']
data = request_result['data']
fraFC = data['fraFC']
totEffExpo = data['totEffExpo']
timeline = Table({
"scwExpo": data["scwExpo"],
"scwRevs": data["scwRevs"],
"scwTimes": [datetime.fromtimestamp(scwTime / 1000) for scwTime in data["scwTimes"]],
"scwOffAxis": data["scwOffAxis"]
})
return {'total_items': total_items, 'fraFC': fraFC, 'totEffExpo': totEffExpo, 'timeline': timeline}
except HTTPError as e:
if 'None science windows have been selected' in e.response.text:
raise ValueError('No timeline is available for the current coordinates and radius.')
else:
raise e
[docs]
def get_epochs(self, *, target_name=None, instrument=None, band=None):
"""Retrieve the INTEGRAL epochs associated to a target and an instrument or a band
Parameters
----------
target_name : str, optional
target name to be requested, mandatory
instrument : str, optional
Possible values are in isla.instruments object
band : str, optional
Possible values are in isla.bandsobject
Returns
-------
An astropy.table object containing the available epochs
"""
value = self.__get_instrument_or_band(instrument=instrument, band=band)
instrument_oid, band_oid = self.__get_oids(value)
if target_name:
query = conf.ISLA_EPOCH_TARGET_QUERY.format(target_name, instrument_oid, band_oid)
else:
query = conf.ISLA_EPOCH_QUERY.format(instrument_oid, band_oid)
return self.query_tap(query)
[docs]
def get_long_term_timeseries(self, target_name, *, instrument=None, band=None, path='', filename=None,
cache=False, read_fits=True):
"""Method to download long term timeseries associated to an epoch and instrument or band
Parameters
----------
target_name : str, mandatory
target name to be requested, mandatory
instrument : str
Possible values are in isla.instruments object
band : str
Possible values are in isla.bandsobject
path: str, optional
Path for the downloaded file
filename: str, optional
Filename for the downloaded file
cache: bool, optional, default False
Flag to determine if the file is stored in the cache or not
read_fits: bool, optional, default True
Open the downloaded file and parse the existing FITS files
Returns
-------
If read_fits=True, a list with objects containing filename, path and FITS file opened with long
term timeseries. If read_fits=False, the path of the downloaded file
"""
value = self.__get_instrument_or_band(instrument=instrument, band=band)
params = {'RETRIEVAL_TYPE': 'long_timeseries',
'source': target_name,
'instrument_oid': self.instrument_band_map[value]['instrument_oid']}
try:
downloaded_file = esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
params=params, path=path, filename=filename,
cache=cache, cache_folder=self.cache_location, verbose=True)
if read_fits:
return esautils.read_downloaded_fits([downloaded_file])
else:
return downloaded_file
except HTTPError as err:
log.error('No long term timeseries have been found with these inputs. {}'.format(err))
except Exception as e:
log.error('Problem when retrieving long term timeseries. {}'.format(e))
[docs]
def get_short_term_timeseries(self, target_name, epoch, instrument=None, band=None,
path='', filename=None, cache=False, read_fits=True):
"""Method to download short term timeseries associated to an epoch and instrument or band
Parameters
----------
target_name : str, mandatory
target name to be requested, mandatory
epoch : str, mandatory
reference epoch for the short term timeseries
instrument : str, optional
Possible values are in isla.instruments object
band : str, optional
Possible values are in isla.bandsobject
path: str, optional
Path for the downloaded file
filename: str, optional
Filename for the downloaded file
cache: bool, optional, default False
Flag to determine if the file is stored in the cache or not
read_fits: bool, optional, default True
Open the downloaded file and parse the existing FITS files
Returns
-------
If read_fits=True, a list with objects containing filename, path and FITS file opened with short
term timeseries. If read_fits=False, the path of the downloaded file
"""
value = self.__get_instrument_or_band(instrument=instrument, band=band)
self.__validate_epoch(target_name=target_name, epoch=epoch,
instrument=instrument, band=band)
params = {'RETRIEVAL_TYPE': 'short_timeseries',
'source': target_name,
'band_oid': self.instrument_band_map[value]['band_oid'],
'epoch': epoch}
try:
downloaded_file = esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
params=params, path=path, filename=filename,
cache=cache, cache_folder=self.cache_location, verbose=True)
if read_fits:
return esautils.read_downloaded_fits([downloaded_file])
else:
return downloaded_file
except HTTPError as err:
log.error('No short term timeseries have been found with these inputs. {}'.format(err))
except Exception as e:
log.error('Problem when retrieving short term timeseries. {}'.format(e))
[docs]
def get_spectra(self, target_name, epoch, instrument=None, band=None, *, path='', filename=None,
cache=False, read_fits=True):
"""Method to download mosaics associated to an epoch and instrument or band
Parameters
----------
target_name : str, mandatory
target name to be requested, mandatory
epoch : str, mandatory
reference epoch for the short term timeseries
instrument : str
Possible values are in isla.instruments object
band : str
Possible values are in isla.bandsobject
path: str, optional
Path for the downloaded file
filename: str, optional
Filename for the downloaded file
cache: bool, optional, default False
Flag to determine if the file is stored in the cache or not
read_fits: bool, optional, default True
Open the downloaded file and parse the existing FITS files
Returns
-------
If read_fits=True, a list with objects containing filename, path and FITS file opened with spectra.
If read_fits=False, a list of paths of the downloaded files
"""
value = self.__get_instrument_or_band(instrument=instrument, band=band)
self.__validate_epoch(target_name=target_name, epoch=epoch,
instrument=instrument, band=band)
query_params = {
'REQUEST': 'spectra',
"source": target_name,
"instrument_oid": self.instrument_band_map[value]['instrument_oid'],
"epoch": epoch
}
try:
# Execute the request to the servlet
request_result = esautils.execute_servlet_request(url=conf.ISLA_SERVLET,
tap=self.tap,
query_params=query_params)
if len(request_result) == 0:
raise ValueError('Please try with different input parameters.')
# Parse the spectrum
downloaded_files = []
for element in request_result:
params = {'RETRIEVAL_TYPE': 'spectras',
'spectra_oid': element['spectraOid']}
downloaded_files.append(
esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
params=params, path=path, filename=filename,
cache=cache, cache_folder=self.cache_location, verbose=True))
if read_fits:
return esautils.read_downloaded_fits(downloaded_files)
else:
return downloaded_files
except ValueError as err:
log.error('Spectra are not available with these inputs. {}'.format(err))
except Exception as e:
log.error('Problem when retrieving spectra. {}'.format(e))
[docs]
def get_mosaic(self, epoch, instrument=None, band=None, *, path='', filename=None, cache=False, read_fits=True):
"""Method to download mosaics associated to an epoch and instrument or band
Parameters
----------
epoch : str, mandatory
reference epoch for the short term timeseries
instrument : str
Possible values are in isla.instruments object
band : str
Possible values are in isla.bandsobject
cache: bool, optional, default False
Flag to determine if the file is stored in the cache or not
path: str, optional
Path for the downloaded file
filename: str, optional
Filename for the downloaded file
read_fits: bool, optional, default True
Open the downloaded file and parse the existing FITS files
Returns
-------
If read_fits=True, a list with objects containing filename, path and FITS file opened with mosaics.
If read_fits=False, a list of paths of the downloaded files
"""
self.__validate_epoch(epoch=epoch,
instrument=instrument, band=band)
value = self.__get_instrument_or_band(instrument=instrument, band=band)
query_params = {
'REQUEST': 'mosaics',
"band_oid": self.instrument_band_map[value]['band_oid'],
"epoch": epoch
}
try:
# Execute the request to the servlet
request_result = esautils.execute_servlet_request(url=conf.ISLA_SERVLET,
tap=self.tap,
query_params=query_params)
if len(request_result) == 0:
raise ValueError('Please try with different input parameters.')
downloaded_files = []
for element in request_result:
params = {'RETRIEVAL_TYPE': 'mosaics',
'mosaic_oid': element['mosaicOid']}
downloaded_files.append(
esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
params=params, path=path, filename=filename,
cache=cache, cache_folder=self.cache_location, verbose=True))
if read_fits:
return esautils.read_downloaded_fits(downloaded_files)
else:
return downloaded_files
except ValueError as err:
log.error('Mosaics are not available for these inputs. {}'.format(err))
except Exception as e:
log.error('Problem when retrieving mosaics. {}'.format(e))
[docs]
def get_instrument_band_map(self):
"""
Maps the bands and instruments included in ISLA
Returns
-------
An object containing the instruments and bands
"""
if len(self.instrument_band_map) == 0:
instrument_band_table = self.query_tap(conf.ISLA_INSTRUMENT_BAND_QUERY)
instrument_band_map = {}
for row in instrument_band_table:
instrument_band_map[row['instrument']] = {'band': row['band'],
'instrument_oid': row['instrument_oid'],
'band_oid': row['band_oid']}
instrument_band_map[row['band']] = {'instrument': row['instrument'],
'instrument_oid': row['instrument_oid'],
'band_oid': row['band_oid']}
instruments = instrument_band_table['instrument']
bands = instrument_band_table['band']
self.instruments = instruments
self.bands = bands
self.instrument_band_map = instrument_band_map
return self.instrument_band_map
[docs]
def get_instruments(self):
"""
Get the instruments available in ISLA
"""
self.get_instrument_band_map()
return self.instruments
[docs]
def get_bands(self):
"""
Get the bands available in ISLA
"""
self.get_instrument_band_map()
return self.bands
def __get_instrument_or_band(self, instrument, band):
if instrument and band:
raise TypeError("Please use only instrument or band as "
"parameter.")
if instrument is None and band is None:
raise TypeError("Please use at least one parameter, instrument or band.")
if instrument:
value = instrument
else:
value = band
# Retrieve the available instruments or bands if not loaded yet
self.get_instrument_band_map()
# Validate the value is in the list of allowed ones
if value in self.instrument_band_map:
return value
raise ValueError(f"This is not a valid value for instrument or band. Valid values are:\n"
f"Instruments: {self.get_instruments()}\n"
f"Bands: {self.get_bands()}")
def __get_oids(self, value):
"""
Retrieves the band_oid and instrument_oid associated to a band or instrument
Parameters
----------
value: str
value to check
"""
return self.instrument_band_map[value]['instrument_oid'], self.instrument_band_map[value]['band_oid']
def __validate_revno(self, rev_no):
"""
Verifies if the format for revolution number is correct
Parameters
----------
rev_no: str
revolution number
"""
if len(rev_no) == 4:
return True
raise ValueError(f"Revolution number {rev_no} is not correct. It must be a four-digit number as a string, "
f"with leading zeros to complete the four digits")
def __validate_epoch(self, epoch, *, target_name=None, instrument=None, band=None):
"""
Validate if the epoch is available for the target name and instrument or band
Parameters
----------
epoch : str, mandatory
reference epoch for the short term timeseries
target_name : str, optional
target name to be requested, mandatory
instrument : str, optional
Possible values are in isla.instruments object
band : str, optional
Possible values are in isla.bandsobject
"""
available_epochs = self.get_epochs(target_name=target_name, instrument=instrument, band=band)
if epoch not in available_epochs['epoch']:
raise ValueError(f"Epoch {epoch} is not available for this target and instrument/band.")
def __get_science_window_parameter(self, science_windows, observation_id, revolution, proposal):
"""
Verifies if only one parameter is not null and return its value
Parameters
----------
science_windows : list of str or str, mandatory
Science Windows to download
observation_id: str, optional
Observation ID associated to science windows
revolution: str, optional
Revolution associated to science windows
proposal: str, optional
Proposal ID associated to science windows
Returns
-------
The correct parameter for the science windows
"""
params = [science_windows, observation_id, revolution, proposal]
# Count how many are not None
non_none_count = sum(p is not None for p in params)
# Ensure only one parameter is provided
if non_none_count > 1:
raise ValueError("Only one parameter can be provided at a time.")
if science_windows is not None:
if isinstance(science_windows, str):
return {'scwid': science_windows}
elif isinstance(science_windows, list):
return {'scwid': ','.join(science_windows)}
if observation_id is not None and isinstance(observation_id, str):
return {'obsid': observation_id}
if revolution is not None and isinstance(revolution, str):
return {'REVID': revolution}
if proposal is not None and isinstance(proposal, str):
return {'PROPID': proposal}
raise ValueError("Input parameters are wrong")
Integral = IntegralClass()