概要

Ray 是一个开源的分布式计算框架,由加州大学伯克利分校 RISELab 开发。它的主要目标是让分布式计算变得简单和高效,特别适合 AI 和机器学习工作负载。

优点

  1. 简单易用:使用 Python API,学习曲线平缓,只需要少量代码修改就能将本地代码转换为分布式执行,支持函数级别的并行化。
  2. 灵活性:支持多种计算模式:任务并行、角色并行、数据并行,可以处理有状态和无状态的计算,适配多种应用场景:机器学习、强化学习、模型服务等。
  3. 高性能:低延迟任务调度,高效的对象存储系统,良好的可扩展性。自动管理 CPU、GPU 等计算资源。
  4. 容错机制:任务失败自动重试,Actor 故障恢复。

核心概念

  1. Task:通过 @ray.remote 装饰器将普通函数转换为分布式任务
  2. Actor:支持有状态的并行计算
  3. Object Store:分布式共享内存系统,用于数据共享

示例解析

分布式任务

func 函数转换为分布式任务,并通过 func.remote(args) 做未来的计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import ray

# 初始化 Ray
ray.init(num_cpus=2)

# 定义远程函数
@ray.remote
def add(x, y):
return x + y

# 提交任务,返回 object ref
future1 = add.remote(1, 2)
future2 = add.remote(3, 4)

# 获取结果
results = ray.get([future1, future2])
print(results) # [3, 7]
# 关闭 Ray
ray.shutdown()

基本逻辑:通过 @ray.remote 将函数 add 转换为分布式任务,通过 add.remote(1, 2) 提交任务,返回 object ref,通过 ray.get([future1, future2]) 等待计算完成并获取结果。

Actor 分布式对象

Actor 是一个有状态的工作进程,可以封装状态和方法。通过 @ray.remote 将一个类转换为分布式 Actor。

举个🌰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import ray
ray.init()

@ray.remote
class Counter:
def __init__(self):
self.value = 0

def increment(self):
self.value += 1
return self.value

def get_value(self):
return self.value

# 创建 Actor 实例
counter = Counter.remote()

# 调用 Actor 方法,返回 object ref
future1 = counter.increment.remote()
future2 = counter.increment.remote()
future3 = counter.get_value.remote()

# 获取结果
print(ray.get([future1, future2])) # [1, 2]
print(ray.get(future3)) # 2

ray.shutdown()

常见 Ray 操作

下边介绍 Ray 的一些常见操作:

ray.put()ray.get()

1
2
3
4
5
6
7
8
9
import ray
ray.init()

# put 将对象放入共享内存
data = [1, 2, 3, 4, 5]
data_ref = ray.put(data) # 返回 object ref

# get 从共享内存获取对象
result = ray.get(data_ref) # [1, 2, 3, 4, 5]

ray.wait() 等待操作完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
@ray.remote
def slow_function(i):
time.sleep(i)
return i

# 提交任务
refs = [slow_function.remote(i) for i in range(4)]

# 等待部分完成,返回已完成和未完成的
ready_refs, remaining_refs = ray.wait(refs, num_returns=2)
print(len(ready_refs), len(remaining_refs)) # 2 2
# 等待所有完成
ready_refs, remaining_refs = ray.wait(refs, num_returns=len(refs))
print(len(ready_refs), len(remaining_refs)) # 4 0
print(ray.get(ready_refs)) # [0, 1, 2, 3]

ray.kill() 终止 Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@ray.remote
class Counter:
def __init__(self):
self.value = 0

def increment(self):
self.value += 1
return self.value

def get_value(self):
return self.value

# 创建 Actor 实例
counter = Counter.remote()
ray.kill(counter)
# 调用 Actor 方法
future = counter.increment.remote()
ray.get(future) # ActorDiedError

资源管理

1
2
3
4
5
6
7
8
9
# 指定 CPU 资源
@ray.remote(num_cpus=2)
def cpu_task():
return 1

# 指定 GPU 资源
@ray.remote(num_gpus=1)
def gpu_task():
return 1

也可以通过 Options 动态配置

1
2
3
4
5
6
@ray.remote
def task():
return 1

# 动态设置资源需求
future = task.options(num_cpus=2).remote()

异常处理

1
2
3
4
5
6
7
8
9
10
@ray.remote
def might_fail(x):
if x < 0:
raise ValueError("Negative value")
return x

try:
result = ray.get(might_fail.remote(-1))
except ray.exceptions.RayTaskError as e:
print("Task failed:", e)

重试机制

1
2
3
4
@ray.remote(max_retries=3)
def unstable_function():
# 可能失败的操作
pass

其他遇到需求再补充学习。

Actor 示例学习

实践场景中,我们更多需要处理有状态的任务并行。

序列化

看一些稍微复杂的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import ray
import random

# 定义远程 Actor 类
@ray.remote
class Foo:
def __init__(self):
self.val = Bar() # 每个 Actor 实例都存放 Bar 实例

def get_val(self):
return self.val

def get_id(self):
return id(self.val)

# 普通类,作为 Actor 的属性
class Bar:
pass

