Source code for oscpython.common

from typing import Optional, ClassVar, Any, Union, List, Tuple, Iterable, Sequence
import enum
import dataclasses
from dataclasses import dataclass, field
import datetime
import struct

# Start of the NTP era as a naive datetime
NTP_EPOCH = datetime.datetime(1900, 1, 1)
# Start of the POSIX era as a naive datetime.  Written as a literal because
# ``datetime.datetime.utcfromtimestamp`` is deprecated since Python 3.12; the
# resulting value is identical.
UNIX_EPOCH = datetime.datetime(1970, 1, 1)

# Offset between the two epochs, as a timedelta and as float seconds
EPOCH_DIFF = UNIX_EPOCH - NTP_EPOCH
EPOCH_DIFF_SECONDS = EPOCH_DIFF.total_seconds()
# Scale factors for converting between a 32-bit fraction and float seconds
TWO_TO_THE_32 = 2 ** 32
TWO_TO_THE_32_DIV = 1 / TWO_TO_THE_32

# Public names exported by ``from oscpython.common import *``
__all__ = (
    'get_padded_size', 'unpack_str_from_bytes', 'StructPacking', 'ArgumentList',
    'Client', 'TimeTag', 'ColorRGBA', 'Infinitum', 'MidiMessage',
)

# Type alias for values accepted either as text or as raw bytes
BytesOrString = Union[str, bytes]

def get_padded_size(s: BytesOrString, add_stop_byte: bool = True) -> int:
    """Get the length of *s* rounded up to a multiple of 4

    Arguments:
        s: The string or bytes to measure (``len(s)`` is used)
        add_stop_byte: If True and the length is already a multiple of 4,
            four more bytes are added so there is room for a stop byte

    Returns:
        The padded size, always a multiple of 4
    """
    size = len(s)
    remainder = size % 4
    if remainder:
        # Not aligned: round up to the next 4-byte boundary
        return size + (4 - remainder)
    # Already aligned: optionally reserve a full extra word
    return size + 4 if add_stop_byte else size

def unpack_str_from_bytes(b: bytes) -> Tuple[str, bytes]:
    """Unpack one null-terminated, 4-byte padded string from the start of *b*

    Arguments:
        b: The bytes to read from

    Returns:
        A tuple of ``(string, remaining)`` where *string* is the decoded text
        before the first null byte and *remaining* is whatever follows the
        padded string

    Raises:
        ValueError: If *b* contains no null byte
        UnicodeDecodeError: If the string bytes cannot be decoded
    """
    # Search from offset 0: an empty string has its terminator in the very
    # first byte and must decode to '' rather than '\x00'
    stop_ix = b.index(b'\x00')
    s = b[:stop_ix]
    # Skip the string together with its terminator/padding bytes
    psize = get_padded_size(s)
    return s.decode(), b[psize:]

@dataclass
class StructPacking:
    """Helper for zipping struct format strings and values
    """
    value: Tuple[Any, ...]  #: The values to pack (any number of items)
    format: str  #: The struct format string
class ArgumentList:
    """Container for :class:`~.arguments.Argument`

    Arguments:
        initlist: Optional iterable of arguments (a list, tuple, generator or
            another :class:`ArgumentList`) used to populate the container.
            The items are copied into a new list, so the given object is
            never shared with the container.
    """
    def __init__(self, initlist=None):
        # ``list()`` copies any iterable, not only objects with ``.copy()``
        items = [] if initlist is None else list(initlist)
        self.items: List['oscpython.arguments.Argument'] = items

    def append(self, item: 'oscpython.arguments.Argument'):
        """Add an :class:`~.arguments.Argument`
        """
        self.items.append(item)

    def extend(self, other):
        """Merge a sequence of :class:`~.arguments.Argument` instances or
        another :class:`ArgumentList`
        """
        if isinstance(other, ArgumentList):
            other = other.items
        self.items.extend(other)

    def __getitem__(self, key) -> 'oscpython.arguments.Argument':
        return self.items[key]

    def __setitem__(self, key, item: 'oscpython.arguments.Argument'):
        self.items[key] = item

    def __delitem__(self, key):
        del self.items[key]

    def get_struct_fmt(self) -> str:
        """Get the struct format string for all arguments in the list

        The per-argument formats are joined and prefixed with ``>``
        """
        return '>' + ''.join(self.formats())

    def pack(self) -> bytes:
        """Pack all arguments in the list to binary using :func:`struct.pack`
        """
        fmt = self.get_struct_fmt()
        return struct.pack(fmt, *self.values())

    def formats(self) -> Iterable[str]:
        """Iterate over the format strings for each argument

        Arguments with an empty format string are skipped
        """
        for item in self:
            fmt = item.get_struct_fmt()
            if fmt:
                yield fmt

    def values(self) -> Iterable[Any]:
        """Iterate (flattened) over all argument values (taken from
        :meth:`.arguments.Argument.get_pack_value`)

        Arguments whose pack value is ``None`` are skipped
        """
        for item in self:
            value = item.get_pack_value()
            if value is not None:
                yield from value

    def __iter__(self):
        yield from self.items

    def __repr__(self):
        return f'<{self.__class__}: {self}>'

    def __str__(self):
        return str(self.items)
