from typing import (
    Optional, Sequence, List, Dict, Tuple, Union, Iterator, Any,
    KeysView, ValuesView, ItemsView, ClassVar, Callable,
)
from dataclasses import dataclass, field
import re

from pydispatch import Dispatcher
from pydispatch.dispatch import Event

from oscpython.arguments import StringArgument

# Placeholders filled in lazily by ``_import_message_classes``. The import of
# ``oscpython.messages`` is deferred until first use (presumably to avoid a
# circular import -- TODO confirm).
messages, Message, Bundle = None, None, None


def _import_message_classes():
    """Bind the module-level ``messages``, ``Message`` and ``Bundle`` names

    On the first call, imports :mod:`oscpython.messages` and stores the module
    together with its ``Message`` and ``Bundle`` attributes in this module's
    globals. Later calls return immediately without re-importing.
    """
    global messages, Message, Bundle
    if messages is not None:
        # Already populated by an earlier call
        return
    from oscpython import messages as _msg_module
    messages = _msg_module
    Message = _msg_module.Message
    Bundle = _msg_module.Bundle

# Type alias for parameters that accept an OSC address either as a plain
# string or as an ``Address`` instance
StrOrAddress = Union[str, 'Address']

# Names exported by ``from <module> import *``
__all__ = ('Address', 'AddressPart', 'AddressSpace', 'AddressNode')

class AddressPart:
    """One "part" of an :class:`Address` (delimited by forward slash)

    Arguments:
        part: The text of the address part
        is_root: Whether this is the first part of the address
        re_pattern: Optional pre-compiled pattern for :attr:`re_pattern`.
            If omitted it is compiled from ``part`` on first access
        has_wildcard: Optional pre-computed value for :attr:`has_wildcard`.
            If omitted it is computed from ``part`` on first access

    Instances compare equal (and hash equal) when both :attr:`part` and
    :attr:`is_root` are equal.
    """
    __slots__ = ('__part', '__is_root', '__re_pattern', '__has_wildcard')

    def __init__(self,
                 part: str,
                 is_root: bool,
                 re_pattern: Optional[re.Pattern] = None,
                 has_wildcard: Optional[bool] = None):
        self.__part = part
        self.__is_root = is_root
        self.__re_pattern = re_pattern
        self.__has_wildcard = has_wildcard

    def __eq__(self, other):
        if not isinstance(other, AddressPart):
            return NotImplemented
        return self.part == other.part and self.is_root == other.is_root

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None; hash the same fields
        # that __eq__ compares so instances work in sets and as dict keys
        return hash((self.part, self.is_root))

    @property
    def part(self) -> str:
        """The address part as a string
        """
        return self.__part

    @property
    def is_root(self) -> bool:
        """True if this is the first part of the :class:`Address`
        """
        return self.__is_root

    @property
    def re_pattern(self) -> re.Pattern:
        """The address part as a compiled :class:`re.Pattern` to be used
        for OSC address pattern matching
        """
        if self.__re_pattern is None:
            self._compile()
        return self.__re_pattern

    @property
    def has_wildcard(self) -> bool:
        """True if the :attr:`re_pattern` contains any wildcard characters
        """
        if self.__has_wildcard is None:
            self._compile()
        return self.__has_wildcard

    def _compile(self):
        """Populate both lazily-computed attributes from :attr:`part`"""
        self.__re_pattern, self.__has_wildcard = self.compile_re(self.part)

    def match(self, other: 'AddressPart') -> bool:
        """Match this instance with another :class:`AddressPart` using OSC
        pattern matching

        Whichever of the two parts contains wildcards is used as the regular
        expression and is matched against the other part's pattern string.
        If neither contains wildcards, the pattern strings are compared
        directly.
        """
        if not isinstance(other, AddressPart):
            # NOTE(review): NotImplemented is kept for backward compatibility,
            # but it is not a bool; callers should only pass AddressPart
            return NotImplemented
        if self.has_wildcard:
            return bool(self.re_pattern.fullmatch(other.re_pattern.pattern))
        if other.has_wildcard:
            return bool(other.re_pattern.fullmatch(self.re_pattern.pattern))
        return self.re_pattern.pattern == other.re_pattern.pattern

    @staticmethod
    def compile_re(pattern: str) -> Tuple[re.Pattern, bool]:
        """Create a regular expression used for OSC address pattern matching

        The pattern is only valid within the parts separated by
        forward slash (``"/"``). Leading and trailing slashes are stripped
        before translation.

        Returns
        -------
        pattern : re.Pattern
            The compiled pattern
        has_wildcard : bool
            ``True`` if the pattern contains any pattern-matching characters
        """
        # osc_style = [r'[a-d]', r'[!a-d]', r'{foo,bar}', r'a?c']
        # re_style =  [r'[a-d]', r'[^a-d]', r'(foo|bar)', r'a\w?c']
        has_wildcard = False
        pattern = pattern.strip('/')
        if '*' in pattern:
            pattern = pattern.replace('*', r'[\w|\+]*')
            has_wildcard = True
        if '[' in pattern:
            has_wildcard = True
        if '[!' in pattern:
            pattern = pattern.replace('[!', '[^')
            has_wildcard = True
        # '{a,b}' alternatives become a regex group '(a|b)'
        for c, d in zip('{,}', '(|)'):
            if c in pattern:
                pattern = pattern.replace(c, d)
                has_wildcard = True
        if '?' in pattern:
            # NOTE(review): r'\w?' matches zero or one word character; the
            # OSC spec describes '?' as exactly one character -- confirm
            # whether this looser behavior is intended
            pattern = pattern.replace('?', r'\w?')
            has_wildcard = True
        return re.compile(pattern), has_wildcard

    def __repr__(self):
        return f'<{self.__class__}: "{self}">'

    def __str__(self):
        if self.is_root:
            return f'/{self.part}'
        return f'{self.part}'
