# Source code for epicsarchiver.retrieval.client.archiver_retrieval

"""Archiver Retrieval methods."""

from __future__ import annotations

import datetime
import logging
from typing import TYPE_CHECKING, cast

from pytz import UTC

from epicsarchiver.common.base_archiver import (
    DEFAULT_RETRIEVAL_PORT,
    BaseArchiverAppliance,
)
from epicsarchiver.common.date_util import (
    QueryTimestamp,
    ensure_utc,
)
from epicsarchiver.common.validation import (
    validate_processor,
    validate_pv,
    validate_start_end,
)
from epicsarchiver.retrieval.pb import parse_pb_data

if TYPE_CHECKING:
    import polars as pl
    from requests import Response

    from epicsarchiver.retrieval.archive_event import ArchiveEventsData
    from epicsarchiver.retrieval.client.processor import Processor


#: Path (relative to the retrieval base URL) of the raw archived-data endpoint.
ENDPOINT_GET_DATA = "/data/getData.raw"
#: Path (relative to the retrieval base URL) of the PV-name regex search endpoint.
ENDPOINT_GET_MATCHING_PVS = "/bpl/getMatchingPVs"

#: Logger for this module.
LOG: logging.Logger = logging.getLogger(name=__name__)
class ArchiverRetrieval(BaseArchiverAppliance):
    """Retrieval EPICS Archiver Appliance client.

    Hold a session to the Retrieval Archiver Appliance web application.

    Args:
        hostname: EPICS Archiver Appliance hostname
        port: EPICS Archiver Appliance retrieval port

    Examples:
        .. code-block:: python

            from datetime import datetime, timezone

            from epicsarchiver.retrieval.client.archiver_retrieval import (
                ArchiverRetrieval,
            )

            archappl = ArchiverRetrieval("archiver-01.tn.esss.lu.se")
            print(archappl.version)
            df = archappl.get_data(
                "my:pv",
                start="2018-07-04 13:00",
                end=datetime.now(tz=timezone.utc),
            )
    """

    def __init__(
        self,
        hostname: str = "localhost",
        port: int = DEFAULT_RETRIEVAL_PORT,
    ) -> None:
        """Create Archiver Appliance object.

        Args:
            hostname (str, optional): hostname of archiver.
            port (int, optional): port number of retrieval interface.
        """
        super().__init__(hostname, port)
        # All retrieval endpoints live under the "/retrieval" web application.
        self._base_url = f"http://{self.hostname}:{self.port}/retrieval"
        self.data_url: str = self._base_url + ENDPOINT_GET_DATA
        self.matching_pvs_url: str = self._base_url + ENDPOINT_GET_MATCHING_PVS

    def get_data_raw(
        self,
        pv: str,
        start: datetime.datetime,
        end: datetime.datetime,
    ) -> Response:
        """Retrieve raw archived data from the ``getData.raw`` endpoint.

        Args:
            pv: name of the pv (possibly already wrapped by a processor,
                see :meth:`get_events`).
            start: start time.
            end: end time.

        Returns:
            `Response`: the unparsed HTTP response.
        """
        # http://slacmshankar.github.io/epicsarchiver_docs/userguide.html
        params = {
            "pv": pv,
            "from": QueryTimestamp.from_datetime(start).to_query_string(),
            "to": QueryTimestamp.from_datetime(end).to_query_string(),
        }
        return self._get(
            self.data_url,
            params=params,
        )

    def _get_matching_pvs(
        self,
        query: str,
        limit: int,
    ) -> list[str]:
        """Retrieve list of matching pv names for given regex search string.

        Args:
            query (str): A regex search string.
            limit (int): Limit of PV names to return.

        Returns:
            list[str]: List of pv names (the decoded JSON body of the response).
        """
        params = {
            "regex": query,
            "limit": str(limit),
        }
        return cast(
            "list[str]",
            self._get(
                self.matching_pvs_url,
                params=params,
            ).json(),
        )

    def _check_for_pvs_in_time_range(
        self,
        query: list[str],
        start: datetime.datetime | None = None,
        end: datetime.datetime | None = None,
    ) -> list[str]:
        """Check if data recorded during the given time range for each PV in list.

        If both start and end given, return only PVs which recorded data during
        that time range. If start given and end not, return PVs which recorded
        data between start and now. If end given and start not, return PVs
        which recorded any data before end.

        Each PV is reported at most once, in the order of ``query``. One
        retrieval request is made per PV in ``query``.

        Args:
            query (list[str]): List of pvs.
            start (datetime.datetime | None): Start of the time range.
            end (datetime.datetime | None): End of the time range.

        Returns:
            list[str]: List of PV names found.
        """
        if start is None and end is None:
            return query

        # Add timezone if missing, otherwise convert to UTC.
        start_utc = ensure_utc(start) if start is not None else None
        end_utc = (
            ensure_utc(end) if end is not None else datetime.datetime.now(tz=UTC)
        )

        pv_list: list[str] = []
        for pv in query:
            # Both ends of the data query are set to ``end``; the Archiver then
            # returns the most recent event prior to ``end``, or an empty
            # result. Several events may come back, so take only the first
            # qualifying one to avoid listing the same PV more than once.
            _, events = self.get_events(pv, end_utc, end_utc)
            found = next(
                (
                    event.pv
                    for event in events
                    if start_utc is None or event.timestamp >= start_utc
                ),
                None,
            )
            if found is not None:
                pv_list.append(found)
        return pv_list

    def get_events(
        self,
        pv: str,
        start: datetime.datetime,
        end: datetime.datetime,
        processor: Processor | None = None,
    ) -> ArchiveEventsData:
        """Retrieve archived data and parse it into events.

        Args:
            pv: name of the pv.
            start: start time.
            end: end time.
            processor (Processor | None, optional): Preprocessor to use.
                When given, the requested name is ``processor.calc_pv_name(pv)``.
                Defaults to None.

        Returns:
            ArchiveEventsData: tuple of (metadata, events).
        """
        # http://slacmshankar.github.io/epicsarchiver_docs/userguide.html
        validate_pv(pv)
        validate_start_end(start, end)
        validate_processor(processor)
        pv_request = processor.calc_pv_name(pv) if processor else pv
        r = self.get_data_raw(pv_request, start, end)
        pb_data = r.content
        data = parse_pb_data(pb_data)
        LOG.debug("Metadata: %s", data[0])
        return data

    def get_data(
        self,
        pv: str,
        start: str | datetime.datetime,
        end: str | datetime.datetime,
        processor: Processor | None = None,
    ) -> pl.DataFrame:
        """Retrieve archived data as a polars DataFrame.

        Args:
            pv: name of the pv.
            start: start time. Can be a string or `datetime.datetime` object.
            end: end time. Can be a string or `datetime.datetime` object.
            processor (Processor | None, optional): Preprocessor to use.
                Defaults to None.

        Returns:
            `polars.DataFrame`

        Raises:
            ImportError: If the polars extra is not installed.
        """
        # Imported lazily so polars stays an optional dependency.
        try:
            from epicsarchiver.retrieval.dataframe import (  # noqa: PLC0415
                dataframe_from_events,
            )
        except ImportError as exc:
            msg = (
                "polars extra required: "
                " pip install epicsarchiver-retrieval-client[polars]"
            )
            raise ImportError(msg) from exc
        # http://slacmshankar.github.io/epicsarchiver_docs/userguide.html
        start_time = QueryTimestamp.from_input(start).datetime
        end_time = QueryTimestamp.from_input(end).datetime
        metadata, events = self.get_events(pv, start_time, end_time, processor)
        return dataframe_from_events(events, metadata)

    def search(
        self,
        query: str,
        *,
        start: datetime.datetime | None = None,
        end: datetime.datetime | None = None,
        limit: int = 500,
    ) -> list[str]:
        """Search for names of PVs matching the given regex search string.

        Optionally specify start and/or end times to only return PVs that
        recorded data in the specified time range. When a time range is given,
        one retrieval request is issued per matching PV.

        Args:
            query (str): A regex search string.
            start (datetime.datetime | None): Start time of the time period.
            end (datetime.datetime | None): End time of the time period.
            limit (int): Maximum number of matching PV names requested from
                the server, applied before any time-range filtering. To get
                all PV names (potentially in the millions), set limit to -1.
                [default: 500]

        Returns:
            list[str]: List of PV names found.
        """
        # Limit returned list of PV to those in time range, if supplied.
        return self._check_for_pvs_in_time_range(
            query=self._get_matching_pvs(query, limit),
            start=start,
            end=end,
        )