Source code for oscpython.transport

# from loguru import logger
from typing import Optional, Tuple, ClassVar
import asyncio
from dataclasses import dataclass, field

from oscpython.common import Client, TimeTag
from oscpython.address import Address, AddressSpace, AddressNode
from oscpython.messages import Packet, Message, Bundle, PacketStartError

@dataclass
class QueueItem:
    """Raw packet data paired with the remote address it belongs to

    Instances of this class are the items placed on
    :attr:`BaseServer.rx_queue` and :attr:`BaseServer.tx_queue`
    """

    #: The incoming or outgoing data
    data: bytes

    #: The remote address as a tuple of (host, port)
    addr: Tuple[str, int]

    timetag: TimeTag = field(default_factory=TimeTag.now)
    """Timestamp of when the item was created

    For incoming data, this indicates the time of reception
    """
@dataclass(order=True)
class QueuedBundle:
    """Container for a bundle waiting in an :class:`asyncio.PriorityQueue`

    Only :attr:`bundle_timetag` takes part in comparisons, so queued items
    are ordered by that value alone.
    """

    bundle_timetag: TimeTag
    """The :attr:`~.messages.Bundle.timetag` value of the :attr:`bundle`.
    This is duplicated solely for sorting purposes
    """

    # ``compare=False`` keeps the two fields below out of the generated
    # ordering methods.
    rx_timetag: Optional[TimeTag] = field(compare=False)
    """The timestamp of when the bundle was received
    """

    bundle: Optional[Bundle] = field(compare=False)
    """The :class:`~.messages.Bundle` instance
    """
