Source code for dplutils.observer.ray
import ray
from ray.util.metrics import Counter, Gauge
from dplutils.observer import Observer
[docs]
class RayActorWrappedObserver(Observer):
"""Create actor from another observer
For use with stateful observers that should not have multiple
instances. Each call to this observer does a remote call to the actor-based
observer.
Args:
cls: The observer class to start as actor
*args: Args to pass to ``cls`` instantiation
**kwargs: Keyword args to pass to ``cls`` instantiation
"""
[docs]
def __init__(self, cls, *args, **kwargs):
self.actor = ray.remote(cls).remote(*args, **kwargs)
self._wait = False # for testing purposes. If true wait instead of fire-and-forget
def _wrap_waiter(self, remote):
if self._wait:
ray.get(remote)
def observe(self, name, value, **kwargs):
self._wrap_waiter(self.actor.observe.remote(name, value, **kwargs))
def increment(self, name, value=1, **kwargs):
self._wrap_waiter(self.actor.increment.remote(name, value=1, **kwargs))
def param(self, name, value, **kwargs):
self._wrap_waiter(self.actor.param.remote(name, value, **kwargs))
[docs]
class RayMetricsObserver(Observer):
"""Observer implemented using ray metrics
Ray metrics are implemented in the raylet and exposed as a prometheus
endpoint. While there is some state to store the underlying ray metric
objects, this can be used directly having copies per worker (so does not
need to be wrapped in actor).
"""
[docs]
def __init__(self):
self.mmap = {}
def _get_or_set_as(self, name, kind):
if name in self.mmap:
metric = self.mmap[name]
if not isinstance(metric, kind):
raise TypeError(f"setting metric requires {kind}, but {name} is {type(metric)}")
else:
metric = kind(name)
self.mmap[name] = metric
return metric
def observe(self, name, value, **kwargs):
self._get_or_set_as(name, Gauge).set(value)
def increment(self, name, value=1, **kwargs):
self._get_or_set_as(name, Counter).inc(value)
def param(self, name, value, **kwargs):
"""Not yet implemented"""
pass