# Source code for astroquery.esa.integral.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
======================
ISLA Astroquery Module
======================

European Space Astronomy Centre (ESAC)
European Space Agency (ESA)

"""
from astropy.table import Table
from astroquery import log
from astroquery.utils import commons
from requests import HTTPError

from . import conf
import astroquery.esa.utils.utils as esautils
from astroquery.esa.utils import EsaTap
from datetime import datetime

# Public names of this module: the client class and the module-level instance created from it.
__all__ = ['Integral', 'IntegralClass']


class IntegralClass(EsaTap):
    """
    This module connects with ESA Integral TAP

    Besides TAP queries, the methods of this class call the servlet and data
    endpoints configured in ``conf`` to retrieve timelines, epochs, timeseries,
    spectra, mosaics and science windows.
    """

    ESA_ARCHIVE_NAME = "ISLA"
    TAP_URL = conf.ISLA_TAP_SERVER
    LOGIN_URL = conf.ISLA_LOGIN_SERVER
    LOGOUT_URL = conf.ISLA_LOGOUT_SERVER

    def __init__(self, auth_session=None, tap_url=None):
        super().__init__(auth_session=auth_session, tap_url=tap_url)
        # The three attributes below are filled lazily by get_instrument_band_map()
        self.instruments = []
        self.bands = []
        self.instrument_band_map = {}

    def get_sources(self, target_name, *, async_job=False, output_file=None, output_format=None):
        """Retrieve the coordinates of an INTEGRAL source

        The name is first searched in the source catalogue. When nothing is found,
        the name is sent to the resolver service and a cone search around the
        resolved coordinates is executed against the source catalogue.

        Parameters
        ----------
        target_name : str, mandatory
            target name to be requested
        async_job : bool, optional, default 'False'
            executes the query (job) in asynchronous/synchronous mode (default
            synchronous)
        output_file : str, optional, default None
            file name where the results are saved if dumpToFile is True.
            If this parameter is not provided, the jobid is used instead
        output_format : str, optional, default 'votable'
            results format

        Returns
        -------
        An astropy.table.Table with all the matches when the name is found in the
        catalogue, or the first row (astropy.table.Row) of the cone search when the
        resolver service had to be used

        Raises
        ------
        ValueError
            If the target cannot be resolved
        """
        unresolved_message = f"Target {target_name} cannot be resolved for ISLA"

        # First attempt, resolve the name in the source catalogue
        query = conf.ISLA_TARGET_CONDITION.format(target_name)
        result = self.query_tap(query=query, async_job=async_job, output_file=output_file,
                                output_format=output_format)
        if len(result) > 0:
            return result

        # Second attempt, resolve using a Resolver Service and cone search to the source catalogue
        try:
            coordinates = esautils.resolve_target(conf.ISLA_TARGET_RESOLVER, self.tap._session,
                                                  target_name, 'ALL')
            if coordinates:
                # Cone radius is fixed; presumably degrees (~5 arcmin) - units are set by the query template
                query = conf.ISLA_CONE_TARGET_CONDITION.format(coordinates.ra.degree,
                                                               coordinates.dec.degree,
                                                               0.0833)
                result = self.query_tap(query=query, async_job=async_job, output_file=output_file,
                                        output_format=output_format)
                if len(result) > 0:
                    return result[0]
        except ValueError as err:
            # Any ValueError during resolution is reported as an unresolved target
            raise ValueError(unresolved_message) from err

        raise ValueError(unresolved_message)

    def get_observations(self, *, target_name=None, coordinates=None, radius=14.0, start_time=None, end_time=None,
                         start_revno=None, end_revno=None, async_job=False, output_file=None, output_format=None,
                         verbose=False):
        """Retrieve the INTEGRAL observations associated to target name, time range and/or revolution

        Parameters
        ----------
        target_name: str, optional
            target name to be requested
        coordinates: str or SkyCoord, optional
            coordinates of the center in the cone search
        radius: float or quantity, optional, default value 14 degrees
            radius in degrees (int, float) or quantity of the cone_search
        start_time: str in UTC or datetime, optional
            start time of the observation
        end_time: str in UTC or datetime, optional
            end time of the observation
        start_revno: string, optional
            start revolution number, as a four-digit string with leading zeros
            e.g. 0352
        end_revno: string, optional
            end revolution number, as a four-digit string with leading zeros
            e.g. 0353
        async_job : bool, optional, default 'False'
            executes the query (job) in asynchronous/synchronous mode (default
            synchronous)
        output_file : str, optional, default None
            file name where the results are saved if dumpToFile is True.
            If this parameter is not provided, the jobid is used instead
        output_format : str, optional, default 'votable'
            results format
        verbose : bool, optional, default 'False'
            when True, the query is not executed and its text is returned instead

        Returns
        -------
        An astropy.table object containing the results, or the query string
        when verbose is True

        Raises
        ------
        TypeError
            If both target_name and coordinates are provided
        ValueError
            If the target cannot be resolved or a revolution number is malformed
        """
        query = conf.ISLA_OBSERVATION_BASE_QUERY
        conditions = []

        # Target name/Coordinates + radius condition
        if target_name and coordinates:
            raise TypeError("Please use only target or coordinates as "
                            "parameter.")

        # Radius in degrees
        if radius is not None:
            radius = esautils.get_degree_radius(radius)

        # Resolve target or coordinates to get coordinates
        position = None
        if target_name:
            source = self.get_sources(target_name=target_name)
            # get_sources returns a Table for catalogue matches but a single Row
            # when the resolver fallback is used; take the first match in both cases
            first_match = source[0] if isinstance(source, Table) else source
            position = (first_match['ra'], first_match['dec'])
        elif coordinates:
            coord = commons.parse_coordinates(coordinates=coordinates)
            position = (coord.ra.degree, coord.dec.degree)

        if position is not None:
            conditions.append(conf.ISLA_COORDINATE_CONDITION.format(position[0], position[1], radius))

        # Start/End time conditions: an observation is kept when it overlaps the requested range
        if start_time:
            parsed_start = self.__parse_time(start_time)
            conditions.append(f"endtime >= '{parsed_start}'")
        if end_time:
            parsed_end = self.__parse_time(end_time)
            conditions.append(f"starttime <= '{parsed_end}'")

        # Revolution Number conditions
        if start_revno and self.__validate_revno(start_revno):
            conditions.append(f"end_revno >= '{start_revno}'")
        if end_revno and self.__validate_revno(end_revno):
            conditions.append(f"start_revno <= '{end_revno}'")

        # Create final query
        if conditions:
            query = f"{query} where {' AND '.join(conditions)}"
        query = f"{query} order by obsid"

        if verbose:
            return query
        return self.query_tap(query=query, async_job=async_job, output_file=output_file,
                              output_format=output_format)

    def download_science_windows(self, *, science_windows=None, observation_id=None, revolution=None, proposal=None,
                                 output_file=None, cache=False, read_fits=True):
        """Method to download science windows associated to one of these parameters:
        science_windows, observation_id, revolution or proposal

        Parameters
        ----------
        science_windows : list of str or str, optional
            Science Windows to download
        observation_id: str, optional
            Observation ID associated to science windows
        revolution: str, optional
            Revolution associated to science windows
        proposal: str, optional
            Proposal ID associated to science windows
        output_file: str, optional
            File name and path for the downloaded file
        cache: bool, optional, default False
            Flag to determine if the file is stored in the cache or not
        read_fits: bool, optional, default True
            Open the downloaded file and parse the existing FITS files

        Returns
        -------
        If read_fits=True, a list with objects containing filename, path and FITS file opened with the science windows.
        If read_fits=False, the path of the downloaded file.
        None when the download or the parsing fails (the error is logged).

        Raises
        ------
        ValueError
            If more than one selection parameter is provided, or none is valid
        """
        # Validate and retrieve the correct value
        params = self.__get_science_window_parameter(science_windows, observation_id, revolution, proposal)
        params['RETRIEVAL_TYPE'] = 'SCW'

        try:
            downloaded_file = esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
                                                     filename=output_file, params=params, cache=cache,
                                                     cache_folder=self.cache_location, verbose=True)
            if read_fits:
                return esautils.read_downloaded_fits([downloaded_file])
            return downloaded_file
        except Exception as e:
            # Best effort: failures are reported through the log and None is returned
            log.error('No science windows have been found with these inputs. {}'.format(e))

    def get_timeline(self, coordinates, *, radius=14):
        """Retrieve the INTEGRAL timeline associated to coordinates and radius

        Parameters
        ----------
        coordinates: str or SkyCoord, mandatory
            RA and Dec of the source
        radius: float or quantity, optional, default value 14 degrees
            radius in degrees (int, float) or quantity of the cone_search

        Returns
        -------
        A dictionary containing:
            total_items: value of 'totalItems' in the servlet response
            fraFC: value of 'fraFC' in the servlet response data
            totEffExpo: value of 'totEffExpo' in the servlet response data
            timeline: An astropy.table object containing the results for scwExpo, scwRevs, scwTimes and scwOffAxis

        Raises
        ------
        ValueError
            If the server reports that no science windows were selected
        requests.HTTPError
            For any other HTTP error
        """
        if radius is not None:
            radius = esautils.get_degree_radius(radius)

        c = commons.parse_coordinates(coordinates=coordinates)
        query_params = {
            'REQUEST': 'timelines',
            "ra": c.ra.degree,
            "dec": c.dec.degree,
            "radius": radius
        }

        try:
            # Execute the request to the servlet
            request_result = esautils.execute_servlet_request(url=conf.ISLA_SERVLET,
                                                              tap=self.tap,
                                                              query_params=query_params)
            data = request_result['data']

            # scwTimes are divided by 1000 before conversion (presumably epoch milliseconds).
            # NOTE(review): fromtimestamp() without tz yields naive local-time datetimes - confirm intended
            scw_times = [datetime.fromtimestamp(scwTime / 1000) for scwTime in data["scwTimes"]]
            timeline = Table({
                "scwExpo": data["scwExpo"],
                "scwRevs": data["scwRevs"],
                "scwTimes": scw_times,
                "scwOffAxis": data["scwOffAxis"]
            })
            return {'total_items': request_result['totalItems'],
                    'fraFC': data['fraFC'],
                    'totEffExpo': data['totEffExpo'],
                    'timeline': timeline}
        except HTTPError as e:
            # An HTTPError may carry no response; in that case just propagate it
            response_text = e.response.text if e.response is not None else ''
            if 'None science windows have been selected' in response_text:
                raise ValueError('No timeline is available for the current coordinates and radius.') from e
            raise

    def get_epochs(self, *, target_name=None, instrument=None, band=None):
        """Retrieve the INTEGRAL epochs associated to a target and an instrument or a band

        Parameters
        ----------
        target_name : str, optional
            target name to be requested
        instrument : str, optional
            Possible values are the ones returned by get_instruments()
        band : str, optional
            Possible values are the ones returned by get_bands()

        Returns
        -------
        An astropy.table object containing the available epochs

        Raises
        ------
        TypeError
            If both or none of instrument and band are provided
        ValueError
            If the instrument or band is not a known value
        """
        value = self.__get_instrument_or_band(instrument=instrument, band=band)
        instrument_oid, band_oid = self.__get_oids(value)

        if target_name:
            query = conf.ISLA_EPOCH_TARGET_QUERY.format(target_name, instrument_oid, band_oid)
        else:
            query = conf.ISLA_EPOCH_QUERY.format(instrument_oid, band_oid)
        return self.query_tap(query)

    def get_long_term_timeseries(self, target_name, *, instrument=None, band=None, path='', filename=None,
                                 cache=False, read_fits=True):
        """Method to download long term timeseries associated to a target and instrument or band

        Parameters
        ----------
        target_name : str, mandatory
            target name to be requested
        instrument : str
            Possible values are the ones returned by get_instruments()
        band : str
            Possible values are the ones returned by get_bands()
        path: str, optional
            Path for the downloaded file
        filename: str, optional
            Filename for the downloaded file
        cache: bool, optional, default False
            Flag to determine if the file is stored in the cache or not
        read_fits: bool, optional, default True
            Open the downloaded file and parse the existing FITS files

        Returns
        -------
        If read_fits=True, a list with objects containing filename, path and FITS file opened with long
        term timeseries.
        If read_fits=False, the path of the downloaded file.
        None when the download or the parsing fails (the error is logged).
        """
        value = self.__get_instrument_or_band(instrument=instrument, band=band)

        params = {'RETRIEVAL_TYPE': 'long_timeseries',
                  'source': target_name,
                  'instrument_oid': self.instrument_band_map[value]['instrument_oid']}
        try:
            downloaded_file = esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
                                                     params=params, path=path, filename=filename, cache=cache,
                                                     cache_folder=self.cache_location, verbose=True)
            if read_fits:
                return esautils.read_downloaded_fits([downloaded_file])
            return downloaded_file
        except HTTPError as err:
            log.error('No long term timeseries have been found with these inputs. {}'.format(err))
        except Exception as e:
            log.error('Problem when retrieving long term timeseries. {}'.format(e))

    def get_short_term_timeseries(self, target_name, epoch, instrument=None, band=None, path='', filename=None,
                                  cache=False, read_fits=True):
        """Method to download short term timeseries associated to an epoch and instrument or band

        Parameters
        ----------
        target_name : str, mandatory
            target name to be requested
        epoch : str, mandatory
            reference epoch for the short term timeseries
        instrument : str, optional
            Possible values are the ones returned by get_instruments()
        band : str, optional
            Possible values are the ones returned by get_bands()
        path: str, optional
            Path for the downloaded file
        filename: str, optional
            Filename for the downloaded file
        cache: bool, optional, default False
            Flag to determine if the file is stored in the cache or not
        read_fits: bool, optional, default True
            Open the downloaded file and parse the existing FITS files

        Returns
        -------
        If read_fits=True, a list with objects containing filename, path and FITS file opened with short
        term timeseries.
        If read_fits=False, the path of the downloaded file.
        None when the download or the parsing fails (the error is logged).

        Raises
        ------
        ValueError
            If the epoch is not available for the target and instrument/band
        """
        value = self.__get_instrument_or_band(instrument=instrument, band=band)
        self.__validate_epoch(target_name=target_name, epoch=epoch,
                              instrument=instrument, band=band)

        params = {'RETRIEVAL_TYPE': 'short_timeseries',
                  'source': target_name,
                  'band_oid': self.instrument_band_map[value]['band_oid'],
                  'epoch': epoch}
        try:
            downloaded_file = esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
                                                     params=params, path=path, filename=filename, cache=cache,
                                                     cache_folder=self.cache_location, verbose=True)
            if read_fits:
                return esautils.read_downloaded_fits([downloaded_file])
            return downloaded_file
        except HTTPError as err:
            log.error('No short term timeseries have been found with these inputs. {}'.format(err))
        except Exception as e:
            log.error('Problem when retrieving short term timeseries. {}'.format(e))

    def get_spectra(self, target_name, epoch, instrument=None, band=None, *, path='', filename=None,
                    cache=False, read_fits=True):
        """Method to download spectra associated to a target, an epoch and instrument or band

        Parameters
        ----------
        target_name : str, mandatory
            target name to be requested
        epoch : str, mandatory
            reference epoch for the spectra
        instrument : str
            Possible values are the ones returned by get_instruments()
        band : str
            Possible values are the ones returned by get_bands()
        path: str, optional
            Path for the downloaded file
        filename: str, optional
            Filename for the downloaded file
        cache: bool, optional, default False
            Flag to determine if the file is stored in the cache or not
        read_fits: bool, optional, default True
            Open the downloaded file and parse the existing FITS files

        Returns
        -------
        If read_fits=True, a list with objects containing filename, path and FITS file opened with spectra.
        If read_fits=False, a list of paths of the downloaded files.
        None when the request, the download or the parsing fails (the error is logged).

        Raises
        ------
        ValueError
            If the epoch is not available for the target and instrument/band
        """
        value = self.__get_instrument_or_band(instrument=instrument, band=band)
        self.__validate_epoch(target_name=target_name, epoch=epoch,
                              instrument=instrument, band=band)

        query_params = {
            'REQUEST': 'spectra',
            "source": target_name,
            "instrument_oid": self.instrument_band_map[value]['instrument_oid'],
            "epoch": epoch
        }

        try:
            # Execute the request to the servlet
            request_result = esautils.execute_servlet_request(url=conf.ISLA_SERVLET,
                                                              tap=self.tap,
                                                              query_params=query_params)
            if len(request_result) == 0:
                raise ValueError('Please try with different input parameters.')

            # Download every spectrum listed in the response.
            # NOTE(review): the same path/filename is passed for each element; confirm that
            # download_file keeps several results apart when filename is provided
            downloaded_files = []
            for element in request_result:
                params = {'RETRIEVAL_TYPE': 'spectras',
                          'spectra_oid': element['spectraOid']}
                downloaded_files.append(
                    esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
                                           params=params, path=path, filename=filename, cache=cache,
                                           cache_folder=self.cache_location, verbose=True))

            if read_fits:
                return esautils.read_downloaded_fits(downloaded_files)
            return downloaded_files
        except ValueError as err:
            log.error('Spectra are not available with these inputs. {}'.format(err))
        except Exception as e:
            log.error('Problem when retrieving spectra. {}'.format(e))

    def get_mosaic(self, epoch, instrument=None, band=None, *, path='', filename=None, cache=False, read_fits=True):
        """Method to download mosaics associated to an epoch and instrument or band

        Parameters
        ----------
        epoch : str, mandatory
            reference epoch for the mosaics
        instrument : str
            Possible values are the ones returned by get_instruments()
        band : str
            Possible values are the ones returned by get_bands()
        path: str, optional
            Path for the downloaded file
        filename: str, optional
            Filename for the downloaded file
        cache: bool, optional, default False
            Flag to determine if the file is stored in the cache or not
        read_fits: bool, optional, default True
            Open the downloaded file and parse the existing FITS files

        Returns
        -------
        If read_fits=True, a list with objects containing filename, path and FITS file opened with mosaics.
        If read_fits=False, a list of paths of the downloaded files.
        None when the request, the download or the parsing fails (the error is logged).

        Raises
        ------
        ValueError
            If the epoch is not available for the instrument/band
        """
        # Same order as the other retrieval methods: instrument/band first, then the epoch
        value = self.__get_instrument_or_band(instrument=instrument, band=band)
        self.__validate_epoch(epoch=epoch, instrument=instrument, band=band)

        query_params = {
            'REQUEST': 'mosaics',
            "band_oid": self.instrument_band_map[value]['band_oid'],
            "epoch": epoch
        }

        try:
            # Execute the request to the servlet
            request_result = esautils.execute_servlet_request(url=conf.ISLA_SERVLET,
                                                              tap=self.tap,
                                                              query_params=query_params)
            if len(request_result) == 0:
                raise ValueError('Please try with different input parameters.')

            # NOTE(review): the same path/filename is passed for each element; confirm that
            # download_file keeps several results apart when filename is provided
            downloaded_files = []
            for element in request_result:
                params = {'RETRIEVAL_TYPE': 'mosaics',
                          'mosaic_oid': element['mosaicOid']}
                downloaded_files.append(
                    esautils.download_file(url=conf.ISLA_DATA_SERVER, session=self.tap._session,
                                           params=params, path=path, filename=filename, cache=cache,
                                           cache_folder=self.cache_location, verbose=True))

            if read_fits:
                return esautils.read_downloaded_fits(downloaded_files)
            return downloaded_files
        except ValueError as err:
            log.error('Mosaics are not available for these inputs. {}'.format(err))
        except Exception as e:
            log.error('Problem when retrieving mosaics. {}'.format(e))

    def get_source_metadata(self, target_name):
        """Retrieve the metadata associated to an INTEGRAL target

        Parameters
        ----------
        target_name : str, mandatory
            target name to be requested

        Returns
        -------
        An object containing the metadata from the target

        Raises
        ------
        ValueError
            If the server reports that the source is not in the database
        requests.HTTPError
            For any other HTTP error
        """
        query_params = {
            'REQUEST': 'sources',
            "SOURCE": target_name
        }
        try:
            return esautils.execute_servlet_request(url=conf.ISLA_SERVLET,
                                                    tap=self.tap,
                                                    query_params=query_params)
        except HTTPError as e:
            # An HTTPError may carry no response; in that case just propagate it
            response_text = e.response.text if e.response is not None else ''
            if 'Source not found in the database' in response_text:
                raise ValueError(f"Target {target_name} cannot be resolved for ISLA") from e
            raise

    def get_instrument_band_map(self):
        """
        Maps the bands and instruments included in ISLA

        The TAP service is queried only when the map is still empty; the result is
        kept in self.instrument_band_map, self.instruments and self.bands.

        Returns
        -------
        A dictionary keyed by both instrument and band names. Each entry holds the
        counterpart name ('band' or 'instrument') plus 'instrument_oid' and 'band_oid'
        """
        if len(self.instrument_band_map) == 0:
            instrument_band_table = self.query_tap(conf.ISLA_INSTRUMENT_BAND_QUERY)
            instrument_band_map = {}

            # When a name appears in several rows, the last row wins
            for row in instrument_band_table:
                instrument_band_map[row['instrument']] = {'band': row['band'],
                                                          'instrument_oid': row['instrument_oid'],
                                                          'band_oid': row['band_oid']}
                instrument_band_map[row['band']] = {'instrument': row['instrument'],
                                                    'instrument_oid': row['instrument_oid'],
                                                    'band_oid': row['band_oid']}

            self.instruments = instrument_band_table['instrument']
            self.bands = instrument_band_table['band']
            self.instrument_band_map = instrument_band_map

        return self.instrument_band_map

    def get_instruments(self):
        """
        Get the instruments available in ISLA

        Returns
        -------
        The 'instrument' column retrieved by get_instrument_band_map()
        """
        self.get_instrument_band_map()
        return self.instruments

    def get_bands(self):
        """
        Get the bands available in ISLA

        Returns
        -------
        The 'band' column retrieved by get_instrument_band_map()
        """
        self.get_instrument_band_map()
        return self.bands

    def __get_instrument_or_band(self, instrument, band):
        """
        Checks that exactly one of instrument or band is provided and that it is a known value

        Parameters
        ----------
        instrument : str
            instrument name, or None
        band : str
            band name, or None

        Returns
        -------
        The validated instrument or band name

        Raises
        ------
        TypeError
            If both or none of the parameters are provided
        ValueError
            If the value is not in the instrument/band map
        """
        if instrument and band:
            raise TypeError("Please use only instrument or band as "
                            "parameter.")

        if instrument is None and band is None:
            raise TypeError("Please use at least one parameter, instrument or band.")

        value = instrument if instrument else band

        # Retrieve the available instruments or bands if not loaded yet
        self.get_instrument_band_map()

        # Validate the value is in the list of allowed ones
        if value in self.instrument_band_map:
            return value

        raise ValueError(f"This is not a valid value for instrument or band. Valid values are:\n"
                         f"Instruments: {self.get_instruments()}\n"
                         f"Bands: {self.get_bands()}")

    def __get_oids(self, value):
        """
        Retrieves the band_oid and instrument_oid associated to a band or instrument

        Parameters
        ----------
        value: str
            instrument or band name, already present in self.instrument_band_map

        Returns
        -------
        A tuple (instrument_oid, band_oid)
        """
        entry = self.instrument_band_map[value]
        return entry['instrument_oid'], entry['band_oid']

    @staticmethod
    def __parse_time(value):
        """
        Converts a time input into a datetime

        Parameters
        ----------
        value: str or datetime
            ISO formatted string (a 'Z' suffix is read as '+00:00') or a datetime,
            which is returned unchanged

        Returns
        -------
        A datetime object
        """
        if isinstance(value, datetime):
            return value
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    def __validate_revno(self, rev_no):
        """
        Verifies if the format for revolution number is correct

        Parameters
        ----------
        rev_no: str
            revolution number

        Returns
        -------
        True when rev_no is a string made of exactly four ASCII digits

        Raises
        ------
        ValueError
            If rev_no has any other type or format
        """
        # The value is interpolated into the query, so only plain digits are accepted
        if isinstance(rev_no, str) and len(rev_no) == 4 and rev_no.isascii() and rev_no.isdigit():
            return True
        raise ValueError(f"Revolution number {rev_no} is not correct. It must be a four-digit number as a string, "
                         f"with leading zeros to complete the four digits")

    def __validate_epoch(self, epoch, *, target_name=None, instrument=None, band=None):
        """
        Validate if the epoch is available for the target name and instrument or band

        Parameters
        ----------
        epoch : str, mandatory
            reference epoch to be checked
        target_name : str, optional
            target name to be requested
        instrument : str, optional
            Possible values are the ones returned by get_instruments()
        band : str, optional
            Possible values are the ones returned by get_bands()

        Raises
        ------
        ValueError
            If the epoch is not in the 'epoch' column returned by get_epochs()
        """
        available_epochs = self.get_epochs(target_name=target_name, instrument=instrument, band=band)

        if epoch not in available_epochs['epoch']:
            raise ValueError(f"Epoch {epoch} is not available for this target and instrument/band.")

    def __get_science_window_parameter(self, science_windows, observation_id, revolution, proposal):
        """
        Verifies if only one parameter is not null and return its value

        Parameters
        ----------
        science_windows : list of str or str, optional
            Science Windows to download
        observation_id: str, optional
            Observation ID associated to science windows
        revolution: str, optional
            Revolution associated to science windows
        proposal: str, optional
            Proposal ID associated to science windows

        Returns
        -------
        A dictionary with a single request parameter for the science windows

        Raises
        ------
        ValueError
            If more than one parameter is provided, or the provided one has an
            unsupported type, or none is provided
        """
        params = [science_windows, observation_id, revolution, proposal]

        # Count how many are not None
        non_none_count = sum(p is not None for p in params)

        # Ensure only one parameter is provided
        if non_none_count > 1:
            raise ValueError("Only one parameter can be provided at a time.")

        if science_windows is not None:
            if isinstance(science_windows, str):
                return {'scwid': science_windows}
            if isinstance(science_windows, list):
                return {'scwid': ','.join(science_windows)}

        if observation_id is not None and isinstance(observation_id, str):
            return {'obsid': observation_id}

        if revolution is not None and isinstance(revolution, str):
            return {'REVID': revolution}

        if proposal is not None and isinstance(proposal, str):
            return {'PROPID': proposal}

        raise ValueError("Input parameters are wrong")
# Module-level instance of IntegralClass built with its default arguments; exported through __all__.
Integral = IntegralClass()