# 创建多个 Actor 实例
workers = [Foo.remote() for _ in range(4)]

# 并行获取所有 worker 的值
futures = [w.get_val.remote() for w in workers]
bars = ray.get(futures)
futures = [w.get_id.remote() for w in workers]
ids = ray.get(futures)
for i,b in zip(ids, bars):
print(i, id(b)) # 不同的 id

remote 拿回数据会经过序列化和反序列化,是一个拷贝的对象。

场景的类对象是可以被序列化,参看:what-can-be-pickled-and-unpickled

一些对象不能被序列化,比如多线程的锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import ray
import threading

# 一个包含线程锁的类
class UnserializableClass:
def __init__(self):
self.lock = threading.Lock() # 线程锁对象不能序列化
self.value = 0

@ray.remote
class MyActor:
def __init__(self):
# 这会失败,因为 Lock 对象不能序列化
self.unserializable = UnserializableClass()

def get_value(self):
return self.unserializable

# 尝试创建 Actor 实例
try:
actor = MyActor.remote()
result = ray.get(actor.get_value.remote())
except Exception as e:
print(f"Error: {e}")

如果自定义的类不支持序列化,可以使用 __getstate____setstate__ 方法来控制序列化和反序列化的行为。

获取属性

通常,Ray Actor 不能直接访问属性,理论上可以通过 getter/setter 来访问和修改属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import ray

@ray.remote
class Counter:
def __init__(self):
self.value1 = 0
self.value2 = 100

def get_value(self, key):
return getattr(self, key)

def update(self, key, value):
setattr(self, key, value)


# 使用示例
counter = Counter.remote()
# 获取属性
value1 = ray.get(counter.get_value.remote("value1"))
value2 = ray.get(counter.get_value.remote("value2"))
print(value1, value2) # 0 100
# 更新属性
counter.update.remote("value1", 10)
counter.update.remote("value2", 20)
# 获取更新后的属性
value1 = ray.get(counter.get_value.remote("value1"))
value2 = ray.get(counter.get_value.remote("value2"))
print(value1, value2) # 10 20

不过,这不是最佳实践。就像前边说的,这个过程返回属性的副本而不是引用,且存在增加通讯开销。非必要的情况下,要调用该属性的函数最好也封装在 Actor 中。

上下文信息

remote 的对象是重头创建的,有个容易犯错的地方:上下文信息可能会丢失。

举个例子,假设 constants.py 有一个常量 num = 1,我们在定义函数时做了修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import ray
import constants
constants.num = 2 # 修改常量
num = constants.num

@ray.remote # 直接传输整个
def get_num():
return constants.num

@ray.remote # 直接传输变量
def get_num_direct():
return num

print(ray.get(get_num.remote())) # 1
print(ray.get(get_num_direct.remote())) # 2

可以发现,前者重新执行了 constants.num 继而返回 1

Ray 生态系统

Ray 提供了丰富的库和工具:

  • Ray Train:分布式模型训练
  • Ray Tune:分布式超参数调优
  • Ray Serve:模型服务部署
  • RLlib:强化学习库
  • Ray Data:分布式数据处理

Ray Core 是整个框架的核心,提供了基础的分布式计算能力:

Tune 是专门用于模型超参数优化的工具:

  • 多种优化算法: 支持网格搜索、随机搜索、贝叶斯优化等
  • 早停策略: 自动停止表现差的试验
  • 资源调度: 智能分配计算资源
  • 与主流框架集成: 支持 PyTorch、TensorFlow、XGBoost 等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from ray import tune

def train_model(config):
# 模型训练代码
for i in range(10):
tune.report(loss=1/i)

analysis = tune.run(
train_model,
config={
"lr": tune.loguniform(1e-4, 1e-1),
"batch_size": tune.choice([16, 32, 64])
}
)

RLlib 是一个高度可扩展的强化学习库:

  • 算法支持: 实现了主流的 RL 算法(PPO、DQN、DDPG 等)
  • 环境兼容: 支持 gym、Unity、自定义环境
  • 分布式训练: 自动进行并行采样和训练
  • 高度定制: 支持自定义策略网络和环境
1
2
3
4
5
6
7
8
9
10
11
12
13
from ray.rllib.agents.ppo import PPOTrainer

config = {
"env": "CartPole-v1",
"num_workers": 4
}

trainer = PPOTrainer(config=config)

# 训练模型
for i in range(10):
result = trainer.train()
print(f"Iteration {i}: reward = {result['episode_reward_mean']}")

三者之间的关系

  • Ray Core 是基础设施,提供分布式计算能力
  • Ray Tune 基于 Core 构建,用于自动化模型调优
  • RLlib 也基于 Core 构建,专注于强化学习场景

典型应用场景:

  1. 模型开发: 使用 RLlib 开发强化学习模型
  2. 参数优化: 使用 Tune 优化模型超参数
  3. 规模化部署: 使用 Ray Core 进行分布式训练和服务

这三个组件形成了一个完整的机器学习工具链,能够覆盖从实验到生产的全流程。它们既可以单独使用,也可以组合使用,非常灵活。