Module redvox.cloud.subscription
A simple WebSocket API for subscribing to live RedVox data.
Expand source code
"""
A simple WebSocket API for subscribing to live RedVox data.
"""
from dataclasses import dataclass
import logging
import threading
import time
from typing import Optional, List, Iterator, TypeVar, Generic
from queue import Empty, Queue
import lz4.frame # type: ignore
from dataclasses_json import dataclass_json
from websocket import WebSocketApp # type: ignore
from redvox.api1000.proto.redvox_api_m_pb2 import RedvoxPacketM
from redvox.api1000.wrapped_redvox_packet.wrapped_packet import WrappedRedvoxPacketM
from redvox.cloud.client import CloudClient
logger: logging.Logger = logging.getLogger(__name__)
@dataclass_json
@dataclass
class PubHeader:
"""
A header that is optionally included with messages from subscription producers.
"""
file_path: str
T = TypeVar("T", bytes, RedvoxPacketM, WrappedRedvoxPacketM)
R = TypeVar("R", RedvoxPacketM, WrappedRedvoxPacketM)
@dataclass
class PubMsg(Generic[T]):
"""
A message from a RedVox publisher.
"""
header: Optional[PubHeader]
msg: T
def map(self, msg: R) -> "PubMsg[R]":
"""
Converts the body of this message to another type while maintaining the header.
:param msg: The message to replace with.
:return: A PubMsg with an updated msg body.
"""
return PubMsg(self.header, msg)
@staticmethod
def parse(msg: bytes) -> "PubMsg":
"""
Parses the published message.
:param msg: The message to parse.
:return: An instance of a PubMsg.
"""
# Header is included
if msg[:3] == b"\xc0\xff\xee":
header_len: int = int.from_bytes(msg[3:5], "little", signed=False)
json: str = msg[5 : 5 + header_len].decode("utf-8")
pub_header: PubHeader = PubHeader.from_json(json) # type: ignore
return PubMsg(pub_header, msg[5 + header_len :])
# Header is not included
else:
return PubMsg(None, msg)
def fmt_uri(
base: str,
auth_token: str,
station_ids: Optional[List[str]] = None,
server_id: Optional[str] = None,
) -> str:
"""
Formats the subscription URL from the given base URL, authentication token, and optional station IDs.
:param base: The base URL.
:param auth_token: A current and valid authentication token.
:param station_ids: An optional list of station IDs to subscribe to.
:param server_id: An optional server ID for working with distributed acquisition servers.
:return: The formatted URI.
"""
station_ids_query: str
if station_ids is None or len(station_ids) == 0:
station_ids_query = ""
else:
station_ids_query = "".join(
map(lambda station_id: f"&station_id={station_id}", station_ids)
)
server_id_query: str
if server_id is None or len(server_id) == 0:
server_id_query = ""
else:
server_id_query = f"&sid={server_id}"
return f"{base}?auth_token={auth_token}{station_ids_query}&include_header=true{server_id_query}"
def subscribe_bytes_queue(
base_uri: str,
queue: Queue[PubMsg[bytes]],
client: CloudClient,
station_ids: Optional[List[str]] = None,
server_id: Optional[str] = None,
) -> None:
"""
Create a subscription on the raw compressed bytes.
:param base_uri: The base URI to the acquisition subscription service.
:param queue: A queue for transferring when received by the subscriber.
:param client: An instance of the RedVox CloudClient.
:param station_ids: An optional list of station IDs to subscribe to.
:param server_id: An optional server ID for working with distributed acquisition servers.
"""
while True:
uri: str = fmt_uri(base_uri, client.auth_token, station_ids, server_id)
logger.info(f"Connecting to {uri}")
# noinspection PyTypeChecker
ws_app: WebSocketApp = WebSocketApp(
uri,
on_message=lambda ws, msg: queue.put(PubMsg.parse(msg)),
on_open=lambda ws: logger.info(f"Connection established for {uri}"),
on_error=lambda ws, ex: logger.info(f"Connection error for {uri}: {ex}"),
on_close=lambda ws, code, reason: logger.info(
f"Connection closed for {uri}: code={code} reason={reason}"
),
)
ws_app.run_forever()
logger.info("Subscription stream ended. Attempting to reconnect...")
time.sleep(1)
# noinspection PyDefaultArgument
def subscribe_bytes(
base_uri: str,
client: CloudClient,
station_ids: Optional[List[str]] = None,
server_ids: Optional[List[str]] = ["0", "1"],
) -> Iterator[PubMsg[bytes]]:
"""
Create a subscription on the RedVox packet compressed bytes objects.
:param base_uri: The base URI to the acquisition subscription service.
:param client: An instance of the RedVox CloudClient.
:param station_ids: An optional list of station IDs to subscribe to.
:param server_ids: An optional list of server IDs for working with distributed acquisition servers.
:return: An iterator over RedVox compressed bytes instances.
"""
queue: Queue[PubMsg[bytes]] = Queue()
if server_ids is None:
subscription_thread: threading.Thread = threading.Thread(
target=subscribe_bytes_queue, args=(base_uri, queue, client, station_ids)
)
subscription_thread.start()
else:
server_id: str
for server_id in server_ids:
subscription_thread = threading.Thread(
target=subscribe_bytes_queue,
args=(base_uri, queue, client, station_ids, server_id),
)
subscription_thread.start()
while True:
try:
yield queue.get(True)
except Empty:
break
# noinspection PyDefaultArgument
def subscribe_proto(
base_uri: str,
client: CloudClient,
station_ids: Optional[List[str]] = None,
server_ids: Optional[List[str]] = ["0", "1"],
) -> Iterator[PubMsg[RedvoxPacketM]]:
"""
Create a subscription on the RedVox packet protobuf objects (RedvoxPacketM).
:param base_uri: The base URI to the acquisition subscription service.
:param client: An instance of the RedVox CloudClient.
:param station_ids: An optional list of station IDs to subscribe to.
:param server_ids: An optional list of server IDs for working with distributed acquisition servers.
:return: An iterator over RedvoxPacketM instances.
"""
pub_msg: PubMsg[bytes]
for pub_msg in subscribe_bytes(base_uri, client, station_ids, server_ids):
decompressed_bytes: bytes = lz4.frame.decompress(pub_msg.msg, False)
proto: RedvoxPacketM = RedvoxPacketM()
proto.ParseFromString(decompressed_bytes)
yield pub_msg.map(proto)
# noinspection PyDefaultArgument
def subscribe_packet(
base_uri: str,
client: CloudClient,
station_ids: Optional[List[str]] = None,
server_ids: Optional[List[str]] = ["0", "1"],
) -> Iterator[PubMsg[WrappedRedvoxPacketM]]:
"""
Create a subscription on the RedVox wrapped packet objects (WrappedRedvoxPacketM).
:param base_uri: The base URI to the acquisition subscription service.
:param client: An instance of the RedVox CloudClient.
:param station_ids: An optional list of station IDs to subscribe to.
:param server_ids: An optional list of server IDs for working with distributed acquisition servers.
:return: An iterator over WrappedRedvoxPacketM instances.
"""
pub_msg: PubMsg[RedvoxPacketM]
for pub_msg in subscribe_proto(base_uri, client, station_ids, server_ids):
yield pub_msg.map(WrappedRedvoxPacketM(pub_msg.msg))
Functions
def fmt_uri(base: str, auth_token: str, station_ids: Optional[List[str]] = None, server_id: Optional[str] = None) ‑> str
-
Formats the subscription URL from the given base URL, authentication token, and optional station IDs. :param base: The base URL. :param auth_token: A current and valid authentication token. :param station_ids: An optional list of station IDs to subscribe to. :param server_id: An optional server ID for working with distributed acquisition servers. :return: The formatted URI.
Expand source code
def fmt_uri( base: str, auth_token: str, station_ids: Optional[List[str]] = None, server_id: Optional[str] = None, ) -> str: """ Formats the subscription URL from the given base URL, authentication token, and optional station IDs. :param base: The base URL. :param auth_token: A current and valid authentication token. :param station_ids: An optional list of station IDs to subscribe to. :param server_id: An optional server ID for working with distributed acquisition servers. :return: The formatted URI. """ station_ids_query: str if station_ids is None or len(station_ids) == 0: station_ids_query = "" else: station_ids_query = "".join( map(lambda station_id: f"&station_id={station_id}", station_ids) ) server_id_query: str if server_id is None or len(server_id) == 0: server_id_query = "" else: server_id_query = f"&sid={server_id}" return f"{base}?auth_token={auth_token}{station_ids_query}&include_header=true{server_id_query}"
def subscribe_bytes(base_uri: str, client: CloudClient, station_ids: Optional[List[str]] = None, server_ids: Optional[List[str]] = ['0', '1']) ‑> Iterator[PubMsg[bytes]]
-
Create a subscription on the RedVox packet compressed bytes objects. :param base_uri: The base URI to the acquisition subscription service. :param client: An instance of the RedVox CloudClient. :param station_ids: An optional list of station IDs to subscribe to. :param server_ids: An optional list of server IDs for working with distributed acquisition servers. :return: An iterator over RedVox compressed bytes instances.
Expand source code
def subscribe_bytes( base_uri: str, client: CloudClient, station_ids: Optional[List[str]] = None, server_ids: Optional[List[str]] = ["0", "1"], ) -> Iterator[PubMsg[bytes]]: """ Create a subscription on the RedVox packet compressed bytes objects. :param base_uri: The base URI to the acquisition subscription service. :param client: An instance of the RedVox CloudClient. :param station_ids: An optional list of station IDs to subscribe to. :param server_ids: An optional list of server IDs for working with distributed acquisition servers. :return: An iterator over RedVox compressed bytes instances. """ queue: Queue[PubMsg[bytes]] = Queue() if server_ids is None: subscription_thread: threading.Thread = threading.Thread( target=subscribe_bytes_queue, args=(base_uri, queue, client, station_ids) ) subscription_thread.start() else: server_id: str for server_id in server_ids: subscription_thread = threading.Thread( target=subscribe_bytes_queue, args=(base_uri, queue, client, station_ids, server_id), ) subscription_thread.start() while True: try: yield queue.get(True) except Empty: break
def subscribe_bytes_queue(base_uri: str, queue: queue.Queue[PubMsg[bytes]], client: CloudClient, station_ids: Optional[List[str]] = None, server_id: Optional[str] = None) ‑> None
-
Create a subscription on the raw compressed bytes. :param base_uri: The base URI to the acquisition subscription service. :param queue: A queue for transferring when received by the subscriber. :param client: An instance of the RedVox CloudClient. :param station_ids: An optional list of station IDs to subscribe to. :param server_id: An optional server ID for working with distributed acquisition servers.
Expand source code
def subscribe_bytes_queue( base_uri: str, queue: Queue[PubMsg[bytes]], client: CloudClient, station_ids: Optional[List[str]] = None, server_id: Optional[str] = None, ) -> None: """ Create a subscription on the raw compressed bytes. :param base_uri: The base URI to the acquisition subscription service. :param queue: A queue for transferring when received by the subscriber. :param client: An instance of the RedVox CloudClient. :param station_ids: An optional list of station IDs to subscribe to. :param server_id: An optional server ID for working with distributed acquisition servers. """ while True: uri: str = fmt_uri(base_uri, client.auth_token, station_ids, server_id) logger.info(f"Connecting to {uri}") # noinspection PyTypeChecker ws_app: WebSocketApp = WebSocketApp( uri, on_message=lambda ws, msg: queue.put(PubMsg.parse(msg)), on_open=lambda ws: logger.info(f"Connection established for {uri}"), on_error=lambda ws, ex: logger.info(f"Connection error for {uri}: {ex}"), on_close=lambda ws, code, reason: logger.info( f"Connection closed for {uri}: code={code} reason={reason}" ), ) ws_app.run_forever() logger.info("Subscription stream ended. Attempting to reconnect...") time.sleep(1)
def subscribe_packet(base_uri: str, client: CloudClient, station_ids: Optional[List[str]] = None, server_ids: Optional[List[str]] = ['0', '1']) ‑> Iterator[PubMsg[WrappedRedvoxPacketM]]
-
Create a subscription on the RedVox wrapped packet objects (WrappedRedvoxPacketM). :param base_uri: The base URI to the acquisition subscription service. :param client: An instance of the RedVox CloudClient. :param station_ids: An optional list of station IDs to subscribe to. :param server_ids: An optional list of server IDs for working with distributed acquisition servers. :return: An iterator over WrappedRedvoxPacketM instances.
Expand source code
def subscribe_packet( base_uri: str, client: CloudClient, station_ids: Optional[List[str]] = None, server_ids: Optional[List[str]] = ["0", "1"], ) -> Iterator[PubMsg[WrappedRedvoxPacketM]]: """ Create a subscription on the RedVox wrapped packet objects (WrappedRedvoxPacketM). :param base_uri: The base URI to the acquisition subscription service. :param client: An instance of the RedVox CloudClient. :param station_ids: An optional list of station IDs to subscribe to. :param server_ids: An optional list of server IDs for working with distributed acquisition servers. :return: An iterator over WrappedRedvoxPacketM instances. """ pub_msg: PubMsg[RedvoxPacketM] for pub_msg in subscribe_proto(base_uri, client, station_ids, server_ids): yield pub_msg.map(WrappedRedvoxPacketM(pub_msg.msg))
def subscribe_proto(base_uri: str, client: CloudClient, station_ids: Optional[List[str]] = None, server_ids: Optional[List[str]] = ['0', '1']) ‑> Iterator[PubMsg[src.redvox_api_m.redvox_api_m_pb2.RedvoxPacketM]]
-
Create a subscription on the RedVox packet protobuf objects (RedvoxPacketM). :param base_uri: The base URI to the acquisition subscription service. :param client: An instance of the RedVox CloudClient. :param station_ids: An optional list of station IDs to subscribe to. :param server_ids: An optional list of server IDs for working with distributed acquisition servers. :return: An iterator over RedvoxPacketM instances.
Expand source code
def subscribe_proto( base_uri: str, client: CloudClient, station_ids: Optional[List[str]] = None, server_ids: Optional[List[str]] = ["0", "1"], ) -> Iterator[PubMsg[RedvoxPacketM]]: """ Create a subscription on the RedVox packet protobuf objects (RedvoxPacketM). :param base_uri: The base URI to the acquisition subscription service. :param client: An instance of the RedVox CloudClient. :param station_ids: An optional list of station IDs to subscribe to. :param server_ids: An optional list of server IDs for working with distributed acquisition servers. :return: An iterator over RedvoxPacketM instances. """ pub_msg: PubMsg[bytes] for pub_msg in subscribe_bytes(base_uri, client, station_ids, server_ids): decompressed_bytes: bytes = lz4.frame.decompress(pub_msg.msg, False) proto: RedvoxPacketM = RedvoxPacketM() proto.ParseFromString(decompressed_bytes) yield pub_msg.map(proto)
Classes
class PubHeader (file_path: str)
-
A header that is optionally included with messages from subscription producers.
Expand source code
@dataclass_json @dataclass class PubHeader: """ A header that is optionally included with messages from subscription producers. """ file_path: str
Class variables
var file_path : str
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
-
Expand source code
@classmethod def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: return _decode_dataclass(cls, kvs, infer_missing)
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
-
Expand source code
@classmethod def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) -> A: kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw) return cls.from_dict(kvs, infer_missing=infer_missing)
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
-
Expand source code
@classmethod def schema(cls: Type[A], *, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) -> SchemaType: Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial) if unknown is None: undefined_parameter_action = _undefined_parameter_action_safe(cls) if undefined_parameter_action is not None: # We can just make use of the same-named mm keywords unknown = undefined_parameter_action.name.lower() return Schema(only=only, exclude=exclude, many=many, context=context, load_only=load_only, dump_only=dump_only, partial=partial, unknown=unknown)
Methods
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
-
Expand source code
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) -> str: return json.dumps(self.to_dict(encode_json=False), cls=_ExtendedEncoder, skipkeys=skipkeys, ensure_ascii=ensure_ascii, check_circular=check_circular, allow_nan=allow_nan, indent=indent, separators=separators, default=default, sort_keys=sort_keys, **kw)
class PubMsg (header: Optional[PubHeader], msg: ~T)
-
A message from a RedVox publisher.
Expand source code
@dataclass class PubMsg(Generic[T]): """ A message from a RedVox publisher. """ header: Optional[PubHeader] msg: T def map(self, msg: R) -> "PubMsg[R]": """ Converts the body of this message to another type while maintaining the header. :param msg: The message to replace with. :return: A PubMsg with an updated msg body. """ return PubMsg(self.header, msg) @staticmethod def parse(msg: bytes) -> "PubMsg": """ Parses the published message. :param msg: The message to parse. :return: An instance of a PubMsg. """ # Header is included if msg[:3] == b"\xc0\xff\xee": header_len: int = int.from_bytes(msg[3:5], "little", signed=False) json: str = msg[5 : 5 + header_len].decode("utf-8") pub_header: PubHeader = PubHeader.from_json(json) # type: ignore return PubMsg(pub_header, msg[5 + header_len :]) # Header is not included else: return PubMsg(None, msg)
Ancestors
- typing.Generic
Class variables
var header : Optional[PubHeader]
var msg : ~T
Static methods
def parse(msg: bytes) ‑> PubMsg
-
Parses the published message. :param msg: The message to parse. :return: An instance of a PubMsg.
Expand source code
@staticmethod def parse(msg: bytes) -> "PubMsg": """ Parses the published message. :param msg: The message to parse. :return: An instance of a PubMsg. """ # Header is included if msg[:3] == b"\xc0\xff\xee": header_len: int = int.from_bytes(msg[3:5], "little", signed=False) json: str = msg[5 : 5 + header_len].decode("utf-8") pub_header: PubHeader = PubHeader.from_json(json) # type: ignore return PubMsg(pub_header, msg[5 + header_len :]) # Header is not included else: return PubMsg(None, msg)
Methods
def map(self, msg: ~R) ‑> PubMsg[~R]
-
Converts the body of this message to another type while maintaining the header. :param msg: The message to replace with. :return: A PubMsg with an updated msg body.
Expand source code
def map(self, msg: R) -> "PubMsg[R]": """ Converts the body of this message to another type while maintaining the header. :param msg: The message to replace with. :return: A PubMsg with an updated msg body. """ return PubMsg(self.header, msg)
class RedvoxPacketM (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
var DoubleSamplePayload
-
A ProtocolMessage
var EventStream
-
A ProtocolMessage
var MetadataEntry
-
A ProtocolMessage
var SamplePayload
-
A ProtocolMessage
var Sensors
-
A ProtocolMessage
var StationInformation
-
A ProtocolMessage
var SummaryStatistics
-
A ProtocolMessage
var TimingInformation
-
A ProtocolMessage
var TimingPayload
-
A ProtocolMessage
var Unit