写在前边

本篇介绍 Zulip API 的使用方法:如何获取 Zulip 中的聊天数据用于集成到 AI 对话等场景。

在开始教程前,我想先做一些重要的声明和倡议。

  1. 隐私保护:在将论坛数据用于训练或其他用途时,请务必保护用户隐私,遵守相关法律法规
  2. 技术中立:技术本身是中立的,请不要滥用这些技术手段
  3. 合理使用:请确保你的使用符合目标 Zulip 实例的使用条款和政策

话说回来,好的方面是现在有很多辅助阅读的文档工具。通过 API 来实现 Zulip 内容的智能阅读和总结,确实是一个不错的选择。

什么是 Zulip

Zulip 是一款强大的、开源的团队沟通工具。与许多其他聊天应用不同,Zulip 的核心是基于话题(Topic)的线程化对话模型

Zulip 的独特之处

  • Stream + Topic 结构:每个对话都清晰地组织在特定的"流"(Stream,类似于频道)下的"话题"中
  • 信息组织:极大地减少了信息噪音,使得追踪多个并行对话变得异常轻松
  • 异步友好:即使你离开了一段时间,也能快速跟上进度
  • 功能丰富:强大的搜索、代码块高亮、Markdown 支持、表情回应等

Zulip API 概述

Zulip 提供了功能丰富的 REST API,支持几乎所有的操作。通过 Zulip API,你可以:

  1. 集成外部服务:将来自 GitHub、Jira、监控系统等的通知推送到指定的流和话题
  2. 构建交互式机器人:创建能够响应消息、执行任务的智能 Bot
  3. 自动化工作流程:自动发送报告、提醒,或触发其他系统操作
  4. 数据分析:获取聊天数据进行分析、总结,或与 AI 系统集成

我们本篇主要关注的是 Zulip 信息的获取:从频道信息、主题信息,到具体的对话内容。这样可以通过 AI 快速阅读总结相关帖子,阅读过程中也可以随时和 AI 交流。

准备工作

1. 获取 API 凭证

首先,你需要在 Zulip 中创建一个 Bot 账户:

  1. 登录你的 Zulip 实例
  2. 进入 SettingsYour bots
  3. 点击 Add a new bot
  4. 选择 Generic bot,填写必要信息
  5. 创建后会得到 Bot 的邮箱地址和 API Key

2. 安装依赖

1
pip install aiohttp asyncio

实现 Zulip API 客户端

下面是一个完整的异步 Zulip API 客户端实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import asyncio
import aiohttp
import json
import logging
from typing import List, Dict, Optional, Any
import os

ZULIP_BOT_EMAIL = os.getenv('ZULIP_BOT_EMAIL')
ZULIP_BOT_API_KEY = os.getenv('ZULIP_BOT_API_KEY')
ZULIP_SITE = os.getenv('ZULIP_SITE')

class ZulipClient:
def __init__(self,
site_url: Optional[str]=None,
bot_email: Optional[str]=None,
bot_api_key: Optional[str]=None,
logger: Optional[logging.Logger] = None):
"""初始化 Zulip 客户端"""
if site_url is None:
site_url = ZULIP_SITE
if bot_email is None:
bot_email = ZULIP_BOT_EMAIL
if bot_api_key is None:
bot_api_key = ZULIP_BOT_API_KEY

if not all([site_url, bot_email, bot_api_key]):
raise ValueError("site_url, bot_email, 和 bot_api_key 不能为空")

self.base_url = site_url.rstrip('/') + "/api/v1"
self._auth = aiohttp.BasicAuth(login=bot_email, password=bot_api_key)
self.logger = logger or logging.getLogger(self.__class__.__name__)
self._session: Optional[aiohttp.ClientSession] = None
self._closed = False # 添加状态标记

async def __aenter__(self):
"""进入异步上下文,创建并存储 aiohttp session"""
if self._closed:
raise RuntimeError("Client has been closed and cannot be reused")

if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
self.logger.info("Created new aiohttp session")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""退出异步上下文,关闭 aiohttp session"""
await self.close()

async def close(self):
"""显式关闭客户端"""
if self._session and not self._session.closed:
await self._session.close()
self.logger.info("Closed aiohttp session")
self._session = None
self._closed = True

def shutdown(self):
"""同步方法关闭客户端"""
if not self._closed:
self._run_async(self.close())

async def _ensure_session(self):
"""确保 session 可用"""
if self._closed:
raise RuntimeError("Client has been closed and cannot be reused")

if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
self.logger.info("Created new aiohttp session")
return self._session

async def _request(self, method: str, endpoint: str,
params: Optional[Dict] = None,
data: Optional[Dict] = None) -> Optional[Dict]:
"""基础请求方法"""
if self._closed:
raise RuntimeError("Client has been closed and cannot be reused")

session = await self._ensure_session()
url = f"{self.base_url}{endpoint}"

try:
async with session.request(method, url, params=params,
data=data, auth=self._auth) as resp:
if resp.status == 200:
return await resp.json()
self.logger.warning(f"请求失败: {resp.status} - {url}")
return None
except Exception as e:
self.logger.error(f"请求异常: {e}")
return None

def __del__(self):
"""析构函数"""
if not self._closed:
self.shutdown()

def _run_async(self, coro):
"""运行异步代码的同步包装器"""
try:
asyncio.get_running_loop()
raise RuntimeError("不能在异步环境中调用同步方法")
except RuntimeError:
return asyncio.run(coro)

async def _get_messages_batch(self, narrow: List[Dict],
anchor: str,
batch_size: int = 100) -> Optional[Dict]:
"""获取一批消息的通用方法"""
params = {
"narrow": json.dumps(narrow),
"anchor": anchor,
"num_before": batch_size,
"num_after": 0
}
return await self._request("GET", "/messages", params=params)

# === 异步公共接口 ===
async def get_channels_async(self) -> List[Dict]:
"""获取频道列表"""
data = await self._request("GET", "/users/me/subscriptions")
return data.get("subscriptions", []) if data else []

async def get_topics_async(self, stream_id: int) -> List[Dict]:
"""获取话题列表"""
data = await self._request("GET", f"/users/me/{stream_id}/topics")
return data.get("topics", []) if data else []

async def get_topic_history_async(self, stream_id: int, topic_name: str,
batch_size: int = 100, latest:bool=True) -> List[Dict[str, Any]]:
"""
获取指定频道和话题的完整消息历史记录。