class MessageHandler:
    """Handles dispatching packets to an :class:`~.address.AddressSpace`

    This is instantiated and managed by :class:`BaseServer`

    Arguments:
        address_space: The value for :attr:`address_space`
    """
    address_space: AddressSpace
    """The :class:`~.address.AddressSpace` for the handler"""

    queued_bundles: asyncio.PriorityQueue
    """A queue for :class:`QueuedBundle` items whose
    :attr:`~QueuedBundle.bundle_timetag` is in the future
    """

    running: bool
    """``True`` if the handler is running"""

    loop: asyncio.BaseEventLoop

    def __init__(self, address_space: AddressSpace):
        self.address_space = address_space
        self.queued_bundles = asyncio.PriorityQueue()
        # Set whenever something is added to ``queued_bundles`` so that
        # ``dispatch_loop`` can stop sleeping and re-check the queue head.
        self.queued_bundle_evt = asyncio.Event()
        self.running = False
        self._dispatch_loop_task = None
        self.loop = asyncio.get_event_loop()

    async def open(self):
        """Start background tasks needed for queue management

        Does nothing if the handler is already running.
        """
        if self.running:
            return
        self.running = True
        self._dispatch_loop_task = asyncio.create_task(self.dispatch_loop())

    async def close(self):
        """Stop dispatching and exit all background tasks

        Does nothing if the handler is not running.
        """
        if not self.running:
            return
        self.running = False
        task = self._dispatch_loop_task
        self._dispatch_loop_task = None
        if task is None:
            return
        # A sentinel item (``bundle=None``) together with the event wakes
        # the dispatch loop whether it is blocked on ``get()`` or sleeping
        # until the next bundle is due.
        sentinel = QueuedBundle(
            bundle_timetag=TimeTag.Immediately,
            bundle=None,
            rx_timetag=None,
        )
        await self.queued_bundles.put(sentinel)
        self.queued_bundle_evt.set()
        await task

    async def handle_packet(self, packet: Packet, timetag: TimeTag):
        """Handle an incoming :class:`~.messages.Packet`

        If the packet is a :class:`~.messages.Message`, the
        :attr:`address_space` is searched for any matching
        :class:`nodes <.address.AddressNode>` and the
        :meth:`~.address.AddressNode.dispatch` method will be called.

        If the packet is a :class:`~.messages.Bundle` whose timetag is in the
        future, it is placed on :attr:`queued_bundles` for later dispatch.
        Otherwise this method will be called recursively for all of the
        :attr:`~.messages.Bundle.packets` contained in the bundle.

        Nothing is done if the handler is not :attr:`running`.

        Arguments:
            packet: A :class:`~.messages.Message` or :class:`~.messages.Bundle`
                received from the network
            timetag: Timestamp of when the data was received. This is the
                :attr:`~QueueItem.timetag` attribute of the :class:`QueueItem`
                containing the packet data, **not** the
                :attr:`~.messages.Message.timetag` of the packet itself

        Raises:
            ValueError: If *packet* is neither a Message nor a Bundle
        """
        if not self.running:
            return

        if isinstance(packet, Message):
            for node in self.address_space.match(packet.address):
                node.dispatch(packet, timetag)
            return

        if not isinstance(packet, Bundle):  # pragma: no cover
            raise ValueError('Must be either a Message or Bundle')

        if packet.timetag > TimeTag.now():
            # Not due yet: hand it to the dispatch loop and wake it up so
            # it can recompute how long to sleep.
            qitem = QueuedBundle(
                rx_timetag=timetag,
                bundle_timetag=packet.timetag,
                bundle=packet,
            )
            await self.queued_bundles.put(qitem)
            self.queued_bundle_evt.set()
            return

        tasks = [self.handle_packet(item, timetag) for item in packet]
        if tasks:
            await asyncio.gather(*tasks)

    # @logger.catch
    async def dispatch_loop(self):
        """Consume from :attr:`queued_bundles` and dispatch them according
        to their timetags

        This runs as long as :attr:`running` is True and the
        :class:`asyncio.Task` for it is managed by the :meth:`open` and
        :meth:`close` methods.

        Every ``get()`` on the queue is matched by exactly one
        ``task_done()``, including when a bundle that is not yet due is put
        back, so the queue's unfinished-task count stays accurate.
        """
        async def wait_for_evt(timeout=None):
            # Sleep until the event is set or ``timeout`` seconds elapse.
            # Returns True if the event fired, False on timeout.
            try:
                await asyncio.wait_for(self.queued_bundle_evt.wait(), timeout)
                self.queued_bundle_evt.clear()
            except asyncio.TimeoutError:
                return False
            return True

        while self.running:
            item = await self.queued_bundles.get()

            if item.bundle is None:
                self.queued_bundles.task_done()
                if self.running:
                    # Sentinel left over from an earlier close() that the
                    # previous loop never consumed; it must not stop the
                    # loop started by a later open().
                    continue
                break
            if not self.running:
                self.queued_bundles.task_done()
                break

            now = TimeTag.now()
            if item.bundle_timetag > now:
                # Head of the queue is not due yet.  Put it back *before*
                # acknowledging the get() so the unfinished count never
                # drops to zero while the bundle is still pending.
                await self.queued_bundles.put(item)
                self.queued_bundles.task_done()
                timeout = item.bundle_timetag.float_seconds - now.float_seconds
                await wait_for_evt(timeout)
                continue

            self.queued_bundle_evt.clear()
            self.queued_bundles.task_done()
            await self.handle_packet(item.bundle, item.rx_timetag)
