概要
Ray 是一个开源的分布式计算框架,由加州大学伯克利分校 RISELab 开发。它的主要目标是让分布式计算变得简单和高效,特别适合 AI 和机器学习工作负载。
优点 :
简单易用 :使用 Python API,学习曲线平缓,只需要少量代码修改就能将本地代码转换为分布式执行,支持函数级别的并行化。
灵活性 :支持多种计算模式:任务并行、角色并行、数据并行,可以处理有状态和无状态的计算,适配多种应用场景:机器学习、强化学习、模型服务等。
高性能 :低延迟任务调度,高效的对象存储系统,良好的可扩展性。自动管理 CPU、GPU 等计算资源。
容错机制 :任务失败自动重试,Actor 故障恢复。
核心概念 :
Task :通过 @ray.remote
装饰器将普通函数转换为分布式任务
Actor :支持有状态的并行计算
Object Store :分布式共享内存系统,用于数据共享
示例解析
分布式任务
将 func
函数转换为分布式任务,并通过 func.remote(args)
做未来的计算。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import rayray.init(num_cpus=2 ) @ray.remote def add (x, y ): return x + y future1 = add.remote(1 , 2 ) future2 = add.remote(3 , 4 ) results = ray.get([future1, future2]) print (results) ray.shutdown()
基本逻辑:通过 @ray.remote
将函数 add
转换为分布式任务,通过 add.remote(1, 2)
提交任务,返回 object ref,通过 ray.get([future1, future2])
等待计算完成并获取结果。
Actor 分布式对象
Actor 是一个有状态的工作进程,可以封装状态和方法。通过 @ray.remote
将一个类转换为分布式 Actor。
举个🌰:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import rayray.init() @ray.remote class Counter : def __init__ (self ): self.value = 0 def increment (self ): self.value += 1 return self.value def get_value (self ): return self.value counter = Counter.remote() future1 = counter.increment.remote() future2 = counter.increment.remote() future3 = counter.get_value.remote() print (ray.get([future1, future2])) print (ray.get(future3)) ray.shutdown()
常见 Ray 操作
下边介绍 Ray 的一些常见操作:
ray.put()
和 ray.get()
1 2 3 4 5 6 7 8 9 import rayray.init() data = [1 , 2 , 3 , 4 , 5 ] data_ref = ray.put(data) result = ray.get(data_ref)
ray.wait()
等待操作完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import time@ray.remote def slow_function (i ): time.sleep(i) return i refs = [slow_function.remote(i) for i in range (4 )] ready_refs, remaining_refs = ray.wait(refs, num_returns=2 ) print (len (ready_refs), len (remaining_refs)) ready_refs, remaining_refs = ray.wait(refs, num_returns=len (refs)) print (len (ready_refs), len (remaining_refs)) print (ray.get(ready_refs))
ray.kill()
终止 Actor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @ray.remote class Counter : def __init__ (self ): self.value = 0 def increment (self ): self.value += 1 return self.value def get_value (self ): return self.value counter = Counter.remote() ray.kill(counter) future = counter.increment.remote() ray.get(future)
资源管理
1 2 3 4 5 6 7 8 9 @ray.remote(num_cpus=2 ) def cpu_task (): return 1 @ray.remote(num_gpus=1 ) def gpu_task (): return 1
也可以通过 Options 动态配置
1 2 3 4 5 6 @ray.remote def task (): return 1 future = task.options(num_cpus=2 ).remote()
异常处理
1 2 3 4 5 6 7 8 9 10 @ray.remote def might_fail (x ): if x < 0 : raise ValueError("Negative value" ) return x try : result = ray.get(might_fail.remote(-1 )) except ray.exceptions.RayTaskError as e: print ("Task failed:" , e)
重试机制
1 2 3 4 @ray.remote(max_retries=3 ) def unstable_function (): pass
其他遇到需求再补充学习。
Actor 示例学习
实践场景中,我们更多需要处理有状态的任务并行。
序列化
看一些稍微复杂的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import rayimport random@ray.remote class Foo : def __init__ (self ): self.val = Bar() def get_val (self ): return self.val def get_id (self ): return id (self.val) class Bar : pass workers = [Foo.remote() for _ in range (4 )] futures = [w.get_val.remote() for w in workers] bars = ray.get(futures) futures = [w.get_id.remote() for w in workers] ids = ray.get(futures) for i,b in zip (ids, bars): print (i, id (b))
从 remote
拿回数据会经过序列化和反序列化,是一个拷贝的对象。
场景的类对象是可以被序列化,参看:what-can-be-pickled-and-unpickled 。
一些对象不能被序列化,比如多线程的锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import rayimport threadingclass UnserializableClass : def __init__ (self ): self.lock = threading.Lock() self.value = 0 @ray.remote class MyActor : def __init__ (self ): self.unserializable = UnserializableClass() def get_value (self ): return self.unserializable try : actor = MyActor.remote() result = ray.get(actor.get_value.remote()) except Exception as e: print (f"Error: {e} " )
如果自定义的类不支持序列化,可以使用 __getstate__
和 __setstate__
方法来控制序列化和反序列化的行为。
获取属性
通常,Ray Actor 不能直接访问属性,理论上可以通过 getter/setter
来访问和修改属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import ray@ray.remote class Counter : def __init__ (self ): self.value1 = 0 self.value2 = 100 def get_value (self, key ): return getattr (self, key) def update (self, key, value ): setattr (self, key, value) counter = Counter.remote() value1 = ray.get(counter.get_value.remote("value1" )) value2 = ray.get(counter.get_value.remote("value2" )) print (value1, value2) counter.update.remote("value1" , 10 ) counter.update.remote("value2" , 20 ) value1 = ray.get(counter.get_value.remote("value1" )) value2 = ray.get(counter.get_value.remote("value2" )) print (value1, value2)
不过,这不是最佳实践。就像前边说的,这个过程返回属性的副本而不是引用,且存在增加通讯开销。非必要的情况下,要调用该属性的函数最好也封装在 Actor 中。
上下文信息
remote 的对象是重头创建的,有个容易犯错的地方:上下文信息可能会丢失。
举个例子,假设 constants.py
有一个常量 num = 1
,我们在定义函数时做了修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import rayimport constantsconstants.num = 2 num = constants.num @ray.remote def get_num (): return constants.num @ray.remote def get_num_direct (): return num print (ray.get(get_num.remote())) print (ray.get(get_num_direct.remote()))
可以发现,前者重新执行了 constants.num
继而返回 1
。
Ray 生态系统
Ray 提供了丰富的库和工具:
Ray Train :分布式模型训练
Ray Tune :分布式超参数调优
Ray Serve :模型服务部署
RLlib :强化学习库
Ray Data :分布式数据处理
Ray Core 是整个框架的核心,提供了基础的分布式计算能力:
Tune 是专门用于模型超参数优化的工具:
多种优化算法 : 支持网格搜索、随机搜索、贝叶斯优化等
早停策略 : 自动停止表现差的试验
资源调度 : 智能分配计算资源
与主流框架集成 : 支持 PyTorch、TensorFlow、XGBoost 等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from ray import tunedef train_model (config ): for i in range (10 ): tune.report(loss=1 /i) analysis = tune.run( train_model, config={ "lr" : tune.loguniform(1e-4 , 1e-1 ), "batch_size" : tune.choice([16 , 32 , 64 ]) } )
RLlib 是一个高度可扩展的强化学习库:
算法支持 : 实现了主流的 RL 算法(PPO、DQN、DDPG 等)
环境兼容 : 支持 gym、Unity、自定义环境
分布式训练 : 自动进行并行采样和训练
高度定制 : 支持自定义策略网络和环境
1 2 3 4 5 6 7 8 9 10 11 12 13 from ray.rllib.agents.ppo import PPOTrainerconfig = { "env" : "CartPole-v1" , "num_workers" : 4 } trainer = PPOTrainer(config=config) for i in range (10 ): result = trainer.train() print (f"Iteration {i} : reward = {result['episode_reward_mean' ]} " )
三者之间的关系
Ray Core 是基础设施,提供分布式计算能力
Ray Tune 基于 Core 构建,用于自动化模型调优
RLlib 也基于 Core 构建,专注于强化学习场景
典型应用场景:
模型开发 : 使用 RLlib 开发强化学习模型
参数优化 : 使用 Tune 优化模型超参数
规模化部署 : 使用 Ray Core 进行分布式训练和服务
这三个组件形成了一个完整的机器学习工具链,能够覆盖从实验到生产的全流程。它们既可以单独使用,也可以组合使用,非常灵活。