"""The base classes for backends."""
import inspect
import logging
from abc import ABCMeta, abstractmethod
from functools import wraps
from typing import TYPE_CHECKING, Dict, Optional, Set, Type, cast
if TYPE_CHECKING: # pragma: nocover
from j5.boards import Board # noqa
[docs]class CommunicationError(Exception):
"""
A communication error occurred.
This error is thrown when there is an error communicating with a board, if a more
specific exception is available, then that may be thrown instead, but it should
inherit from this one.
"""
def _wrap_method_with_logging(
backend_class: Type['Backend'],
method_name: str,
logger: logging.Logger,
) -> None:
old_method = getattr(backend_class, method_name)
signature = inspect.signature(old_method)
@wraps(old_method)
def new_method(*args, **kwargs): # type: ignore
retval = old_method(*args, **kwargs)
arg_map = signature.bind(*args, **kwargs).arguments
args_str = ", ".join(
f"{name}={value!r}"
for name, value in arg_map.items()
if name != "self"
)
retval_str = (f" -> {retval!r}" if retval is not None else "")
message = f"{method_name}({args_str}){retval_str}"
logger.debug(message)
return retval
setattr(backend_class, method_name, new_method)
def _wrap_methods_with_logging(backend_class: Type['Backend']) -> None:
component_classes = backend_class.board.supported_components() # type: ignore
for component_class in component_classes:
logger = logging.getLogger(component_class.__module__)
interface_class = component_class.interface_class()
for method_name in interface_class.__abstractmethods__:
_wrap_method_with_logging(backend_class, method_name, logger)
[docs]class Backend(metaclass=BackendMeta):
"""
The base class for a backend.
A backend is an implementation of a specific board for an environment.
It can hold data about the actual board it is controlling. There should be a ratio
of one instance of a Backend to one instance of a Board. The Backend object should
not hold any references to the Board, instead having it's methods executed by the
code for the individual Board.
A Backend usually also implements a number of ComponentInterfaces which thus allow
a physical component to be controlled by the abstract Component representation.
"""
[docs] @classmethod
@abstractmethod
def discover(cls) -> Set['Board']:
"""Discover boards that this backend can control."""
raise NotImplementedError # pragma: no cover
@property
@abstractmethod
def board(self) -> Type['Board']:
"""Type of board this backend implements."""
raise NotImplementedError # pragma: no cover
@property
@abstractmethod
def firmware_version(self) -> Optional[str]:
"""The firmware version of the board."""
raise NotImplementedError # pragma: no cover
[docs]class Environment:
"""
A collection of backends that can work together.
A number of Backends that we wish to use in a grouping, such as those that
all work together in hardware can be added to this group. We can then pass
Environments to a Robot object, so that the Robot object can call different
methods based on where it is being used.
e.g Hardware Environment
We have `n` boards of `n` different types. They are all physical hardware.
We create an Environment containing the Backends to control the physical
hardware for our boards.
It is later realised that we want to test code without the physical hardware.
We can add Console backends to an environment, and instantiate our Robot object
with that environment, so that the console is manipulated rather than the hardware.
This allows for a high degree of code reuse and ensures API compatibility in
different situations.
"""
def __init__(self, name: str):
self.name = name
self.board_backend_mapping: Dict[Type['Board'], Type[Backend]] = {}
@property
def supported_boards(self) -> Set[Type['Board']]:
"""The boards that are supported by this environment."""
return set(self.board_backend_mapping.keys())
def __str__(self) -> str:
"""Get a string representation of this group."""
return self.name
[docs] def register_backend(self, backend: Type[Backend]) -> None:
"""Register a new backend with this environment."""
board_type: Type['Board'] = cast(Type['Board'], backend.board)
if board_type in self.board_backend_mapping.keys():
raise RuntimeError(f"Attempted to register multiple backends for"
f" {board_type.__name__} in the same environment.")
self.board_backend_mapping[board_type] = backend
[docs] def get_backend(self, board: Type['Board']) -> Type[Backend]:
"""Get the backend for a board."""
if board not in self.supported_boards:
raise NotImplementedError(f"The {str(self)} does not support {str(board)}")
return self.board_backend_mapping[board]
[docs] def merge(self, other: 'Environment') -> None:
"""
Merge in the board-backend mappings from another environment.
This allows vendors to predefine Environments and API authors
can then merge several vendor environments to get the one that
they need for their API.
This method will fail if any board is defined in both environments,
as it is unclear which one has the correct mapping.
"""
intersection = self.supported_boards & other.supported_boards
if len(intersection) > 0:
common_boards = ", ".join(map(lambda x: x.__name__, intersection))
raise RuntimeError(
f"Attempted to merge two Environments"
f" that both contain: {common_boards}")
self.board_backend_mapping = {
**self.board_backend_mapping,
**other.board_backend_mapping,
}