class BaseServer:
    """Base server class

    Subclasses provide the actual communication by implementing
    :meth:`open_endpoint`, :meth:`close_endpoint` and
    :meth:`send_queue_item`.

    Arguments:
        listen_address: Value for :attr:`listen_address`. If not provided,
            defaults to :attr:`default_listen_address`
        address_space: Value for :attr:`address_space`. If not provided,
            an empty one will be created.
    """

    default_listen_address: ClassVar[Client] = Client(address='0.0.0.0', port=9000)
    """Default value for :attr:`listen_address`
    """

    listen_address: Client
    """The address and port to bind the server to
    """

    running: bool
    """``True`` if the server is running
    """

    rx_queue: asyncio.Queue
    """A :class:`~asyncio.Queue` for incoming data as
    :class:`QueueItem` instances
    """

    tx_queue: asyncio.Queue
    """A :class:`~asyncio.Queue` for outgoing data as
    :class:`QueueItem` instances
    """

    loop: asyncio.BaseEventLoop

    def __init__(self,
                 listen_address: Optional[Client] = None,
                 address_space: Optional[AddressSpace] = None):

        if listen_address is None:
            listen_address = self.default_listen_address
        self.listen_address = listen_address
        if address_space is None:
            address_space = AddressSpace()
        self.__address_space = address_space
        self.__message_handler = MessageHandler(address_space)
        self.running = False
        self.rx_queue = asyncio.Queue()
        self.tx_queue = asyncio.Queue()
        self._tx_loop_task = None
        self._rx_loop_task = None
        self.loop = asyncio.get_event_loop()

    @property
    def address_space(self) -> AddressSpace:
        """The :class:`~.address.AddressSpace` associated with the server
        """
        return self.__address_space

    @property
    def message_handler(self) -> MessageHandler:
        """The :class:`MessageHandler` for the server
        """
        return self.__message_handler

    async def send_packet(self, packet: Packet):
        """Send either a :class:`~.messages.Message` or :class:`~.messages.Bundle`

        The :attr:`~.messages.Packet.remote_client` in the given packet is
        used as the destination host address and port.

        The packet is only placed on :attr:`tx_queue` here; the actual
        transmission is done by :meth:`tx_loop`.

        Arguments:
            packet: The packet to send
        """
        data = packet.build_packet()
        addr = packet.remote_client.to_tuple()
        await self.tx_queue.put(QueueItem(data=data, addr=addr))

    async def open(self):
        """Open the server connections and start all background tasks

        Calls :meth:`open_endpoint` and creates tasks for :meth:`tx_loop`
        and :meth:`rx_loop`. Does nothing if the server is already running.

        If :meth:`open_endpoint` raises, :attr:`running` is reset to
        ``False`` before the exception propagates so that the server is not
        left in a half-open state.
        """
        if self.running:
            return
        self.running = True
        try:
            await self.open_endpoint()
        except BaseException:
            # Nothing was started; without this reset a later open() would
            # be a silent no-op and close() would act on a missing endpoint.
            self.running = False
            raise
        self._tx_loop_task = asyncio.create_task(self.tx_loop())
        self._rx_loop_task = asyncio.create_task(self.rx_loop())
        await self.message_handler.open()

    async def close(self):
        """Stop all background tasks and close all connections

        Stops the :meth:`tx_loop` and :meth:`rx_loop` tasks and calls
        :meth:`close_endpoint`. Does nothing if the server is not running.

        :meth:`close_endpoint` is called even if stopping one of the
        background tasks raises, so the endpoint is not leaked. The
        exception still propagates to the caller.
        """
        if not self.running:
            return
        self.running = False
        try:
            await self.message_handler.close()

            # A ``None`` item wakes a loop that is blocked on ``get()``.
            task = self._tx_loop_task
            self._tx_loop_task = None
            if task is not None:
                await self.tx_queue.put(None)
                await task

            task = self._rx_loop_task
            self._rx_loop_task = None
            if task is not None:
                await self.rx_queue.put(None)
                await task
        finally:
            await self.close_endpoint()

    # @logger.catch
    async def tx_loop(self):
        """Wait for items on :attr:`tx_queue` and send them using
        :meth:`send_queue_item`

        This runs as long as :attr:`running` is True and the task for it is
        managed by the :meth:`open` and :meth:`close` methods

        Every item taken from the queue is acknowledged with ``task_done()``
        whether or not it could be sent, since unsent items are dropped
        rather than re-queued.
        """
        while self.running:
            item = await self.tx_queue.get()
            try:
                if item is None:
                    if self.running:
                        # Stale stop marker left behind by an earlier
                        # close(); ignore it after a reopen.
                        continue
                    break
                if not self.running:
                    break
                await self.send_queue_item(item)
            finally:
                self.tx_queue.task_done()

    # @logger.catch
    async def rx_loop(self):
        """Wait for items on :attr:`rx_queue` and pass them to
        :attr:`message_handler` using :meth:`MessageHandler.handle_packet`.

        This runs as long as :attr:`running` is True and the task for it is
        managed by the :meth:`open` and :meth:`close` methods

        Every item taken from the queue is acknowledged with ``task_done()``
        even if parsing or dispatching raises.
        """
        while self.running:
            item = await self.rx_queue.get()
            try:
                if item is None:
                    if self.running:
                        # Stale stop marker left behind by an earlier
                        # close(); ignore it after a reopen.
                        continue
                    break
                if not self.running:
                    break

                client = Client.from_tuple(item.addr)
                remaining = item.data
                tasks = []
                # One datagram may hold several packets back to back.
                while remaining:
                    try:
                        packet, remaining = Packet.parse(remaining, remote_client=client)
                    except PacketStartError:
                        # Stop parsing this datagram; packets parsed so
                        # far are still dispatched.
                        break
                    tasks.append(self.message_handler.handle_packet(packet, item.timetag))
                if tasks:
                    await asyncio.gather(*tasks)
            finally:
                self.rx_queue.task_done()

    async def open_endpoint(self):
        """Open implementation-specific items for communication

        Called from the :meth:`open` method
        """
        raise NotImplementedError

    async def close_endpoint(self):
        """Close any implementation-specific communication items

        Called from the :meth:`close` method
        """
        raise NotImplementedError

    async def send_queue_item(self, item: QueueItem) -> bool:
        """Handle transmission of a :class:`QueueItem's <QueueItem>` bytestring

        Returns:
            bool: True if the item was sent
        """
        raise NotImplementedError
