"Ray: A distributed system for emerging AI applications" by Stephanie Wang and Robert Nishihara (2017-2018)
Ray: a distributed, fault-tolerant system
Traditional solution: glue multiple systems together, or build everything from scratch.
Ray's solution: treat multiple existing systems as libraries and use Ray to glue them together
Since Ray's libraries have different performance requirements and fault-tolerance needs, a single abstraction is not enough, so Ray provides two:
Tasks (Functions): allow every function to execute asynchronously
Actors (Classes): turn classes into stateful services
Ray can assign functions as tasks to be executed on remote machines and resolve the dependencies between those functions. The underlying logic is based on a computation graph.
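A minimal sketch of the task API (the function names here are illustrative, not from the paper): calling .remote() returns a future immediately, and futures passed to other tasks become edges in the computation graph that Ray resolves before running the dependent task.

import ray

ray.init()  # assuming a single machine

@ray.remote
def double(x):
    # an illustrative task
    return x * 2

@ray.remote
def add(a, b):
    # Ray resolves the futures a and b before this task runs
    return a + b

f1 = double.remote(1)      # returns a future immediately
f2 = double.remote(2)
f3 = add.remote(f1, f2)    # dependency on f1 and f2 is tracked in the graph
print(ray.get(f3))         # blocks until the result is ready; prints 6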
However, a computation graph (although important in ML) does not let you share mutable state between multiple tasks (task id1 cannot communicate with task id2).
Examples of shared mutable state:
- the weights of a network
- the state of a simulator
- interaction with a real-world sensor
So we need an actor system.
Review: (refer to 15-440 Distributed ML for more info)
parameter server: a key-value store holding model weights and other parameters (typically implemented as a stand-alone system)
worker: fetches weights from the parameter server, does the computation, and pushes gradients back to the parameter server
import time

import numpy as np
import ray

ray.init()  # assuming a single machine

@ray.remote
class ParameterServer:
    def __init__(self):
        self.params = np.zeros(10)

    def get_params(self):
        return self.params

    def update_params(self, grad):
        self.params += grad

# starting the parameter server
ps = ParameterServer.remote()     # start a new actor process in the cluster
future = ps.get_params.remote()   # request the params from the parameter server; returns a future
params = ray.get(future)          # blocking; returns the parameter array
@ray.remote
def worker(ps):
    # each worker repeatedly:
    # 1. gets the weights from the parameter server
    # 2. computes a gradient
    # 3. pushes the gradient back to the parameter server
    for _ in range(100):
        params = ray.get(ps.get_params.remote())
        grad = np.ones(10)   # compute the gradient, e.g. with PyTorch and data
        time.sleep(0.2)      # the computation takes time
        ps.update_params.remote(grad)  # push the gradient to the server

worker.remote(ps)  # start a new worker
worker.remote(ps)  # start another worker
worker.remote(ps)  # start another worker
# the calls above start workers that run in the background;
# the weights keep getting updated in the parameter server
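To see the effect (a sketch continuing the snippet above, not part of the original example), the driver can poll the actor while the workers run in the background:

# ray, np, and time are already imported above
for _ in range(5):
    print(ray.get(ps.get_params.remote()))  # values grow as workers push gradients
    time.sleep(1.0)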
Every node has:
scheduler: a scheduler thread that fetches tasks whose dependencies are ready
worker: Python workers waiting for tasks and executing them
object store: one thread in charge of the object-store database; workers may store computed values there
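A small sketch of how the object store shows up in the API (assuming ray.init() has been called as in the earlier example): ray.put writes a value into the local object store and returns a reference; a task that receives the reference reads the object from its node's store, and Ray copies the object between nodes' stores when needed.

import numpy as np
import ray  # assuming ray.init() has already been called

big = ray.put(np.ones(1_000_000))  # store a large array once; pass only the reference around

@ray.remote
def total(arr):
    # the worker reads the array from its node's object store
    return arr.sum()

print(ray.get(total.remote(big)))  # prints 1000000.0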
Special global control store node: a fault-tolerant key-value store that
holds system metadata
may store object locations (which object lives in which object store)
may store the computational lineage of data (which tasks are required to recreate a specific object)
Here we give an example of executing a "Dot Task":
- The scheduler sees that the task depends on obj1 (which is one of the dependencies) and obj2; one of the inputs is not in the local object store, so it asks the global control store where that data is.
- The missing input is copied from node 3 and added to node 2's object store.
- A worker executes the task and adds the result obj3 to the object store.
When there is a fault, the global control store computes which computations in the dependency graph were lost and reassigns those tasks.
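A hedged sketch of how such a "dot" task might be expressed in the API (the object names mirror the walkthrough above; where the inputs physically live is Ray's concern, not the program's):

import numpy as np
import ray

ray.init()  # single machine here; on a cluster obj1 and obj2 may sit in different object stores

obj1 = ray.put(np.random.rand(1000))  # one dependency, already in some object store
obj2 = ray.put(np.random.rand(1000))  # the other dependency, possibly on another node

@ray.remote
def dot(a, b):
    # Ray copies any input missing from the local object store before running this task
    return np.dot(a, b)

obj3 = dot.remote(obj1, obj2)  # the result is written to the executing node's object store
print(ray.get(obj3))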
Coarse-grained tasks (run in seconds): global checkpointing or lineage reconstruction are both fine, since we trade latency for fault tolerance.
Fine-grained tasks (run in milliseconds): in ML, Ray targets stream processing, graph processing, interactive queries on mutable data, and end-to-end RL. These applications generate a large number of tasks per second, each running on the order of milliseconds, so we want essentially no scheduling overhead (10 milliseconds of overhead would be a 10x slowdown).
Traditionally: before computing tasks, the tasks to be computed are stored in global lineage storage. This jams the global lineage storage because of the large number of subtasks.
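To make the fine-grained scale concrete (an illustrative sketch, not a benchmark from the paper), here is the kind of burst of millisecond tasks that would jam a centralized lineage store if every task were logged synchronously:

import time
import ray

ray.init()

@ray.remote
def tiny(i):
    # a fine-grained task on the order of a millisecond
    time.sleep(0.001)
    return i

start = time.time()
refs = [tiny.remote(i) for i in range(1000)]  # a burst of small tasks
ray.get(refs)
print("elapsed seconds:", time.time() - start)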
Ray proposes:
A Ray job is a single application: it is the collection of Ray tasks, objects, and actors that originate from the same script.
Once you have a cluster running, there are several ways to submit a job:
(Recommended) Submit the job using the Ray Jobs API. It has a CLI tool, a Python SDK, and a REST API (and its specification).
Run the driver script directly on any node of the Ray cluster, for interactive development.
(For Experts only) Use Ray Client to connect remotely to the cluster within a driver script.
Ray Jobs:
submit an entrypoint command and a runtime environment (dependencies) to run a job (a Python SDK sketch follows this list)
a job runs to completion or failure regardless of the submitter's connectivity
jobs are terminated once the cluster is terminated (no fault tolerance)
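A minimal sketch using the Ray Jobs Python SDK (the address, entrypoint script, and dependency list are placeholders):

from ray.job_submission import JobSubmissionClient

# connect to the cluster's dashboard / job server (placeholder address)
client = JobSubmissionClient("http://127.0.0.1:8265")

job_id = client.submit_job(
    entrypoint="python train.py",                         # entrypoint command (placeholder script)
    runtime_env={"working_dir": "./", "pip": ["numpy"]},  # runtime environment shipped with the job
)
print(client.get_job_status(job_id))  # the job keeps running even if this client disconnects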
If you run a script directly on a cluster node, or use Ray Client, it doesn't go through the Ray Jobs API, and ray job list cannot see it.
Ray Client is for remote development: you call ray.init("ray://<head_node_host>:10001") to connect to a Ray Client server (this server starts by default when the Ray cluster starts). The job will be dropped if the connection is lost.
For production, build your files and dependencies into a container image and specify the image in the cluster YAML configuration. Don't use runtime environments, setup_commands, or ray rsync_up in production.