@dataclass
class Client:
    """A network address and port
    """
    address: str  #: The host address
    port: int  #: The service port

    def to_tuple(self) -> Tuple[str, int]:
        """Return the client as an ``(address, port)`` tuple
        """
        return self.address, self.port

    @classmethod
    def from_tuple(cls, addr: Tuple[str, int]) -> 'Client':
        """Build a :class:`Client` from an ``(address, port)`` tuple
        """
        host, service_port = addr
        return cls(address=host, port=service_port)
@dataclass
class TimeTag:
    """An OSC timetag represented as two 32-bit integers (formatted as NTP)

    The values for :attr:`seconds` and :attr:`fraction` are relative to the
    NTP epoch (number of seconds since January 1, 1900).

    Instances can be compared with other :class:`TimeTag` instances and with
    :class:`datetime.datetime` objects.  Because ``__eq__`` is overridden,
    instances are not hashable.
    """
    seconds: int = 0
    """Whole number of seconds since the epoch"""

    fraction: int = 0
    """32-bit integer representing the fractional remainder of :attr:`seconds`
    """

    Immediately: ClassVar['TimeTag']
    """A constant used to send a special-case timetag meaning "immediately"
    """

    @property
    def is_immediate(self) -> bool:
        """Whether the special case of "immediately" is indicated
        """
        return self.seconds == 0 and self.fraction == 1

    @property
    def float_seconds(self) -> float:
        """The :attr:`seconds` and :attr:`fraction` combined into a :any:`float`
        """
        return self.seconds + (self.fraction * TWO_TO_THE_32_DIV)

    def to_epoch(self) -> float:
        """Return the values as a POSIX timestamp
        """
        return self.float_seconds - EPOCH_DIFF_SECONDS

    def to_datetime_utc(self) -> datetime.datetime:
        """Create a :class:`datetime.datetime` in UTC

        Note:
            The returned datetime is naive (tzinfo=None)
        """
        # Same result as the deprecated ``datetime.utcfromtimestamp()``
        aware = datetime.datetime.fromtimestamp(
            self.to_epoch(), datetime.timezone.utc,
        )
        return aware.replace(tzinfo=None)

    def to_datetime(self) -> datetime.datetime:
        """Create a :class:`datetime.datetime` with the local timezone offset

        Note:
            The returned datetime is naive (tzinfo=None)
        """
        return datetime.datetime.fromtimestamp(self.to_epoch())

    @classmethod
    def from_float(cls, value: float) -> 'TimeTag':
        """Create a :class:`TimeTag` from an NTP timestamp
        """
        seconds = int(value)
        fraction = (value - seconds) * TWO_TO_THE_32
        return cls(seconds=seconds, fraction=int(fraction))

    @classmethod
    def from_epoch(cls, seconds: float) -> 'TimeTag':
        """Create a :class:`TimeTag` from a POSIX timestamp
        """
        return cls.from_float(seconds + EPOCH_DIFF_SECONDS)

    @classmethod
    def from_datetime(cls, dt: datetime.datetime) -> 'TimeTag':
        """Create a :class:`TimeTag` from a :class:`datetime.datetime`

        The timezone behavior of the given datetime matches that of
        :meth:`datetime.datetime.timestamp`
        """
        return cls.from_epoch(dt.timestamp())

    @classmethod
    def now(cls) -> 'TimeTag':
        """Create a :class:`TimeTag` from the current date and time
        """
        now = datetime.datetime.now()
        return cls.from_datetime(now)

    @classmethod
    def utcnow(cls) -> 'TimeTag':
        """Create a :class:`TimeTag` using the current UTC date and time
        """
        # Same value as the deprecated ``datetime.utcnow()``: a naive datetime
        # holding the current UTC wall-clock time.
        # NOTE(review): from_datetime() passes this through
        # ``datetime.timestamp()``, which interprets naive datetimes as local
        # time - confirm that this is the intended behavior.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        return cls.from_datetime(now)

    @classmethod
    def from_uint64(cls, value: int) -> 'TimeTag':
        """Create a :class:`TimeTag` from a 64-bit integer with
        :attr:`seconds` in the upper and :attr:`fraction` in the lower 32 bits
        """
        return cls(seconds=value >> 32, fraction=value & 0xFFFFFFFF)

    def to_uint64(self) -> int:
        """Combine :attr:`seconds` (upper 32 bits) and :attr:`fraction`
        (lower 32 bits) into a single integer
        """
        return (self.seconds << 32) + (self.fraction & 0xFFFFFFFF)

    def _ordering(self, other) -> int:
        """Three-way comparison helper

        Returns ``-1``, ``0`` or ``1`` if this timetag is less than, equal to
        or greater than *other*, or ``NotImplemented`` for unsupported types.
        """
        if isinstance(other, TimeTag):
            if self.is_immediate or other.is_immediate:
                # "Immediately" sorts before every other timetag
                return int(other.is_immediate) - int(self.is_immediate)
            # Lexicographic: seconds first, fraction as the tie-breaker
            mine = (self.seconds, self.fraction)
            theirs = (other.seconds, other.fraction)
            return (mine > theirs) - (mine < theirs)
        if isinstance(other, datetime.datetime):
            # Compared as a naive local datetime (see to_datetime)
            self_dt = self.to_datetime()
            return (self_dt > other) - (self_dt < other)
        return NotImplemented

    def __gt__(self, other):
        r = self._ordering(other)
        if r is NotImplemented:
            return NotImplemented
        return r == 1

    def __lt__(self, other):
        r = self._ordering(other)
        if r is NotImplemented:
            return NotImplemented
        return r == -1

    def __ge__(self, other):
        r = self._ordering(other)
        if r is NotImplemented:
            return NotImplemented
        return r >= 0

    def __le__(self, other):
        r = self._ordering(other)
        if r is NotImplemented:
            return NotImplemented
        return r <= 0

    def __eq__(self, other):
        r = self._ordering(other)
        if r is NotImplemented:
            return NotImplemented
        return r == 0

    def __ne__(self, other):
        r = self._ordering(other)
        if r is NotImplemented:
            return NotImplemented
        return r != 0