class DatagramProtocol(asyncio.DatagramProtocol):
    """Asyncio :class:`~asyncio.Protocol` for datagram communication

    Received datagrams are forwarded to the queue given at construction.
    """

    rx_queue: asyncio.Queue
    """The parent server's :attr:`~BaseServer.rx_queue`"""

    def __init__(self, rx_queue: asyncio.Queue):
        # Set by ``connection_made`` so callers can wait for the connection.
        self.connect_evt = asyncio.Event()
        self.rx_queue = rx_queue

    def connection_made(self, transport):
        """Set ``connect_evt`` to signal that the connection exists
        """
        self.connect_evt.set()

    # @logger.catch
    def datagram_received(self, data: bytes, addr: Tuple[str, int]):
        """Place the incoming data on the :attr:`rx_queue`
        """
        received = QueueItem(data=data, addr=addr)
        self.rx_queue.put_nowait(received)
class DatagramServer(BaseServer):
    """An OSC Client/Server using UDP

    Arguments:
        listen_address: Passed through to :class:`BaseServer`
        address_space: Passed through to :class:`BaseServer`
    """

    transport: Optional[asyncio.DatagramTransport]
    """Datagram transport created by
    :meth:`~asyncio.loop.create_datagram_endpoint`
    """

    protocol: Optional[DatagramProtocol]
    """Protocol created by :meth:`~asyncio.loop.create_datagram_endpoint`
    """

    def __init__(self,
                 listen_address: Optional[Client] = None,
                 address_space: Optional[AddressSpace] = None):
        super().__init__(listen_address, address_space)
        self.transport = None
        self.protocol = None

    async def open_endpoint(self):
        """Create the datagram endpoint bound to :attr:`listen_address`

        The endpoint is created on the currently running event loop, which
        is the same loop the server's background tasks are created on.
        """
        # The loop stored at construction time may differ from the one that
        # is actually running (e.g. the server was built before
        # ``asyncio.run``), so ask for the running loop explicitly.
        loop = asyncio.get_running_loop()
        self.loop = loop
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: DatagramProtocol(self.rx_queue),
            local_addr=self.listen_address.to_tuple(),
        )
        self.transport, self.protocol = transport, protocol
        await protocol.connect_evt.wait()

    async def close_endpoint(self):
        """Close the transport, if any, and clear :attr:`transport` and
        :attr:`protocol`

        Safe to call when no endpoint is open.
        """
        transport = self.transport
        self.transport = None
        self.protocol = None
        if transport is not None:
            transport.close()

    async def send_queue_item(self, item: QueueItem) -> bool:
        """Send the item's data to its address using :attr:`transport`

        Returns:
            bool: ``False`` if there is no open transport, otherwise ``True``
        """
        if self.transport is None:
            return False
        self.transport.sendto(item.data, item.addr)
        return True