Source code for evolver.device

import logging
import time
from collections import defaultdict
from math import prod

from pydantic import Field, ValidationInfo, field_serializer, field_validator

from evolver.base import BaseInterface, ConfigDescriptor
from evolver.connection.interface import Connection
from evolver.controller.interface import Controller
from evolver.hardware.interface import EffectorDriver, HardwareDriver, SensorDriver
from evolver.history.interface import History
from evolver.history.standard import HistoryServer
from evolver.logutils import EVENT, LogHistoryCaptureHandler
from evolver.serial import EvolverSerialUART
from evolver.settings import settings

DEFAULT_SERIAL = EvolverSerialUART
DEFAULT_HISTORY = HistoryServer


class Experiment(BaseInterface.Config):
    enabled: bool = True
    controllers: list[Controller] = []

    # this seemed to have been required for the tests of config symmetry.
    # Without it there is pydantic error about unkown type (the underlying
    # Controller class) - so seemed to be kind of bypassing the evolver
    # BaseModel hookups?
    @field_serializer("controllers")
    def serialize_controllers(self, data):
        return [ConfigDescriptor.model_validate(c) for c in data]


[docs] class Evolver(BaseInterface): class Config(BaseInterface.Config): name: str = "Evolver" namespace: str = "unspecified" vial_layout: list[int] = Field( default=settings.DEFAULT_VIAL_LAYOUT, description="The layout of the vials in 2 or 3 dimensions. Always left-to-right bottom-top-top order.", min_length=2, max_length=3, ) vials: list = list(range(settings.DEFAULT_NUMBER_OF_VIALS_PER_BOX)) hardware: dict[str, HardwareDriver] = {} experiments: dict[str, Experiment] = {} serial: Connection = ConfigDescriptor.model_validate(DEFAULT_SERIAL) history: History = ConfigDescriptor.model_validate(DEFAULT_HISTORY) enable_control: bool = True interval: int = settings.DEFAULT_LOOP_INTERVAL raise_loop_exceptions: bool = False abort_on_control_errors: bool = False abort_on_commit_errors: bool = False skip_control_on_read_failure: bool = True log_level: int = EVENT @field_validator("vials", mode="after") @classmethod def validate_vials(cls, value: list[int], info: ValidationInfo): total_slots = prod(info.data["vial_layout"]) if len(value) > total_slots or max(value) >= total_slots: raise ValueError(f"Vials configured exceed total available slots from layout {total_slots}") return value
[docs] def __init__(self, *args, **kwargs): self.last_read = defaultdict(lambda: int(-1)) super().__init__(*args, evolver=self, **kwargs) # We have to turn experiment controllers passed in as configdescriptors # to objects while passing in self so controllers can consume hardware # and read from history, etc. for experiment in self.experiments.values(): for i in range(len(experiment.controllers)): elem = experiment.controllers[i] if isinstance(elem, ConfigDescriptor): experiment.controllers[i] = elem.create(non_config_kwargs={"evolver": self}) self._setup_log_capture()
def _setup_log_capture(self): self._log_capture_handler = LogHistoryCaptureHandler(self.history) self._log_capture_handler.setLevel(self.log_level) logger = logging.getLogger(settings.DEFAULT_LOGGER) logger.addHandler(self._log_capture_handler) logger.setLevel(self.log_level) def __del__(self): if handler := getattr(self, "_log_capture_handler", None): logging.getLogger(settings.DEFAULT_LOGGER).removeHandler(handler) def get_hardware(self, name): return self.hardware[name] @property def sensors(self): return {k: v for k, v in self.hardware.items() if isinstance(v, SensorDriver)} @property def effectors(self): return {k: v for k, v in self.hardware.items() if isinstance(v, EffectorDriver)} @property def enabled_controllers(self): return [c for exp in self.experiments.values() for c in exp.controllers if exp.enabled] @property def calibration_status(self): """Return the calibration Status for each device's calibrator, or None if hardware has no calibrator. This is explicit and returns all available hardware even those without calibrators. """ return { name: device.calibrator.status if getattr(device, "calibrator", None) else None for name, device in self.hardware.items() } @property def state(self): return {name: device.get() for name, device in self.sensors.items()} @property def schema(self): hardware_schemas = [] for n, hw in self.hardware.items(): s = {"name": n, "kind": str(type(hw)), "config": hw.Config.model_json_schema()} if isinstance(hw, SensorDriver): s["output"] = hw.Output.model_json_schema() if isinstance(hw, EffectorDriver): s["input"] = hw.Input.model_json_schema() hardware_schemas.append(s) return { "hardware": hardware_schemas, "controllers": [{"kind": str(type(a)), "config": a.Config.model_json_schema()} for a in self.controllers], } def _loop_exception_wrapper(self, callable, message="unknown") -> bool: try: callable() return None except Exception as exc: self.logger.exception(f"Error in loop: {message}") if self.raise_loop_exceptions: raise return exc def read_state(self): read_errors = [] for name, device in self.sensors.items(): read_errors.append(self._loop_exception_wrapper(device.read, f"reading device {name}")) self.last_read[name] = time.time() if data := device.get(): if isinstance(data, dict): for vial, output in data.items(): self.history.put(name, "sensor", output, vial=vial) else: self.history.put(name, "sensor", data, vial=getattr(data, "vial", None)) return read_errors def evaluate_controllers(self): return [self._loop_exception_wrapper(c.run, f"updating controller {c}") for c in self.enabled_controllers] def commit_proposals(self): return [ self._loop_exception_wrapper(d.commit, f"committing proposals for {d}") for d in self.effectors.values() ] def loop_once(self): self.logger.info("running control loop iteration") read_errors = self.read_state() if any(read_errors) and self.skip_control_on_read_failure: self.logger.warning("Skipping control loop due to read error") return if self.enable_control: control_errors = self.evaluate_controllers() if any(control_errors) and self.abort_on_control_errors: self.abort() raise RuntimeError("Aborted due to control error(s) - see logs for all errors") from control_errors[0] commit_errors = self.commit_proposals() if any(commit_errors) and self.abort_on_commit_errors: self.abort() raise RuntimeError("Aborted due to commit error(s) - see logs for all errors") from commit_errors[0] def abort(self): self.logger.log(EVENT, "Abort start") self.enable_control = False for device in self.effectors.values(): device.off() self.logger.log(EVENT, "Abort complete - device now inactive") def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.config_model.save(settings.SNAPSHOT) # TODO: Add code to stop all running hardware, which could implemented by context managing the hardware. ...