@dataclass
class Address(StringArgument):
    """An OSC address pattern

    An instance may be created from either a :attr:`pattern` string or a
    tuple of :attr:`parts`; whichever is missing is derived from the other
    in ``__post_init__``.
    """
    pattern: str = '/' #: The OSC address string

    parts: Tuple[AddressPart, ...] = field(default_factory=tuple, repr=False)
    """A :class:`tuple` of :class:`AddressPart` instances derived from the
    :attr:`pattern`, delimited by forward slash (``"/"``)
    """

    # Characters whose presence makes an address a (non-concrete) pattern
    match_strings: ClassVar[str] = '?*[]{}'

    def __post_init__(self):
        if len(self.parts) and self.pattern == '/':
            # Only parts were supplied: build the pattern string from them
            self.pattern = self.parts_to_pattern(self.parts)
        elif self.pattern != '/':
            # A pattern was supplied: it takes precedence over any parts
            self.parts = self.pattern_to_parts(self.pattern)

    @staticmethod
    def parts_to_pattern(parts: Sequence[AddressPart]) -> str:
        """Convert the given :class:`parts <AddressPart>` to an OSC address string

        An empty sequence yields the default pattern ``"/"``.
        """
        if not len(parts):
            return '/'
        if len(parts) == 1:
            pattern = parts[0].part
        else:
            pattern = '/'.join((part.part for part in parts))
        if parts[0].is_root:
            pattern = f'/{pattern}'
        return pattern

    @staticmethod
    def pattern_to_parts(pattern: str) -> Tuple[AddressPart]:
        """Convert the given OSC address string to a :class:`tuple` of
        :class:`parts <AddressPart>`

        Empty segments are skipped. If the string contains ``"//"``, only the
        text after the last ``"//"`` is used and the first part keeps a
        leading ``"/"`` in its :attr:`~AddressPart.part` text.
        """
        parts = []
        if '//' in pattern:
            pattern = pattern.split('//')[-1]
            for i, part in enumerate(pattern.split('/')):
                if not len(part):
                    continue
                if i == 0:
                    part = f'/{part}'
                parts.append(AddressPart(part=part, is_root=i == 0))
        else:
            is_root = pattern.startswith('/')
            for i, part in enumerate(pattern.lstrip('/').split('/')):
                if not len(part):
                    continue
                parts.append(AddressPart(part=part, is_root=is_root and i == 0))
        return tuple(parts)

    def get_pack_value(self) -> Optional[Tuple[bytes]]:
        """Return a one-element tuple holding the encoded :attr:`pattern`"""
        return (self.pattern.encode(),)

    @property
    def is_concrete(self) -> bool:
        """True if the address is "concrete" (contains no pattern matching characters)
        """
        # The result is cached on the instance after the first evaluation
        r = getattr(self, '_is_concrete', None)
        if r is not None:
            return r
        if '//' in self.pattern:
            r = False
        else:
            r = not any((c in self.pattern for c in self.match_strings))
        self._is_concrete = r
        return r

    @property
    def pattern_parts(self) -> Tuple[str, ...]:
        """The :attr:`~AddressPart.part` strings of all :attr:`parts`"""
        return tuple((part.part for part in self.parts))

    @property
    def length(self) -> int:
        """The number of :attr:`parts`"""
        return len(self.parts)

    @classmethod
    def from_parts(cls, parts: Sequence[AddressPart]) -> 'Address':
        """Create an instance from the given sequence of :attr:`parts`
        """
        return cls(parts=tuple(parts))

    def __getitem__(self, key):
        """Return a new :class:`Address` from an index or slice of :attr:`parts`

        If the first selected part is not the first part of this address,
        it is replaced by a copy with ``is_root=False``.
        """
        count = len(self.parts)
        if isinstance(key, slice):
            parts = list(self.parts[key])
            # Resolve None/negative/out-of-range starts to a real index
            first_index = key.indices(count)[0]
        else:
            parts = [self.parts[key]]
            first_index = key + count if key < 0 else key
        if parts and first_index != 0 and parts[0].is_root:
            parts[0] = AddressPart(part=parts[0].part, is_root=False)
        return self.from_parts(parts)

    def __div__(self, other):
        return self.join(other)

    def __truediv__(self, other):
        return self.join(other)

    def __len__(self):
        return self.length

    def __iter__(self):
        yield from self.parts

    def copy(self) -> 'Address':
        """Create a copy of the instance
        """
        cls = self.__class__
        return cls.from_parts(self.parts)

    def join(self, other) -> 'Address':
        """Join the address with either a str or :class:`Address` instance,
        separating the :attr:`pattern` with ``"/"``

        Returns ``NotImplemented`` if ``other`` is neither a :class:`str` nor
        an :class:`Address` (so the ``/`` operator raises :class:`TypeError`).

        Raises:
            ValueError: If ``other`` contains ``"//"``
        """
        if not isinstance(other, (str, Address)):
            return NotImplemented
        if not isinstance(other, Address):
            other = Address(pattern=other)
        if '//' in other.pattern:
            raise ValueError('Cannot join with another "//" address')
        all_parts = list(self.parts)
        for part in other.parts:
            if all_parts and part.is_root:
                # Only the first part of the joined address may be the root
                part = AddressPart(part=part.part, is_root=False)
            all_parts.append(part)
        cls = self.__class__
        return cls.from_parts(all_parts)

    def match(self, other: Union['Address', str]) -> bool:
        """Match this address with another using pattern-matching rules

        Arguments:
            other: Either a :class:`str` or :class:`Address` instance

        Returns:
            bool: ``True`` if the given address matches

        Raises:
            ValueError: If neither address is concrete
        """
        if not isinstance(other, Address):
            other = Address(pattern=other)
        if self.is_concrete and other.is_concrete:
            return self.pattern == other.pattern
        elif not self.is_concrete and not other.is_concrete:
            raise ValueError('At least one address must be concrete')

        if '//' not in self.pattern and '//' not in other.pattern:
            # Part-by-part comparison; both must have the same depth
            if len(self) != len(other):
                return False
            for my_part, oth_part in zip(self, other):
                if not my_part.match(oth_part):
                    return False
            return True

        # One side contains "//": its parts must all be matched, in order,
        # by parts of the other address (not necessarily adjacent ones)
        if '//' in self.pattern:
            wc_parts, parts = self.parts, other.parts
        elif '//' in other.pattern:
            wc_parts, parts = other.parts, self.parts
        i = 0
        for part in parts:
            try:
                wc_part = wc_parts[i]
            except IndexError:
                break
            if part.match(wc_part):
                i += 1
        return i == len(wc_parts)