Args:
stream_id: 频道ID
topic_name: 话题名称
batch_size: 每次请求获取的消息数量,建议不超过1000

Returns:
List[Dict]: 按时间顺序(从旧到新)排列的所有消息列表
"""
self.logger.info(f"开始获取话题 '{topic_name}' (频道 {stream_id}) 的完整历史...")

# 构建查询条件
narrow = [
{"operator": "stream", "operand": stream_id},
{"operator": "topic", "operand": topic_name}
]

all_messages = []
anchor = "newest"
request_count = 0

while True:
request_count += 1
self.logger.debug(f"第 {request_count} 次请求: anchor='{anchor}'")

# 使用基础请求方法获取数据
params = {
"narrow": json.dumps(narrow),
"anchor": anchor,
"num_before": batch_size,
"num_after": 0,
"apply_markdown": 'false'
}

data = await self._request("GET", "/messages", params=params)

if not data or "result" not in data:
self.logger.error(f"获取消息批次 #{request_count} 失败")
return []

messages = data.get("messages", [])
if not messages:
self.logger.debug("收到空消息批次,假定已达到最早消息")
break

# 将新消息添加到列表前面
all_messages = messages + all_messages

# 检查是否到达最早消息
if data.get("found_oldest", False):
self.logger.debug("已确认达到最早消息")
break

# 更新锚点为当前批次最早消息的ID
anchor = str(messages[0]["id"])

# 添加请求延迟
await asyncio.sleep(0.5)

# 安全检查:限制最大请求次数
if request_count >= 1000:
self.logger.warning("达到最大请求次数限制,返回可能不完整的结果")
break

self.logger.info(f"已完成话题历史获取:共 {len(all_messages)} 条消息,用了 {request_count} 次请求")

# 按时间顺序排序
messages = sorted(all_messages, key=lambda x: x["id"])
if latest:
msgs, msg_ids = [], set()
for msg in messages:
msg_id = msg['id']
if msg_id in msg_ids:
msgs[-1] = msg
else:
msg_ids.add(msg_id)
msgs.append(msg)
self.logger.info(f"去重后:共 {len(msgs)} 条消息")
return msgs
else:
return messages

# === 同步公共接口 ===
def get_channels(self) -> List[Dict]:
"""同步获取频道列表"""
return self._run_async(self.get_channels_async())

def get_topics(self, stream_id: int) -> List[Dict]:
"""同步获取话题列表"""
return self._run_async(self.get_topics_async(stream_id))

def get_topic_history(self, stream_id: int, topic_name: str,
batch_size: int = 100, latest:bool=True) -> List[Dict]:
"""同步获取话题完整历史"""
return self._run_async(
self.get_topic_history_async(stream_id, topic_name, batch_size, latest)
)

使用示例

基础用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)

# 初始化客户端
client = ZulipClient(
site_url="https://your-zulip-instance.zulipchat.com",
bot_email="your-bot@yourdomain.com",
bot_api_key="your-api-key"
)

# 获取频道列表
channels = client.get_channels()
print(f"找到 {len(channels)} 个频道")

for channel in channels[:5]: # 显示前5个频道
print(f"- {channel['name']} (ID: {channel['stream_id']})")

异步用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

async def main():
async with ZulipClient(site_url, bot_email, bot_api_key) as client:
# 获取频道
channels = await client.get_channels_async()

if channels:
stream_id = channels[0]['stream_id']

# 获取话题
topics = await client.get_topics_async(stream_id)
print(f"频道 {channels[0]['name']}{len(topics)} 个话题")

if topics:
# 获取第一个话题的历史消息
topic_name = topics[0]['name']
messages = await client.get_topic_history_async(stream_id, topic_name)

print(f"话题 '{topic_name}' 有 {len(messages)} 条消息")

# 显示最新的几条消息
for msg in messages[-3:]:
print(f"[{msg['sender_full_name']}]: {msg['content'][:100]}...")

# 运行异步代码
asyncio.run(main())

通过 Zulip API,我们可以轻松地获取和处理 Zulip 中的聊天数据。这为构建 AI 辅助阅读、自动摘要、智能问答等应用奠定了基础。记住,技术的价值在于如何负责任地使用它来改善我们的工作和学习体验。