"Ray: A distributed system for emerging AI applications" by Stephanie Wang and Robert Nishihara (2017-2018)
Ray: a fault-tolerant distributed system
Traditional solution: glue multiple systems together, or build everything from scratch.
Ray's solution: treat multiple existing systems as libraries and use Ray to glue them together.
Since Ray's libraries have different performance requirements and tolerances, one approach is not enough:
Tasks (functions): allow any function to execute asynchronously
Actors (classes): turn classes into services
Ray can dispatch functions as tasks to run on remote machines and resolve the dependencies between those functions. The underlying logic is based on a computation graph.
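To make the dependency resolution concrete, here is a toy sketch that uses only Python's standard library, not Ray itself. The `remote` decorator, the thread pool, and the three example functions are all hypothetical stand-ins: calling a decorated function returns a future immediately, and any future passed as an argument is resolved before the function body runs.

```python
from concurrent.futures import Future, ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=4)


def remote(fn):
    """Toy stand-in for a remote-task decorator (an assumption, not Ray's API)."""
    def submit(*args):
        def run():
            # Wait for any Future arguments, then call fn on plain values.
            resolved = [a.result() if isinstance(a, Future) else a for a in args]
            return fn(*resolved)
        return _pool.submit(run)  # returns immediately with a future
    return submit


@remote
def zeros(n):
    return [0.0] * n


@remote
def add_one(xs):
    return [x + 1 for x in xs]


@remote
def dot(xs, ys):
    return sum(x * y for x, y in zip(xs, ys))


id1 = zeros(3)          # no dependencies
id2 = add_one(id1)      # depends on id1
id3 = add_one(id2)      # depends on id2
result = dot(id2, id3)  # depends on id2 and id3
print(result.result())  # 6.0
```

The four calls form a small graph: each edge is a future handed from one task to the next, and nothing blocks the caller until `result.result()`.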
However, a computation graph (although important in ML) does not let multiple tasks share mutable state. (
id1 cannot communicate with
Examples of sharing mutable state:
- sharing the weights of a network
- the state of a simulator
- interaction with a real-world sensor
So we need an actor system.
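As a rough illustration of what an actor adds, the sketch below wraps an ordinary object so that all method calls go through a single queue and run one at a time. It uses only the standard library; `ToyActor` and `Weights` are hypothetical names, and this is a simplified model of the idea, not how Ray implements actors.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyActor:
    """Hypothetical actor wrapper: owns one object and runs its methods one at a time."""

    def __init__(self, cls, *args):
        self._instance = cls(*args)
        # A single worker thread acts as the mailbox, so calls never interleave.
        self._mailbox = ThreadPoolExecutor(max_workers=1)

    def call(self, method_name, *args):
        # Queue the method call and return a future for its result.
        return self._mailbox.submit(getattr(self._instance, method_name), *args)


class Weights:
    """Mutable state shared by several callers."""

    def __init__(self, size):
        self.values = [0.0] * size

    def update(self, grad):
        self.values = [v + g for v, g in zip(self.values, grad)]

    def get(self):
        return list(self.values)


weights = ToyActor(Weights, 3)
for _ in range(5):  # five callers push updates into the same state
    weights.call("update", [1.0, 1.0, 1.0])
final = weights.call("get").result()
print(final)  # [5.0, 5.0, 5.0]
```

Because the state lives in one place and calls are processed in order, callers can share it without coordinating among themselves.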
Review: (refer to 15-440 Distributed ML for more info)
parameter server: a key-value database storing model weights and other parameters (typically implemented as a stand-alone system)
worker: fetches weights from the parameter server, does the computation, and pushes the update back to the parameter server
    import time

    import numpy as np
    import ray

    ray.init()  # assuming a single machine


    @ray.remote
    class ParameterServer:
        def __init__(self):
            self.params = np.zeros(10)

        def get_params(self):
            return self.params

        def update_params(self, grad):
            self.params += grad


    # Start the parameter server (a new process in the cluster).
    ps = ParameterServer.remote()

    future = ps.get_params.remote()  # request the params from the parameter server; returns a future
    params = ray.get(future)  # blocking; returns the output array


    @ray.remote
    def worker(ps):
        # Each iteration:
        # 1. get the weights from the server
        # 2. compute
        # 3. push the gradient back to the server
        for _ in range(100):
            params = ray.get(ps.get_params.remote())
            grad = np.ones(10)  # compute, maybe using PyTorch and data
            time.sleep(0.2)  # the computation takes time
            ps.update_params.remote(grad)  # push to the server


    worker.remote(ps)  # start a new worker
    worker.remote(ps)  # start another new worker
    worker.remote(ps)  # start another new worker

    # The workers above start and execute in the background
    # while the weights are updated in the parameter server.
Every node has:
scheduler: a scheduler thread that picks up work with no unmet dependencies
worker: Python workers that wait for tasks and execute them
object store: one thread in charge of the object store database. Workers may store computed values there.
A special global control store node: a fault-tolerant key-value store
holds system metadata
may store object locations (which object is in which object store)
may store the computational lineage of data (which tasks are required to create a specific object)
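The two kinds of records listed above can be pictured as two dictionaries. The sketch below is a minimal in-memory model under that assumption; `ToyControlStore`, its method names, and the object and node ids are all hypothetical and say nothing about how Ray stores or replicates this data.

```python
from dataclasses import dataclass, field


@dataclass
class ToyControlStore:
    # object id -> set of node ids whose object store holds a copy
    locations: dict = field(default_factory=dict)
    # object id -> (task name, input object ids) that produced it
    lineage: dict = field(default_factory=dict)

    def record_lineage(self, object_id, task_name, input_ids):
        self.lineage[object_id] = (task_name, tuple(input_ids))

    def add_location(self, object_id, node_id):
        self.locations.setdefault(object_id, set()).add(node_id)

    def locate(self, object_id):
        return sorted(self.locations.get(object_id, set()))


gcs = ToyControlStore()
gcs.record_lineage("c", "dot", ["a", "b"])
gcs.add_location("c", "node2")
gcs.add_location("c", "node3")
print(gcs.locate("c"))   # ['node2', 'node3']
print(gcs.lineage["c"])  # ('dot', ('a', 'b'))
```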
Here we give an example of executing the "Dot Task"
obj1 which is one of the dependencies)
obj2, so it asks the global control store where the data is
obj3 from node 3, add that to node 2's object store
obj3 to object store
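The general flow of such an execution (look up a missing input, copy it from the node that has it, run the task, store the result) can be sketched as follows. This is a toy model: `ToyNode`, the shared `directory` dict standing in for the global control store, and the object ids `a`, `b`, and `out` are hypothetical and are not meant to match the object names in the example above.

```python
class ToyNode:
    def __init__(self, name, directory, cluster):
        self.name = name
        self.store = {}             # local object store
        self.directory = directory  # shared dict standing in for the global control store
        self.cluster = cluster      # node name -> ToyNode, to reach other object stores
        cluster[name] = self

    def put(self, object_id, value):
        # Store locally and register the new location in the directory.
        self.store[object_id] = value
        self.directory.setdefault(object_id, set()).add(self.name)

    def fetch(self, object_id):
        # If the object is not local, ask the directory who has it and copy it over.
        if object_id not in self.store:
            owner = sorted(self.directory[object_id])[0]
            self.put(object_id, self.cluster[owner].store[object_id])
        return self.store[object_id]

    def run_dot(self, left_id, right_id, output_id):
        left, right = self.fetch(left_id), self.fetch(right_id)
        self.put(output_id, sum(x * y for x, y in zip(left, right)))
        return self.store[output_id]


directory, cluster = {}, {}
node2 = ToyNode("node2", directory, cluster)
node3 = ToyNode("node3", directory, cluster)
node2.put("a", [1, 2, 3])
node3.put("b", [4, 5, 6])

result = node2.run_dot("a", "b", "out")
print(result)                  # 32
print(sorted(directory["b"]))  # ['node2', 'node3']
```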
When there is a fault, the global control store works out which computations in the dependency graph were lost and reassigns those tasks.
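A minimal sketch of lineage-based recovery, assuming lineage is kept as a mapping from each object to the function and inputs that produced it. The `reconstruct` helper and the three-object graph are hypothetical; the point is only that a lost object can be rebuilt by replaying the tasks it depends on, while objects that survived are reused.

```python
def reconstruct(object_id, store, lineage):
    """Return object_id, recomputing it from its lineage if it is missing."""
    if object_id in store:
        return store[object_id]
    fn, input_ids = lineage[object_id]
    inputs = [reconstruct(i, store, lineage) for i in input_ids]
    store[object_id] = fn(*inputs)
    return store[object_id]


# object id -> (function that creates it, ids of its inputs)
lineage = {
    "a": (lambda: [1, 2, 3], ()),
    "b": (lambda a: [x * 2 for x in a], ("a",)),
    "c": (lambda a, b: sum(x * y for x, y in zip(a, b)), ("a", "b")),
}

store = {}
reconstruct("c", store, lineage)  # initial run computes a, b, and c
del store["b"], store["c"]        # simulate a failure that loses b and c
recovered = reconstruct("c", store, lineage)
print(recovered)  # 28
```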
Coarse-grained tasks (run for seconds): global checkpointing and lineage reconstruction are both fine, since we trade latency for fault tolerance.
Fine-grained tasks (run for milliseconds): in ML, Ray targets stream processing, graph processing, interactive queries on mutable data, and end-to-end RL. These applications generate a large number of tasks per second, each running on the order of milliseconds, so scheduling must add almost no overhead (10 milliseconds added to a task that runs for about a millisecond is a 10x overhead).
Traditionally: before running a task, record it in global lineage storage. With this many small tasks, that write jams the global lineage storage.
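The overhead figure can be checked with a line of arithmetic. In the sketch below the task durations are illustrative assumptions (a 1 ms fine-grained task and a 1 s coarse-grained task); only the 10 ms overhead comes from the notes.

```python
def relative_overhead(task_ms, overhead_ms):
    """Per-task overhead expressed as a multiple of the task's own run time."""
    return overhead_ms / task_ms


overhead_ms = 10  # fixed cost added to every task

fine = relative_overhead(task_ms=1, overhead_ms=overhead_ms)       # fine-grained task
coarse = relative_overhead(task_ms=1000, overhead_ms=overhead_ms)  # coarse-grained task

print(fine)    # 10.0
print(coarse)  # 0.01
```

The same fixed cost is a 1% tax on a task that runs for a second and a tenfold slowdown on a task that runs for a millisecond, which is why the fine-grained case needs a different design.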
A Ray job is a single application: it is the collection of Ray tasks, objects, and actors that originate from the same script.
Once you have a cluster running, there are several ways to submit a job:
Run the driver script directly on any node of the Ray cluster, for interactive development.
(For experts only) Use Ray Client to connect remotely to the cluster within a driver script.
With the Ray Jobs API, you submit an entrypoint command and a runtime environment (dependencies) to run a job
a job runs to completion or failure, regardless of the submitter's connectivity
a job is terminated once the cluster is terminated (no fault tolerance)
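A hedged sketch of a job submission through Ray's Python SDK. The script name, working directory, and address are assumptions for illustration (`http://127.0.0.1:8265` is the usual dashboard address of a cluster started locally); the Ray import is done inside `submit` so the request can be built without a running cluster.

```python
def build_job_request(script="my_script.py", working_dir="./"):
    """Return the two pieces a job submission carries (hypothetical values)."""
    return {
        "entrypoint": f"python {script}",              # command the cluster runs
        "runtime_env": {"working_dir": working_dir},   # files and dependencies to ship
    }


def submit(address="http://127.0.0.1:8265"):
    # Imported lazily: this call needs Ray installed and a reachable cluster.
    from ray.job_submission import JobSubmissionClient

    client = JobSubmissionClient(address)
    # Returns a job id; the job keeps running even if this process exits.
    return client.submit_job(**build_job_request())


request = build_job_request()
print(request["entrypoint"])  # python my_script.py
```

Calling `submit()` against a live cluster returns a job id that can be passed to `client.get_job_status(...)` to poll for completion or failure.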
If you run a script directly on a cluster node, or use Ray Client, it does not involve the Ray Jobs API, and ray job list cannot see it.
Ray Client is for remote development: you call ray.init("ray://<head_node_host>:10001") to connect to a Ray Client server (this server starts by default when the Ray cluster starts). The job will be dropped if the connection is lost.
Build files and dependencies into a container image and specify the image in Cluster YAML Configuration.
Don't use runtime environments or ray rsync_up for production.