from typing import Optional, ClassVar, Any, Union, List, Tuple, Sequence
import dataclasses
from dataclasses import dataclass, field
import datetime
import struct
from oscpython.common import *
from oscpython.arguments import *
from oscpython.arguments import TimeTagArgument, StringArgument, BlobArgument
from oscpython.address import Address
# Public names of this module: the parse-error hierarchy followed by the
# packet classes defined below.
__all__ = (
'ParseError', 'PacketStartError', 'MessageStartError', 'BundleStartError',
'Packet', 'Message', 'Bundle',
)
class ParseError(Exception):
    """Base class for errors raised while parsing OSC packet data

    Arguments:
        packet_data: The bytes that could not be parsed
        msg: Optional description of the problem. If omitted, the class-level
            :attr:`DEFAULT_MSG` is used (which may itself be ``None``)
    """
    DEFAULT_MSG: ClassVar[Optional[str]] = None

    def __init__(self, packet_data: bytes, msg: Optional[str] = None):
        if msg is None:
            msg = self.DEFAULT_MSG
        # Forward the resolved values to the base class so that ``args``
        # always reflects the full state (needed for copy/pickle round-trips,
        # which rebuild the exception from ``args``).
        super().__init__(packet_data, msg)
        self.packet_data = packet_data
        self.msg = msg

    def __str__(self):
        s = f'packet_data = "{self.packet_data!r}"'
        if self.msg is not None:
            s = f'{self.msg} ({s})'
        return s
class PacketStartError(ParseError):
    """Raised when data does not begin with a valid OSC packet start byte
    (``"/"`` for a message or ``"#"`` for a bundle)
    """
    DEFAULT_MSG = 'Expected either "/" or "#" in start byte'
class MessageStartError(PacketStartError):
    """Raised when data expected to hold an OSC message does not begin
    with ``"/"``
    """
    DEFAULT_MSG = 'Expected "/" in start byte'
class BundleStartError(PacketStartError):
    """Raised when data expected to hold an OSC bundle does not begin
    with the ``"#bundle"`` header
    """
    DEFAULT_MSG = 'Expected "#bundle" in start bytes'
@dataclass
class Packet:
    """OSC packet (either :class:`Message` or :class:`Bundle`)
    """
    remote_client: Optional[Client] = None
    """If the packet was received, the host it was received from. If sending
    the packet, the destination host
    """
    parent_bundle: Optional['Bundle'] = field(default=None, repr=False)
    """Instance of :class:`Bundle` that contains the packet (if any)
    """
    parent_index: Optional[int] = None
    """Index of the packet within the :attr:`parent_bundle`
    """

    @classmethod
    def parse(cls, packet_data: bytes, **kwargs) -> Tuple['Packet', bytes]:
        """Parse OSC-formatted bytes and build a :class:`Message` or :class:`Bundle`

        The first bytes select the parser: ``"/"`` dispatches to
        :meth:`Message.parse` and ``"#bundle\\0"`` to :meth:`Bundle.parse`.

        Arguments:
            packet_data: The byte string to parse
            **kwargs: Keyword arguments to pass when creating :class:`Packet`
                instances

        Returns a tuple of:
            :class:`Packet`
                The parsed object
            :class:`bytes`
                Any remaining bytes after the packet data

        Raises:
            MessageStartError: If the data starts with neither a message
                address nor a bundle header
        """
        if packet_data.startswith(b'/'):
            return Message.parse(packet_data, **kwargs)
        if packet_data.startswith(b'#bundle\x00'):
            return Bundle.parse(packet_data, **kwargs)
        # The exception type is kept for callers that already catch
        # MessageStartError, but its default text ('Expected "/"') would be
        # misleading here since a bundle header is just as valid, so the
        # packet-level message is supplied explicitly.
        raise MessageStartError(packet_data, PacketStartError.DEFAULT_MSG)