class AddressSpace(Dispatcher):
    """An OSC address space, container for root (top-level)
    :class:`AddressNode` instances

    Attributes:
        root_nodes: Mapping of root nodes using the
            :attr:`AddressNode.name` as keys

    :Events:
        .. event:: on_message(address: Address, message: Message, timetag: TimeTag)

            Fired when a message is received by a :class:`server <.transport.BaseServer>`.

            :param address: The OSC address matching the message
            :type address: oscpython.address.Address
            :param message: The OSC message
            :type message: oscpython.messages.Message
            :param timetag: The timestamp of when the message was received
            :type timetag: oscpython.common.TimeTag

    """
    root_nodes: Dict[str, 'AddressNode']
    _events_ = ['on_message']

    def __init__(self):
        self.root_nodes = {}

    def add_root(self, name: str, cls: Optional[type] = None) -> 'AddressNode':
        """Create an :class:`AddressNode` and add it to :attr:`root_nodes`

        If a root node with the given name already exists and is exactly of
        type ``cls``, it is returned instead of creating a new one.

        Arguments:
            name (str): The node :attr:`~AddressNode.name`
            cls (optional): A subclass of :class:`AddressNode` to use when
                creating the root node. If not provided, defaults to
                :class:`AddressNode`

        Raises:
            KeyError: If a root node with the given name exists but its class
                is not ``cls``
        """
        if cls is None:
            cls = AddressNode
        if name in self:
            obj = self[name]
            if obj.__class__ is not cls:
                raise KeyError(f'Node with name "{name}" already exists')
            return obj
        obj = cls(name)
        obj.address_space = self
        self.root_nodes[name] = obj
        return obj

    def add_root_instance(self, node: 'AddressNode'):
        """Add an existing :class:`AddressNode` instance to :attr:`root_nodes`

        The node's parent is cleared, making it a root node.

        Arguments:
            node: The instance to add

        Raises:
            KeyError: If a root node with the same name already exists
        """
        if node.name in self:
            raise KeyError(f'Node with name "{node.name}" already exists')
        node.parent = None
        self.root_nodes[node.name] = node
        node.address_space = self

    def create_from_address(
        self,
        address: StrOrAddress,
        cls: Optional[type] = None
    ) -> Tuple['AddressNode', 'AddressNode']:
        """Create node or nodes from the given OSC address

        Arguments:
            address: The OSC address
            cls: If provided, a subclass of :class:`AddressNode` to use when
                creating node instances

        Returns
        -------
        root : AddressNode
            The root node for the address (existing or newly created)
        last_child : AddressNode
            The final node on the tree (the end of the address string).
            This is the root itself if the address has only one part

        Raises:
            KeyError: If a node matching the first address part exists in
                :attr:`root_nodes` and its class is **not** the given ``cls``
        """
        if cls is None:
            cls = AddressNode
        if not isinstance(address, Address):
            address = Address(pattern=address)
        root_name = address.parts[0].part
        if root_name in self:
            # Reuse the existing root and only create the missing children
            address = address[1:]
            root = self[root_name]
            if root.__class__ is not cls:
                raise KeyError(f'Node with name "{root_name}" already exists')
            if not len(address):
                return root, root
            last_child = root.create_children_from_address(address)
        else:
            root, last_child = cls.create_from_address(address)
            self.add_root_instance(root)
        return root, last_child

    def find(self, address: StrOrAddress) -> Optional['AddressNode']:
        """Search for a node matching the given address

        Arguments:
            address: The address to search for

        Returns ``None`` if the address has no parts or no matching node
        exists.
        """
        if not isinstance(address, Address):
            address = Address(pattern=address)
        if not len(address):
            # Nothing to look up (e.g. the bare "/" address)
            return None
        root = self.get(address[0].pattern.lstrip('/'))
        if root is None:
            return None
        return root.find(address)

    def match(self, address: StrOrAddress) -> Iterator['AddressNode']:
        """Iterate through any nodes or child nodes matching the given address

        See :meth:`AddressNode.match`

        Arguments:
            address: The address to match
        """
        if not isinstance(address, Address):
            address = Address(pattern=address)
        for root in self:
            yield from root.match(address)

    def walk(self) -> Iterator['AddressNode']:
        """Iterate over all :attr:`root_nodes` and their children

        Calls :meth:`~AddressNode.walk` on each instance in :attr:`root_nodes`
        """
        for root in self:
            yield from root.walk()

    def __getitem__(self, key) -> 'AddressNode':
        return self.root_nodes[key]

    def get(self, key: str, default: Optional[Any] = None) -> Optional['AddressNode']:
        """Get the root node matching the given name. If not found, return
        the default value

        Arguments:
            key: The :attr:`AddressNode.name`
            default: The value to return if not found
        """
        return self.root_nodes.get(key, default)

    def __contains__(self, key):
        return key in self.root_nodes

    def __len__(self):
        return len(self.root_nodes)

    def __iter__(self):
        # Iterates the node instances, not their names
        yield from self.values()

    def keys(self) -> KeysView[str]:
        """Alias for :attr:`root_nodes.keys`
        """
        return self.root_nodes.keys()

    def values(self) -> ValuesView['AddressNode']:
        """Alias for :attr:`root_nodes.values`
        """
        return self.root_nodes.values()

    def items(self) -> ItemsView[str, 'AddressNode']:
        """Alias for :attr:`root_nodes.items`
        """
        return self.root_nodes.items()