TimeTag.Immediately = TimeTag(seconds=0, fraction=1)
@dataclass
class ColorRGBA:
    """A 32-bit RGBA color with 8 bits per component
    """
    r: int = 0  #: Red component (0-255)
    g: int = 0  #: Green component (0-255)
    b: int = 0  #: Blue component (0-255)
    a: int = 0  #: Alpha component (0-255)

    @classmethod
    def from_uint64(cls, value: int) -> 'ColorRGBA':
        """Build a color from an integer laid out as ``0xRRGGBBAA``
        """
        red = (value >> 24) & 0xff
        green = (value >> 16) & 0xff
        blue = (value >> 8) & 0xff
        alpha = value & 0xff
        return cls(r=red, g=green, b=blue, a=alpha)

    def to_uint64(self) -> int:
        """Combine the components into an integer laid out as ``0xRRGGBBAA``
        """
        packed = 0
        # Each component is shifted to its byte position and summed
        for shift, component in ((24, self.r), (16, self.g), (8, self.b), (0, self.a)):
            packed += component << shift
        return packed
@dataclass
class MidiMessage:
    """A 4-byte MIDI message
    """
    port_id: int = 0  #: Port id
    status_byte: int = 0  #: status_byte
    data1: int = 0  #: data1
    data2: int = 0  #: data2

    @classmethod
    def from_iterable(cls, it: Iterable) -> 'MidiMessage':
        """Create a :class:`MidiMessage` from an iterable of values given in
        the order ``(port_id, status_byte, data1, data2)``

        Fields without a matching item keep their default value and any
        items beyond the fourth are ignored.
        """
        attrs = ('port_id', 'status_byte', 'data1', 'data2')
        return cls(**dict(zip(attrs, it)))

    def as_tuple(self) -> Tuple[int, int, int, int]:
        """Return the fields as a tuple of
        ``(port_id, status_byte, data1, data2)``
        """
        return (self.port_id, self.status_byte, self.data1, self.data2)
class Infinitum:
    """An OSC "Infinitum" argument, typically referred to as an "Impulse"

    There is no value for the argument as its presence in an OSC message
    provides the only semantic meaning.

    All instances compare equal to each other and to the :class:`Infinitum`
    class itself.
    """
    def __eq__(self, other):
        return other is Infinitum or isinstance(other, Infinitum)

    def __ne__(self, other):
        return other is not Infinitum and not isinstance(other, Infinitum)

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable.  Everything
        # that compares equal (any instance and the class object itself)
        # must share one hash value.
        return hash(Infinitum)