@dataclass
class Message(Packet):
    """An OSC Message
    """
    address: Address = field(default_factory=Address)
    """The OSC address pattern for the message
    """
    arguments: List[Argument] = field(default_factory=list)
    """OSC arguments for the message
    """

    @property
    def timetag(self) -> 'TimeTag':
        """:class:`~.common.TimeTag` associated with the message

        If present, the :attr:`~Bundle.timetag` of the :attr:`~Packet.parent_bundle`
        is used. Otherwise, this will always be :attr:`.common.TimeTag.Immediately`
        """
        bundle = self.parent_bundle
        if bundle is None:
            return TimeTag.Immediately
        return bundle.timetag

    @classmethod
    def create(cls, address: Union[Address, str], *args, **kwargs):
        """Convenience method to create a :class:`Message`

        Creates the :attr:`address` field from the provided address string
        and adds the message :attr:`arguments` contained in positional args

        Arguments:
            address: An :class:`Address` instance, or a pattern string used
                to build one
            *args: Values (or ready-made arguments) passed to
                :meth:`add_arguments`
            **kwargs: Additional keyword arguments for the :class:`Message`
                constructor. Any ``address`` entry is replaced by the
                positional *address*
        """
        if not isinstance(address, Address):
            address = Address(pattern=address)
        kwargs['address'] = address
        msg = cls(**kwargs)
        msg.add_arguments(*args)
        return msg

    def add_argument(self, value: Any) -> Argument:
        """Create an :class:`~.arguments.Argument` from the given value
        and add it to the :attr:`arguments` list.

        If the value is an instance of :class:`~.arguments.Argument` it will
        be added without copying (its ``index`` is updated in place)

        Returns:
            The argument object that was appended
        """
        # The new argument goes at the end, so its index is the current length
        ix = len(self)
        if isinstance(value, Argument):
            arg = value
            arg.index = ix
        else:
            arg_cls = Argument.get_argument_for_value(value)
            arg = arg_cls(value=value, index=ix)
        self.arguments.append(arg)
        return arg

    def add_arguments(self, *values):
        """Create multiple arguments using :meth:`add_argument`
        """
        for value in values:
            self.add_argument(value)

    def build_packet(self) -> bytes:
        """Construct a byte string for the message and its arguments
        """
        typetags = TypeTags()
        # ``typetags`` is placed in the list before it is filled; the same
        # object is extended below as each argument is appended.
        pack_list = ArgumentList([self.address, typetags])
        for arg in self:
            pack_list.append(arg)
            typetags.append(arg.tag)
        return pack_list.pack()

    @classmethod
    def parse(cls, packet_data: bytes, **kwargs) -> Tuple['Message', bytes]:
        """Parse OSC-formatted bytes and build a :class:`Message`

        Arguments:
            packet_data: The byte string to parse
            **kwargs: Keyword arguments to pass to :meth:`create`

        Returns a tuple of:
            :class:`Message`
                The parsed message
            :class:`bytes`
                Any remaining bytes after the message data

        Raises:
            MessageStartError: If the data does not start with ``"/"``
        """
        if not packet_data.startswith(b'/'):
            raise MessageStartError(packet_data)
        address, packet_data = unpack_str_from_bytes(packet_data)
        args = []
        # The type tag string is only parsed when present (marked by a
        # leading comma); without it the message has no arguments.
        if packet_data.startswith(b','):
            typetags, packet_data = TypeTags.parse(packet_data)
            for tag in typetags:
                # NOTE(review): a tag missing from ARGUMENTS_BY_TAG surfaces as
                # whatever that lookup raises, not as a ParseError -- confirm
                # whether callers rely on this.
                arg_cls = ARGUMENTS_BY_TAG[tag]
                arg, packet_data = arg_cls.parse(packet_data)
                args.append(arg)
        return (cls.create(address, *args, **kwargs), packet_data)

    def __iter__(self):
        yield from self.arguments

    def __len__(self):
        return len(self.arguments)

    def __getitem__(self, key):
        """Return the *value* of the argument at ``key``

        A slice returns a list of the values of the selected arguments
        """
        if isinstance(key, slice):
            return [arg.value for arg in self.arguments[key]]
        return self.arguments[key].value
@dataclass
class Bundle(Packet):
    """An OSC Bundle
    """
    timetag: TimeTag = TimeTag.Immediately
    """The :class:`~.common.TimeTag` associated with the bundle
    """
    packets: List[Packet] = field(default_factory=list)
    """List of :class:`Packet` instances to include in the bundle. Elements
    may be either :class:`Message` or :class:`Bundle` instances
    (nested bundles are allowed to occur)
    """

    def add_packet(self, packet: Packet):
        """Add a :class:`Message` or :class:`Bundle` to the :attr:`packets` list

        The packet's :attr:`~Packet.parent_bundle` and
        :attr:`~Packet.parent_index` are updated to point at this bundle
        """
        ix = len(self)
        packet.parent_index = ix
        packet.parent_bundle = self
        self.packets.append(packet)

    def build_packet(self) -> bytes:
        """Construct a byte string for the bundle and the packets it contains
        """
        pack_list = ArgumentList([
            StringArgument(value='#bundle'),
            TimeTagArgument(value=self.timetag),
        ])
        for packet in self:
            # Each contained packet is built on its own and added as a blob
            _packet_data = packet.build_packet()
            pack_list.append(BlobArgument(_packet_data))
        return pack_list.pack()

    @classmethod
    def parse(cls, packet_data: bytes, **kwargs) -> Tuple['Bundle', bytes]:
        """Parse OSC-formatted bytes and build a :class:`Bundle`

        Arguments:
            packet_data: The byte string to parse
            **kwargs: Keyword arguments to pass when creating the bundle and
                the packets it contains

        Returns a tuple of:
            :class:`Bundle`
                The parsed bundle
            :class:`bytes`
                Any remaining bytes after the bundle data

        Raises:
            BundleStartError: If the data does not start with ``"#bundle\\0"``
            PacketStartError: If a bundle element starts with neither ``"/"``
                nor ``"#"``
            ParseError: If an element's size prefix is negative or larger
                than the data that remains
        """
        if not packet_data.startswith(b'#bundle\x00'):
            raise BundleStartError(packet_data)
        # Skip the 8-byte "#bundle\0" header
        packet_data = packet_data[8:]
        tt, packet_data = TimeTagArgument.parse(packet_data)
        bun_kwargs = kwargs.copy()
        bun_kwargs['timetag'] = tt.value
        bun = cls(**bun_kwargs)
        while packet_data:
            # Each element is preceded by its size as a big-endian int32
            (length,) = struct.unpack('>i', packet_data[:4])
            packet_data = packet_data[4:]
            if packet_data[0:1] not in (b'/', b'#'):
                raise PacketStartError(packet_data[:length])
            # A negative or over-long size would make the slices below cut
            # the buffer at the wrong place, so reject it explicitly.
            if length < 0 or length > len(packet_data):
                raise ParseError(
                    packet_data,
                    f'Invalid bundle element size: {length}',
                )
            # Contained packets get the caller's kwargs, not the bundle's
            # timetag; leftovers from the element's own slice are ignored.
            packet, _ = Packet.parse(packet_data[:length], **kwargs)
            bun.add_packet(packet)
            packet_data = packet_data[length:]
        return (bun, packet_data)

    def __iter__(self):
        yield from self.packets

    def __len__(self):
        return len(self.packets)

    def __getitem__(self, key):
        return self.packets[key]