class AddressNode:
    """A node within an OSC address space

    Attributes:
        children: Mapping of child nodes using the :attr:`name` as keys

    Arguments:
        name: The node :attr:`name`
        parent: The node :attr:`parent`

    Note:
        Iterating a node yields its child nodes and ``len()`` is the number
        of children.
    """
    children: Dict[str, 'AddressNode']
    __slots__ = (
        '__name', '__parent', '__address', '__address_space',
        '__address_part', '__part_index', 'children', '__event_handler',
    )

    def __init__(self, name: str, parent: Optional['AddressNode'] = None):
        self.__name = name
        self.__parent = parent
        # Memoized values, cleared by _reset_memoized_attrs()
        self.__address = None
        self.__address_space = None
        self.__address_part = None
        self.__part_index = None
        self.__event_handler = None
        self.children = {}

    @property
    def name(self) -> str:
        """The node name
        """
        return self.__name

    @property
    def address(self) -> 'Address':
        """The full OSC address for the node
        """
        a = self.__address
        if a is not None:
            return a
        if self.is_root:
            a = self.__address = Address(pattern=f'/{self.name}')
        else:
            a = self.__address = self.parent.address / self.name
        return a

    @property
    def address_part(self) -> 'AddressPart':
        """The :class:`AddressPart` for the node within its :attr:`address`
        """
        p = self.__address_part
        if p is None:
            # Use the property (not the raw cache slot) so the index is
            # computed if it has not been accessed yet
            p = self.__address_part = self.address.parts[self.part_index]
        return p

    @property
    def parent(self) -> Optional['AddressNode']:
        """The parent node, or ``None`` if this is the :attr:`root`
        """
        return self.__parent

    @parent.setter
    def parent(self, value: Optional['AddressNode']):
        if value is self.__parent:
            return
        if value is not None and self.name in value.children:
            raise KeyError(f'Node with name "{self.name}" already exists in "{value!r}"')
        if self.__parent is not None:
            del self.__parent.children[self.name]
        self.__parent = value
        if self.__parent is not None:
            self.__parent.children[self.name] = self
        # The address of this node and all descendants depends on the parent
        self._reset_memoized_attrs()

    @property
    def part_index(self) -> int:
        """Index of the node :attr:`address_part`
        """
        ix = self.__part_index
        if ix is None:
            ix = self.__part_index = len(self.address) - 1
        return ix

    def _reset_memoized_attrs(self):
        """Clear cached address data on this node and all of its descendants"""
        self.__address = None
        self.__address_part = None
        self.__part_index = None
        if not self.is_root:
            self.__address_space = None
        for child in self:
            child._reset_memoized_attrs()

    @property
    def root(self) -> 'AddressNode':
        """The root node of the tree
        """
        if self.is_root:
            return self
        return self.parent.root

    @property
    def address_space(self) -> Optional['AddressSpace']:
        """The :class:`AddressSpace` the node belongs to
        """
        if self.is_root:
            return self.__address_space
        return self.root.address_space

    @address_space.setter
    def address_space(self, value: 'AddressSpace'):
        if not self.is_root:
            raise ValueError('Only root nodes can belong to an AddressSpace')
        if value is self.__address_space:
            return
        if self.__address_space is not None:
            # Detach from the previous address space
            del self.__address_space.root_nodes[self.name]
        self.__address_space = value
        if self.__address_space is not None:
            if not self.name in self.__address_space:
                self.__address_space.add_root_instance(self)

    @property
    def is_root(self) -> bool:
        """``True`` if the node is at the root of the tree
        (has no :attr:`parent`)
        """
        return self.parent is None

    @property
    def event_handler(self) -> Optional['Event']:
        """An :class:`~pydispatch.dispatch.Event` instance used for callbacks
        """
        return self.__event_handler

    @property
    def has_callbacks(self) -> bool:
        """``True`` if any callbacks have been set on the node
        """
        h = self.event_handler
        if h is None:
            return False
        if len(h.listeners) or len(h.aio_listeners):
            return True
        return False

    def add_callback(self, cb: Callable, aio_loop: Optional['asyncio.BaseEventLoop'] = None):
        """Add a method, function or :term:`coroutine function` to the :attr:`event_handler`

        The :attr:`event_handler` is created on first use.

        Arguments:
            cb: The callback function
            aio_loop: If the callback is a :term:`coroutine function`, the
                :class:`event loop <asyncio.BaseEventLoop>` associated with it

        The callback should accept the signature::

            def cb(node: oscpython.address.AddressNode,
                   message: oscpython.messages.Message,
                   timetag: oscpython.common.TimeTag) -> None:
                pass

        :Callback Arguments:
            node: :class:`AddressNode`
                The node instance that originated the event
            message: :class:`.messages.Message`
                The received message
            timetag: :class:`.common.TimeTag`
                Timestamp of when the message was received.
                See :meth:`.transport.BaseServer.handle_packet` for details

        """
        h = self.event_handler
        if h is None:
            h = self.__event_handler = Event(self.name)
        h.add_listener(cb, __aio_loop__=aio_loop)

    def remove_callback(self, cb: Callable):
        """Remove a callback previously attached by :meth:`add_callback`

        Does nothing if no callbacks have been added. The
        :attr:`event_handler` is discarded once its last callback is removed.
        """
        h = self.event_handler
        if h is None:
            return
        h.remove_listener(cb)
        if not self.has_callbacks:
            self.__event_handler = None

    def dispatch(self, message: 'oscpython.messages.Message', timetag: 'oscpython.common.TimeTag'):
        """Called when a message is received that matches this node :attr:`address`

        Triggers any callbacks registered in the :attr:`event_handler` and
        emits the ``'on_message'`` event on the :attr:`address_space`

        Arguments:
            message: The received message
            timetag: Timestamp of when the message was received.
        """
        h = self.event_handler
        if h is not None:
            h(self, message, timetag)
        sp = self.address_space
        if sp is not None:
            sp.emit('on_message', self.address, message, timetag)

    def find(self, address: 'StrOrAddress') -> Optional['AddressNode']:
        """Search for a node matching the given address

        The address is compared against this node at the position given by
        :attr:`part_index`; if it is longer than this node's
        :attr:`address`, the search continues in :attr:`children`.

        Arguments:
            address: The address to search for
        """
        if not isinstance(address, Address):
            address = Address(pattern=address)
        ix = self.part_index
        try:
            part = address.parts[ix]
        except IndexError:
            return None
        if part.part != self.address_part.part:
            return None
        if len(address) == len(self.address):
            return self
        child_name = address.parts[ix+1].part
        child = self.get(child_name)
        if child is None:
            return None
        return child.find(address)

    def match(self, address: 'StrOrAddress') -> Iterator['AddressNode']:
        """Iterate through any nodes or child nodes matching the given address

        This uses the :meth:`AddressPart.match` method to follow OSC
        pattern matching rules

        Arguments:
            address: The address to match

        Note:
            The address argument must be absolute
        """
        if not isinstance(address, Address):
            address = Address(pattern=address)
        part_index = self.part_index
        my_part = self.address_part
        try:
            match_part = address.parts[part_index]
        except IndexError:
            # The address is shallower than this node
            return
        if match_part.match(my_part):
            if len(address) == len(self.address):
                yield self
            for child in self:
                yield from child.match(address)

    def walk(self) -> Iterator['AddressNode']:
        """Iterate through this node and all of its descendants
        """
        yield self
        for child in self:
            yield from child.walk()

    def create_message(self, *args, **kwargs) -> 'oscpython.messages.Message':
        """Create a :class:`~.messages.Message` using this node's :attr:`address`

        Positional and keyword arguments (``*args`` and ``**kwargs``) will be
        passed directly to :meth:`.messages.Message.create`
        """
        _import_message_classes()
        addr = self.address.copy()
        return Message.create(addr, *args, **kwargs)

    def create_bundled_message(self, *args, **kwargs) -> 'oscpython.messages.Bundle':
        """Create a :class:`~.messages.Bundle` containing a
        :class:`~.messages.Message` using this node's :attr:`address`

        Keyword arguments (``**kwargs``) will be passed to the Bundle's
        ``__init__`` method and positional arguments (``*args``) will be
        passed to :meth:`.messages.Message.create`
        """
        # create_message() also ensures the Bundle class has been imported
        msg = self.create_message(*args)
        bun = Bundle(**kwargs)
        bun.add_packet(msg)
        return bun

    def add_child(self, name: str, cls: Optional[type] = None) -> 'AddressNode':
        """Add a child node to this point in the tree

        If a child with the given name already exists and is exactly of type
        ``cls``, it is returned instead of creating a new one.

        Arguments:
            name (str): The node :attr:`name`
            cls (optional): A subclass of :class:`AddressNode` to use when
                creating the child node. If not provided, the class is
                inherited

        Raises:
            KeyError: If a child with the given name exists but its class is
                not ``cls``
        """
        if cls is None:
            cls = self.__class__
        if name in self:
            obj = self[name]
            if cls is not obj.__class__:
                raise KeyError(f'Node with name "{name}" already exists')
            return obj
        obj = cls(name, self)
        self.children[name] = obj
        return obj

    def add_child_instance(self, child: 'AddressNode'):
        """Add an existing :class:`AddressNode` instance to :attr:`children`

        Arguments:
            child: The instance to add

        Raises:
            KeyError: If a different child with the same name already exists
        """
        if child.name in self:
            _child = self[child.name]
            if _child is not child:
                raise KeyError(f'Node with name "{child.name}" already exists')
        child.parent = self
        self.children[child.name] = child
        return child

    def create_children_from_address(self, address: 'StrOrAddress') -> 'AddressNode':
        """Create node or nodes from the given (relative) OSC address and
        return the final node of the tree

        Existing children along the path are reused.

        Arguments:
            address: The OSC address (relative to this node)
        """
        if not isinstance(address, Address):
            address = Address(pattern=address)
        child_name = address.parts[0].part
        address = address[1:]
        child = self.get(child_name)
        if child is None:
            child = self.add_child(child_name)
        if not len(address):
            return child
        return child.create_children_from_address(address)

    @classmethod
    def create_from_address(cls, address: 'StrOrAddress') -> Tuple['AddressNode', 'AddressNode']:
        """Create node or nodes from the given OSC address

        Arguments:
            address: The OSC address

        Returns
        -------
        node : AddressNode
            The first (root) node created
        last_child : AddressNode
            The last child created (the end of the address string)
        """
        if not isinstance(address, Address):
            address = Address(pattern=address)
        name = address.parts[0].part
        address = address[1:]
        node = cls(name)
        if not len(address):
            return node, node
        last_child = node.create_children_from_address(address)
        return node, last_child

    def __getitem__(self, key):
        return self.children[key]

    def get(self, key, default=None) -> Optional['AddressNode']:
        """Get the child node matching the given name. If not found, return
        the default value
        """
        return self.children.get(key, default)

    def __contains__(self, key):
        return key in self.children

    def __len__(self):
        return len(self.children)

    def __iter__(self):
        # Iterates the child node instances, not their names
        yield from self.values()

    def keys(self) -> KeysView[str]:
        """Alias for :meth:`children.keys`
        """
        return self.children.keys()

    def values(self) -> ValuesView['AddressNode']:
        """Alias for :meth:`children.values`
        """
        return self.children.values()

    def items(self) -> ItemsView[str, 'AddressNode']:
        """Alias for :meth:`children.items`
        """
        return self.children.items()

    def __repr__(self):
        return f'<{self.__class__}: "{self.address.pattern}">'

    def __str__(self):
        return f'{self.name}'