"Ray: A distributed system for emerging AI applications" by Stephanie Wang and Robert Nishihara (2017-2018)
Ray: a distributed, fault-tolerant system
Traditional solution: glue multiple systems together, or build everything from scratch.
Ray's solution: treat multiple existing systems as libraries and use Ray to glue them together.
Since Ray's libraries have different performance requirements and fault-tolerance needs, a single approach is not enough:
Tasks (functions): allow every function to execute asynchronously
Actors (classes): turn classes into services
Ray can assign functions as tasks to be executed on remote machines and resolve the dependencies between those functions. The underlying logic is based on a computation graph.
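A minimal sketch of the task API (the function names load_data and combine are made up for illustration; @ray.remote, .remote(), and ray.get are the same calls used in the parameter-server example below). Each .remote() call returns a future immediately, and passing futures into other tasks is what builds the computation graph that Ray resolves:

```python
import numpy as np
import ray

ray.init()

@ray.remote
def load_data(seed):
    # runs asynchronously, possibly on a remote machine
    rng = np.random.default_rng(seed)
    return rng.random(1000)

@ray.remote
def combine(x, y):
    # x and y arrive as plain arrays: Ray resolved the dependencies for us
    return (x + y).sum()

a = load_data.remote(0)                 # returns a future (ObjectRef) immediately
b = load_data.remote(1)                 # runs in parallel with the call above
result = ray.get(combine.remote(a, b))  # blocks until the whole graph finishes
```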
However, a computation graph (although important in ML) does not let you share mutable state between multiple tasks (task id1 cannot communicate with task id2).
Examples of sharing mutable state:
- weights of a network
- the state of a simulator
- interaction with a real-world sensor
So we need an actor system.
Review: (refer to 15-440 Distributed ML for more info)
parameter server: a key-value store holding model weights and other parameters (typically implemented as a stand-alone system)
worker: fetches weights from the parameter server, does computation, and pushes gradients back to the parameter server
```python
import time

import numpy as np
import ray

ray.init()  # assuming a single machine

@ray.remote
class ParameterServer:
    def __init__(self):
        self.params = np.zeros(10)

    def get_params(self):
        return self.params

    def update_params(self, grad):
        self.params += grad

# starting the parameter server
ps = ParameterServer.remote()    # start a new process in the cluster
future = ps.get_params.remote()  # send a request to the parameter server, returns a future
params = ray.get(future)         # blocking, returns the parameter array

@ray.remote
def worker(ps):
    # it does:
    # 1. get weights from the server
    # 2. compute
    # 3. push the gradient back to the server
    for _ in range(100):
        params = ray.get(ps.get_params.remote())
        grad = np.ones(10)  # compute, maybe using PyTorch and data
        time.sleep(0.2)     # it takes time
        ps.update_params.remote(grad)  # push to server

worker.remote(ps)  # start a new worker
worker.remote(ps)  # start another new worker
worker.remote(ps)  # start another new worker

# the calls above start and execute in the background;
# weights are updated in the parameter server
```
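As a small usage follow-up (continuing the example above, using only the get_params call already defined), the driver can poll the actor to see the background updates take effect:

```python
# the worker tasks run in the background, so give them a moment,
# then read the current weights from the actor
time.sleep(1.0)
print(ray.get(ps.get_params.remote()))  # no longer all zeros once updates arrive
```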
Every Node has:
- scheduler: a scheduler thread that fetches tasks whose dependencies are satisfied
- worker: Python workers waiting for tasks to execute
- object store: one thread in charge of the node's object store database; workers may store computed values there (see the sketch after this list)
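A minimal sketch of interacting with the object store from a driver or worker: ray.put stores a value and returns a reference, and ray.get fetches it back (on a cluster, possibly copying it from another node's object store first).

```python
import numpy as np
import ray

ray.init()

# store a value in the object store and get back a reference to it
ref = ray.put(np.arange(4))

# fetch it again; on a cluster this may first copy the object from
# another node's object store into the local one
print(ray.get(ref))  # [0 1 2 3]
```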
A special global control store node: a fault-tolerant key-value store that
- holds system metadata
- may store object locations (which object is in which object store)
- may store the computational lineage of data (which tasks are required to recreate a specific object)
Here we give an example of executing the "Dot Task":
- the "Dot Task" is assigned to node 2, whose object store already holds obj1 (which is one of the dependencies)
- node 2 is missing obj2, so it asks the global control store where the data is
- node 2 fetches obj2 from node 3 and adds it to node 2's object store
- the worker executes the task and writes the result obj3 to the object store
When there is a fault, the global control store computes which computations in the dependency graph are lost and reassigns those tasks.
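A hedged sketch of the same flow from the API side (on a single machine both inputs live in the local object store, but on a cluster Ray would consult the control store, locate obj2, and copy it to the executing node before running the task):

```python
import numpy as np
import ray

ray.init()

obj1 = ray.put(np.arange(3))  # already in some node's object store
obj2 = ray.put(np.ones(3))    # on a cluster this could live on another node

@ray.remote
def dot(a, b):
    # by the time this runs, Ray has pulled both dependencies into
    # the local object store of the executing node
    return np.dot(a, b)

obj3 = dot.remote(obj1, obj2)  # schedule the "Dot Task"
print(ray.get(obj3))           # 3.0
```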
Coarse-grained tasks (run in seconds): global checkpointing or lineage reconstruction are both fine, since we trade latency for fault tolerance.
Fine-grained tasks (run in milliseconds): in ML, Ray targets stream processing, graph processing, interactive queries on mutable data, and end-to-end RL. These applications generate a large number of tasks per second, each running on the order of milliseconds, so we want essentially no scheduling overhead (10 milliseconds of overhead on a 1-millisecond task is a 10x overhead).
Traditionally: before computing a task, the system writes the task to global lineage storage. With a large number of subtasks, this jams the global lineage storage.
Ray's proposal: