Ray

"Ray: A distributed system for emerging AI applications" by Stephanie Wang and Robert Nishihara (2017-2018)

Ray: a distributed, fault-tolerant system

Problem: As of 2018, people used a different distributed system for each ML-related task, which makes it hard to couple the different components (e.g., training, data collection, serving).


Traditional solution: glue multiple systems together, or build everything from scratch.

Ray's solution: treat multiple existing systems as libraries and use Ray to glue them together

Ray's Architecture


Now, since Ray's libraries have different performance requirements and fault-tolerance needs, a single approach is not enough:

Ray's Methods


Tasks and Actors

Tasks


Ray can assign functions as tasks to be executed on remote machines and resolves the dependencies between those functions. The underlying execution model is a dynamic computation graph.
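
A minimal sketch of Ray tasks and the resulting dependency graph (the function names and data here are illustrative, not from the paper):

import ray

ray.init()

@ray.remote
def load(path):
  # stand-in for loading a dataset
  return [1, 2, 3]

@ray.remote
def square(xs):
  return [x * x for x in xs]

# each .remote() call returns a future (object id) immediately;
# passing id1 into square creates an edge in the computation graph
id1 = load.remote("data.csv")
id2 = square.remote(id1)  # Ray resolves the dependency and ships the data
print(ray.get(id2))       # [1, 4, 9]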

However, a computation graph (although important in ML) does not allow you to share mutable state between tasks: tasks communicate only through their immutable outputs (the task that produced id1 cannot exchange state with the task that produced id2).

Examples of shared mutable state:

- the weights of a neural network
- the state of a simulator
- interaction with a real-world sensor

So we need an actor system.
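
A minimal sketch of an actor holding mutable state (a toy counter, illustrative rather than from the paper):

import ray

ray.init()

@ray.remote
class Counter:
  def __init__(self):
    self.value = 0  # mutable state lives inside the actor process
  def increment(self):
    self.value += 1
    return self.value

counter = Counter.remote()  # start a dedicated process holding the state
futures = [counter.increment.remote() for _ in range(3)]
print(ray.get(futures))     # [1, 2, 3]: every call mutates the same state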

Actor + Tasks System


Review: Parameter Server


Review: (refer to 15-440 Distributed ML for more info)

Using Ray

import time
import numpy as np
import ray

ray.init() # assuming a single machine

@ray.remote
class ParameterServer:
  def __init__(self):
    self.params = np.zeros(10)
  def get_params(self):
    return self.params
  def update_params(self, grad):
    self.params += grad

# starting the parameter server
ps = ParameterServer.remote() # start a new actor process in the cluster
future = ps.get_params.remote() # request the params from the parameter server, returns a future
params = ray.get(future) # blocking, returns the parameter array

@ray.remote
def worker(ps):
  # each iteration:
  # 1. get the weights from the server
  # 2. compute a gradient
  # 3. push the gradient back to the server
  for _ in range(100):
    params = ray.get(ps.get_params.remote())
    grad = np.ones(10) # compute, e.g. with PyTorch and real data
    time.sleep(0.2) # computation takes time
    ps.update_params.remote(grad) # push to the server

worker.remote(ps) # start a new worker
worker.remote(ps) # start another worker
worker.remote(ps) # start another worker

# the workers start and execute in the background;
# the weights are updated in the parameter server

Ray's Fault-Tolerance

Every node has:

- a local scheduler
- a local in-memory object store
- worker processes that execute tasks

There is also a special global control store (GCS) node: a fault-tolerant key-value store that holds the cluster's metadata (e.g., object locations and task lineage).

Nodes in Ray's Fault-Tolerance Architecture


Here we give an example of executing the "Dot Task"; a user-level code sketch of the same computation follows the step-by-step walkthrough below.

"Dot Task" Example

"Dot Task" Example

"Dot Task"

  1. The driver creates the task and submits it to its local scheduler.
  2. The scheduler logs the task to the global control store (for fault tolerance).
  3. The scheduler decides to forward the task to another node's scheduler (e.g., for data locality, since node 2 already has obj1, one of the dependencies).
  4. But node 2 does not have obj2, so it asks the global control store where that object lives.
  5. Node 2 requests obj2 from node 3 and adds it to node 2's object store.
  6. The scheduler assigns the task to a worker.
  7. The worker finishes the computation and writes the result obj3 to the object store.
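
From the user's point of view, the walkthrough above corresponds to something like the following sketch (create_matrix and dot are illustrative names; Ray decides where obj1, obj2, and obj3 are placed):

import numpy as np
import ray

ray.init()

@ray.remote
def create_matrix(shape):
  return np.random.normal(size=shape)

@ray.remote
def dot(a, b):
  return np.dot(a, b)

# obj1 and obj2 may end up in object stores on different nodes
obj1 = create_matrix.remote((100, 100))
obj2 = create_matrix.remote((100, 100))

# the dot task is scheduled where its dependencies can be gathered
obj3 = dot.remote(obj1, obj2)
result = ray.get(obj3)  # blocks until obj3 has been computed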

When there is a fault, the global control store is used to determine which computations in the dependency graph were lost, and those tasks are resubmitted (lineage reconstruction).

Coarse-grained tasks (running for seconds): global checkpointing or lineage reconstruction are both fine, since we trade some latency for fault tolerance.

Fine-grained tasks (running for milliseconds): Ray targets stream processing, graph processing, interactive queries on mutable data, and end-to-end RL. These applications generate a large number of tasks per second, each running on the order of milliseconds, so we want essentially no scheduling overhead (10 ms of overhead on a 1 ms task is a 10x slowdown).

Ray's Lineage Stash

Traditionally: before computing a task, its lineage is written to the global lineage storage. This jams the global lineage storage when there is a large number of small subtasks.

Ray proposes the following protocol (sketched in code after this list):

  1. Before writing to global storage, node A writes the lineage to its local in-memory stash.
  2. Node A forwards the task to node B, along with the uncommitted lineage.
  3. Node B, upon receiving the task, adds the forwarded lineage to its own stash and starts computing.
  4. Node A commits the lineage to the global lineage store in the background, to keep the stash small.
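
A minimal sketch of this protocol (the LineageStash class, its method names, and the flush policy are illustrative assumptions, not Ray's actual implementation):

class LineageStash:
  # illustrative in-memory stash of uncommitted lineage on a single node
  def __init__(self, global_store):
    self.global_store = global_store  # assumed handle to the global lineage store
    self.uncommitted = {}             # task_id -> lineage record

  def record(self, task_id, lineage):
    # step 1: write lineage to the local stash before any global write
    self.uncommitted[task_id] = lineage

  def forward_task(self, task, remote_node):
    # step 2: forward the task along with the uncommitted lineage
    remote_node.receive_task(task, dict(self.uncommitted))

  def receive_task(self, task, forwarded_lineage):
    # step 3: merge forwarded lineage into the local stash, then execute
    self.uncommitted.update(forwarded_lineage)
    # ... schedule and execute the task locally ...

  def flush(self):
    # step 4: commit to the global store (in a real system this runs in the
    # background) so the local stash stays small
    for task_id, lineage in list(self.uncommitted.items()):
      self.global_store.commit(task_id, lineage)
      del self.uncommitted[task_id]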

Experimental Result: Fault Tolerance for Free


Ray Cluster

Overview of Ray Cluster. Ray nodes are implemented as pods when running on Kubernetes.


Ray Jobs

A Ray job is a single application: it is the collection of Ray tasks, objects, and actors that originate from the same script.

Once you have a cluster running, there are several ways to submit work to it:

Ray Jobs API: package your script as a job and submit it to the cluster; the cluster manages the job's lifecycle, and the job shows up in ray job list.
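
A minimal sketch using the Jobs Python SDK (the address and script name are illustrative; the cluster's dashboard server is assumed to be reachable on port 8265):

from ray.job_submission import JobSubmissionClient

# connect to the Jobs server on the head node (dashboard address)
client = JobSubmissionClient("http://<head_node_host>:8265")

# submit the script as a job; the cluster manages its lifecycle
job_id = client.submit_job(
  entrypoint="python my_script.py",
  runtime_env={"working_dir": "./"},
)

print(client.get_job_status(job_id))  # e.g. PENDING / RUNNING / SUCCEEDED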

Ray Client

If you run a script directly on a cluster node, or use Ray Client, it does not go through the Ray Jobs API, so ray job list cannot see it.

Ray Client is for interactive remote development: call ray.init("ray://<head_node_host>:10001") to connect to a Ray Client server (this server starts by default when the Ray cluster starts). The workload is terminated if the connection is lost.
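
A minimal sketch of interactive development through Ray Client (the host is a placeholder):

import ray

# connect to the Ray Client server on the head node
ray.init("ray://<head_node_host>:10001")

@ray.remote
def f(x):
  return x * x

# the task runs on the cluster, not on the local machine
print(ray.get(f.remote(4)))  # 16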

Ray Environment

Build files and dependencies into a container image and specify the image in Cluster YAML Configuration.

Don't use runtime environments, setup_commands, or ray rsync_up for production.
